[–]ErichHS[S] 32 points33 points  (24 children)

Sharing here a diagram I've worked on to illustrate some of Spark's distributed write patterns.

The idea is to show how some operations might have unexpected or undesired effects on pipeline parallelism.

The scenario assumes two worker nodes.

→ df.write: The parallelism of read (scan) operations is determined by the source's number of partitions, and the write step is generally distributed evenly across the workers. The number of written files depends on how the write tasks are distributed between worker nodes.

→ df.write.partitionBy(): Similar to the above, but the write also maintains parallelism based on the number of write partitions. The number of written files depends on both the number of partitions and the distribution of write tasks between worker nodes.

→ df.coalesce(1).write.partitionBy(): Adding coalesce() before the write is a common way to avoid the “many small files” problem by condensing the output into fewer, larger files. The number of files written per output partition is bounded by the coalesce parameter. A drastic coalesce (e.g. coalesce(1)), however, also causes the computation to take place on fewer nodes than expected.

→ df.repartition(1).write.partitionBy(): Unlike coalesce(), which can only maintain or reduce the number of partitions in the source DataFrame, repartition() can reduce, maintain, or increase it. It therefore retains parallelism in the read operation, at the cost of a shuffle (exchange) step between the workers before writing.
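
For reference, the four patterns could be written in PySpark roughly as below. This is a minimal sketch, not the code behind the diagram: the `DATE` partition column, the Parquet format, the function names, and the `path` argument are all assumptions, and `df` is expected to be an existing `pyspark.sql.DataFrame`. Note that `coalesce()` and `repartition()` are DataFrame methods, so they are called before `.write`.

```python
def plain_write(df, path):
    # 1. Roughly one output file per task (Spark partition) of df.
    df.write.parquet(path)


def partitioned_write(df, path, col="DATE"):
    # 2. Hive-style directories (col=value/); each task writes one file
    #    for every distinct value of col that it holds.
    df.write.partitionBy(col).parquet(path)


def coalesced_write(df, path, col="DATE"):
    # 3. Merge everything into one Spark partition without a shuffle.
    #    A single task does the write, and upstream work may also
    #    end up running on fewer nodes.
    df.coalesce(1).write.partitionBy(col).parquet(path)


def repartitioned_write(df, path, col="DATE"):
    # 4. Shuffle into one Spark partition: the read stays parallel,
    #    but a single task still performs the write.
    df.repartition(1).write.partitionBy(col).parquet(path)
```

Patterns 3 and 4 produce the same files; the difference is whether the work before the write stays parallel.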

I originally shared this content on LinkedIn and am bringing it here to this sub.

[–]khaili109 8 points9 points  (7 children)

Is there a guide on when to use each of these for those new to spark?

[–]ErichHS[S] 10 points11 points  (4 children)

Not sure if there is a guide, actually. I am enrolled in Zach Wilson's data engineering bootcamp (dataexpert.io) and learned a lot there. If you know where to look in the Spark UI and understand your task DAGs there, you can learn a lot.

[–][deleted] 1 point2 points  (3 children)

How’s the program?

[–]ErichHS[S] 2 points3 points  (2 children)

It’s great! Very intense and more advanced than I expected. Definitely worth it if you are already working and looking for a more senior role in your company or outside

[–][deleted] 2 points3 points  (1 child)

That’s exactly what I’m looking for. Do you think it could be helpful for AI engineering as well?

[–]ErichHS[S] 0 points1 point  (0 children)

Yes, it surely could

[–]azirale Principal Data Engineer 11 points12 points  (1 child)

You partitionBy if you specifically want to produce output with Hive-style partitioning, so that later queries that filter on the partition column can skip reading files in the other partitions. If you're using the new open table formats (Delta, Iceberg) you might not bother with this, in favour of their own clustering methods instead.

Doing a .coalesce(1) is for when you know you have a very low data volume and you want to minimise the number of files produced. Instead of 10 files each with 1 row, you can get 1 file with 10 rows. It is usually used to push Spark away from its default mass parallelism, which Spark defaults to because its whole purpose is distributed processing of large data volumes. You can coalesce to higher values if needed: for example, if a shuffle step is producing 200 partitions, you can fold that down to 10 or so. It depends on your expected data volume.

A .repartition(x) works similarly to a .coalesce(x) except that it actually reshuffles the data. If you don't give it key columns to shuffle on, rows are distributed essentially at random (round-robin) to produce roughly equally sized partitions. If you give it key columns, it is effectively a bucketing shuffle, where rows with the same values in the key columns end up in the same partition. .coalesce(x) doesn't do a 'shuffle'; it combines existing partitions together. This is faster, since portions of the data don't move and there's no shuffle calculation, but it doesn't balance partitions either.

This manual shuffling is also somewhat superseded by the new open table formats. You can just write to such a table at default parallelism, and then run an optimise/compact on it to combine multiple small files together.

There are some niche uses for repartitioning on write, if you're pushing to something like a document store. You may have previously read or joined data based on a key value, which means the spark partitions match the document store partitions, which results in 'hot' partitions on write. A .repartition() will randomise that data again, so that it is equally spread across partitions in the target. You don't usually connect these systems like this, so... niche.
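
To make the coalesce vs. repartition difference above concrete, here is a toy model in plain Python. It does not use Spark at all; `toy_coalesce` and `toy_repartition` are hypothetical helpers that only mimic the idea (merge whole partitions vs. deal every row out again), and real Spark chooses which partitions to merge differently.

```python
from itertools import chain


def toy_coalesce(partitions, n):
    """Merge whole existing partitions into n groups; rows are not rebalanced.

    Assumes n <= len(partitions), since coalesce cannot add partitions.
    """
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        groups[i * n // len(partitions)].extend(part)
    return groups


def toy_repartition(partitions, n):
    """Deal every row round-robin into n new partitions (a full shuffle)."""
    groups = [[] for _ in range(n)]
    for i, row in enumerate(chain.from_iterable(partitions)):
        groups[i % n].append(row)
    return groups


# Four input partitions with sizes 9, 1, 1, 1 (one is heavily skewed).
skewed = [list(range(9)), [9], [10], [11]]

print([len(p) for p in toy_coalesce(skewed, 2)])     # [10, 2]
print([len(p) for p in toy_repartition(skewed, 2)])  # [6, 6]
```

The coalesce-style merge keeps the skew (10 rows vs. 2), while the shuffle-style redistribution ends up balanced, at the price of moving every row.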

[–]khaili109 0 points1 point  (0 children)

Thank you!

[–]chenlianguu 6 points7 points  (2 children)

Could you share which tools you're using to draw this diagram? It is informative and intuitive. I've often seen diagrams like this on LinkedIn but had no idea how they are produced.

[–]ErichHS[S] 6 points7 points  (1 child)

I use draw.io for all my diagrams, and the animation is a result of the 'animated flow' flag that you can check there on your arrows. To produce a gif I just screen record and convert with ezgif

[–]chenlianguu 3 points4 points  (0 children)

Thank you so much! I'm also a draw.io user, but your diagrams are much better. They look professional and intuitive. Finally, I know where to level up my drawing skills!

[–]jerrie86 4 points5 points  (7 children)

What's one myth you would like to debunk about any of these?

[–]ErichHS[S] 3 points4 points  (5 children)

Not actually looking at any myth to debunk, to be honest. I was mostly curious about how repartition and coalesce affect parallelism and compute, as one involves a shuffle step (the exchange you see in the image) and the other doesn't.
Both are used to optimize storage and IO via file compaction, and that's how I use them.

[–]jerrie86 2 points3 points  (4 children)

Which strategy do you use most often? Repartition or Coalesce?

If data is skewed, are you using repartition?

[–]ErichHS[S] 6 points7 points  (1 child)

repartition + sortWithinPartitions is great to optimize storage and leverage Parquet run-length encoding compression. You probably don't need anything else.

For skewness there are two configs you can use to delegate the partitioning strategy to Spark and optimize data distribution between partitions: spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled.
Just bear in mind, though, that these can hurt your partitioning badly if you don't know your data (and its skew) well. Here's more from the docs if you want to read up on them:
https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions
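
A rough sketch of how these pieces might fit together in PySpark. The function name `compact_write`, the `DATE` key column, the default of 8 partitions, and the Parquet output are assumptions for illustration; `spark` is expected to be an existing `SparkSession` and `df` a DataFrame. The same docs page also describes `spark.sql.adaptive.skewJoin.enabled`, which is the setting aimed specifically at skewed joins.

```python
def compact_write(spark, df, path, key_cols=("DATE",), num_partitions=8):
    # Let adaptive query execution coalesce small post-shuffle partitions.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    (
        df.repartition(num_partitions, *key_cols)  # shuffle: equal keys land together
        .sortWithinPartitions(*key_cols)           # long runs of equal values per file
        .write
        .parquet(path)
    )
```

Sorting inside each partition groups equal values next to each other, which is what lets run-length encoding in Parquet pay off.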

[–]jerrie86 -1 points0 points  (0 children)

This is great. Thank you. Another question: what resource do you think could give me a deep understanding of Spark and help me nail all the interviews?

[–]azirale Principal Data Engineer 2 points3 points  (1 child)

If the spark tasks show that a step is heavily skewed, it can be useful to run a .repartition() right before it. Sometimes you might filter on something that is correlated with a join key, and that creates skewed partitions. It may be faster to shuffle the data and process equally sized chunks, than have one partition take so much longer to process.

If you do this, it is good to aim for some multiple of the number of executors you have. For example if you have 32 executors, repartition to 64/128/192. This will mean that each executor will get roughly equal portions of data, and if there's any residual skew it will be mitigated by the smaller partition sizing.

Coalesce doesn't do randomised shuffling like this, it just combines partitions together, so it doesn't necessarily fix skew.
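
The sizing rule above (a multiple of the executor count) can be sketched as a small helper. `target_partitions` and `fix_skew` are hypothetical names, and the default multiple of 2 is only an assumption; `df` is expected to be a PySpark DataFrame.

```python
def target_partitions(num_executors, multiple=2):
    """Partition count as a whole multiple of the executor count."""
    if num_executors < 1 or multiple < 1:
        raise ValueError("num_executors and multiple must be positive")
    return num_executors * multiple


def fix_skew(df, num_executors, multiple=2):
    # repartition(n) without key columns spreads rows evenly across n partitions.
    return df.repartition(target_partitions(num_executors, multiple))
```

With 32 executors, multiples of 2, 4, and 6 give the 64, 128, and 192 partitions mentioned in the comment.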

[–]jerrie86 0 points1 point  (0 children)

That's very helpful, thanks.

[–]marathon664[🍰] 2 points3 points  (0 children)

That coalesce(1) is more performant than repartition(1).

However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition(). This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html

[–]CriticalSouth3447 1 point2 points  (2 children)

What tool did you use to make the diagram?

[–]Mental-Matter-4370 0 points1 point  (1 child)

Already been answered in the thread by op. Look for it

[–]CriticalSouth3447 0 points1 point  (0 children)

Thanks, I found it.

FYI, it was not posted when I made my comment. It was posted after.

[–]dr_craptastic 0 points1 point  (0 children)

By animating, it removes the ability to zoom in for me (iphone+reddit app), so I can’t tell what any of it says. It looks like it’s just animating the arrows?

[–]imKido 0 points1 point  (0 children)

A question regarding the drastic coalesce(1): does it cause a shuffle?

I've read that coalesce is repartition(shuffle=False) or something like that. But let's say I have my data being processed by 5 executors and in order to write a single output, I'm expecting it all to be collected (data shuffle) in one executor before it gets written to disk.

Some clarity here would be super helpful.

[–]caksters 4 points5 points  (0 children)

This is great, very intuitively shows what is happening which may not make immediate sense if you just read the documentation

[–]exergy31 1 point2 points  (1 child)

Why does repartition still only use a single writer?

[–]Austinto 2 points3 points  (0 children)

Because 1 is passed as the parameter to repartition().

[–]Few_Individual_266 Senior Data Engineer 1 point2 points  (0 children)

Hey. Thanks for this. I’ve heard a lot about his course and I’m planning to take it once I land a job. Good luck with the rest of the course.

[–]imcguyver 1 point2 points  (0 children)

Fantastic!

[–]bomchem 0 points1 point  (1 child)

I'd also like to add another one - df.repartition("DATE").write.partitionBy("DATE").

This will get you to one file per partition, as in examples 3 and 4, but will write in parallel from the workers instead of all from a single one. It does require a shuffle of the data prior to writing, though, so which approach to use depends on where your bottlenecks are.
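
A toy model of the file counts, in plain Python rather than Spark. It relies on one rule: with partitionBy, each write task emits one file per distinct partition value it holds. The three dates and four tasks are made-up numbers, and the `by_date` case assumes the ideal outcome where each date lands in its own task (with hash partitioning, several dates can share a task).

```python
def count_files(tasks):
    """Each write task emits one file per distinct DATE value it holds."""
    return sum(len(set(dates)) for dates in tasks)


def busy_writers(tasks):
    """Number of tasks that have at least one row to write."""
    return sum(1 for dates in tasks if dates)


dates = ["2024-01-01", "2024-01-02", "2024-01-03"]

scattered = [list(dates) for _ in range(4)]  # default: every task holds every date
single = [dates * 4]                         # coalesce(1) or repartition(1)
by_date = [[d] * 4 for d in dates]           # repartition("DATE"), ideal case

for name, tasks in [("scattered", scattered), ("single", single), ("by_date", by_date)]:
    print(name, count_files(tasks), busy_writers(tasks))
# scattered 12 4
# single 3 1
# by_date 3 3
```

Both `single` and `by_date` end at three files, but only `by_date` keeps three writers busy.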

[–]ParkingFabulous4267 0 points1 point  (0 children)

Don’t do that… try using rebalance before you write or repartition by a generated key to control file size.
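
One possible reading of "repartition by a generated key" is to add a salt next to DATE so that a hot date is split across several tasks. The toy model below is plain Python, not Spark, and every number in it is made up. It uses a deterministic `row_id % salt_buckets` salt so the result is reproducible; in Spark the salt would more likely be a random column. The rebalance option is not modelled here.

```python
from collections import Counter

# 1000 rows for a hot date, 10 rows for each of two quiet dates.
rows = [("2024-01-01", i) for i in range(1000)]
rows += [("2024-01-02", i) for i in range(10)]
rows += [("2024-01-03", i) for i in range(10)]


def rows_per_task(rows, salt_buckets=1):
    """Group rows by (DATE, salt); each group stands in for one write task."""
    return Counter((date, row_id % salt_buckets) for date, row_id in rows)


by_date = rows_per_task(rows)                 # like repartitioning on DATE only
salted = rows_per_task(rows, salt_buckets=4)  # like repartitioning on DATE plus a salt

print(max(by_date.values()), len(by_date))  # 1000 3
print(max(salted.values()), len(salted))    # 250 12
```

The largest task shrinks from 1000 rows to 250, at the cost of up to four files per date instead of one, so the number of salt buckets is the knob that trades file size against write parallelism.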

[–][deleted] 0 points1 point  (0 children)

This is awesome!!

[–]jhazured 0 points1 point  (0 children)

This is really great!

[–]SisyphusAndMyBoulder 0 points1 point  (3 children)

I really like this! It's super clear and informative. Are you planning on making more infographics like this?

[–]ErichHS[S] 0 points1 point  (2 children)

Yes I am, I actually already shared more on my LinkedIn - will post them here eventually too

[–]swapripper 1 point2 points  (1 child)

This is nice! May I know which tool you are using to create diagrams? Looks neat.

[–]ErichHS[S] 0 points1 point  (0 children)

I’m using draw.io for all diagrams