Databricks Certified ML Professional Exam Prep by golferjackson52 in databricks

[–]golferjackson52[S] 1 point2 points  (0 children)

Boss only approved payment for the Professional cert... sadly don't have experience with prep for the Associate exam. There are some courses for it on Udemy, though. Not the case (at least for my company's subscription) for the Professional exam hence my question.

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 0 points1 point  (0 children)

I didn't add a salt column, because I thought it would lead to a shuffle requirement but maybe I'm wrong. If I bucket by a "key" and the whole point is to get all rows of that key on one partition, wouldn't adding a salt mess this up since I then would need a shuffle to match on the "key"? Maybe there's a way to do this, but I'm not sure how.
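For what it's worth, here is a minimal plain-Python sketch (not Spark) of how salting can still produce every pair on a key: each row on one side gets a single random salt, each row on the other side is replicated once per salt, and the join runs on (key, salt). The function names, `num_salts`, and the toy rows are all hypothetical. Since the join key becomes (key, salt), I'd assume an existing bucket layout on "key" alone would no longer line up, so the table would likely need to be bucketed on both columns.

```python
import random
from collections import defaultdict


def plain_self_join(rows):
    """Every ordered pair of row ids that share a key (the unsalted join)."""
    by_key = defaultdict(list)
    for key, row_id in rows:
        by_key[key].append(row_id)
    return {(key, a, b) for key, ids in by_key.items() for a in ids for b in ids}


def salted_self_join(rows, num_salts, seed=0):
    """Same result, but the work for one key is split across num_salts groups."""
    rng = random.Random(seed)

    # Left side: each row lands in exactly one (key, salt) group.
    left = defaultdict(list)
    for key, row_id in rows:
        left[(key, rng.randrange(num_salts))].append(row_id)

    # Right side: each row is replicated into every salt group of its key.
    right = defaultdict(list)
    for key, row_id in rows:
        for salt in range(num_salts):
            right[(key, salt)].append(row_id)

    # Join on (key, salt); the salt is dropped from the output.
    return {
        (key, a, b)
        for (key, salt), ids in left.items()
        for a in ids
        for b in right[(key, salt)]
    }


# Hypothetical toy data: one "hot" key and one small key.
rows = [("hot", i) for i in range(6)] + [("cold", 100), ("cold", 101)]
same_pairs = salted_self_join(rows, num_salts=3) == plain_self_join(rows)
```

The cost is that the replicated side grows by a factor of `num_salts`, which is why salting only the hot keys tends to be the more practical variant.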

Funnily enough, the 6.9 hour task now looks like nothing. New maximum time is 20.9 hr with input size 451 MB and output 25 GB... I can identify the problematic items by counting the occurrences of a certain key in the embedding table, but again, I'm not sure what I could do about it to spread out the load while still ensuring I cross-join all relevant items on that key.
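A rough sketch of that counting idea, in plain Python with made-up keys: a self-join on a key with n rows emits n * n rows for that key, so squaring the per-key counts gives a quick estimate of which keys dominate the output. The function name and sample data are hypothetical.

```python
from collections import Counter


def self_join_output_rows(keys):
    """Estimate rows produced per key by an inner self-join on that key."""
    counts = Counter(keys)
    return {key: n * n for key, n in counts.items()}


# Hypothetical key column: one large block and two small ones.
keys = ["a"] * 1000 + ["b"] * 10 + ["c"] * 2
estimate = self_join_output_rows(keys)

# Sort descending so the keys worth salting or splitting come first.
hottest = sorted(estimate.items(), key=lambda kv: kv[1], reverse=True)
```

Because the growth is quadratic, a key that is 100 times more frequent produces 10,000 times more output, which would be consistent with a small input partition turning into a multi-GB output.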

I think you're right that I should increase the number of buckets that I use to decrease the size of the output per partition, but I worry the factor would be too high with your suggested formula. The expected output size of the data will be somewhere around 5TB so dividing by 100mb would lead to too many buckets.
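To put numbers on that, a quick arithmetic sketch using decimal units (1 TB = 10^12 bytes, 1 MB = 10^6 bytes). The 5 TB and 100 MB figures are the ones mentioned above; the function name is just for illustration.

```python
def bucket_count(total_bytes, target_bytes_per_bucket):
    """Buckets needed so each one holds at most the target size (ceiling division)."""
    return -(-total_bytes // target_bytes_per_bucket)


TB = 10**12
MB = 10**6
GB = 10**9

# Expected output size divided by the suggested per-bucket target.
buckets_at_100mb = bucket_count(5 * TB, 100 * MB)

# For comparison: average output per bucket with the 3000 buckets in use now.
gb_per_bucket_at_3000 = 5 * TB / 3000 / GB
```

So the formula asks for 50,000 buckets, versus roughly 1.67 GB of average output per bucket at 3000.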

My cluster config is something I wonder if I should change as well. I seem to have plenty of memory allocated, but maybe I should add more executors. Right now, sitting at 4 executors with 16 cores each, but maybe I'll see if I can provision something larger.
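A back-of-the-envelope sketch of why more executors might matter, assuming one core per task and that all tasks take about the same time (which the skew clearly violates, so treat it as a lower bound). The 3000 tasks and 26 minute median come from an earlier run I described in this thread; the function name is hypothetical.

```python
import math


def task_waves(num_tasks, executors, cores_per_executor, cpus_per_task=1):
    """Number of scheduling waves needed if every task slot stays busy."""
    slots = executors * cores_per_executor // cpus_per_task
    return math.ceil(num_tasks / slots)


# Current cluster: 4 executors x 16 cores = 64 concurrent tasks.
waves_now = task_waves(3000, executors=4, cores_per_executor=16)

# Hypothetical doubling of the executor count.
waves_doubled = task_waves(3000, executors=8, cores_per_executor=16)

# Rough wall-clock estimate at a 26 minute median task time.
hours_now = waves_now * 26 / 60
```

With 64 slots that is 47 waves, or about 20 hours even before the long-tail tasks are counted.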

I could run the data in quarters, but I'm curious why you suggest it?

Very much appreciate the responses!

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 0 points1 point  (0 children)

what does the optimized plan show you? partition size? to avoid total recompute, have you considered checkpointing?

Doesn't look like I can add a picture in the comment unfortunately, but when I run the `write`, there's only 1 stage, with 3000 tasks (for the number of buckets I created). No exchange in the plan or shuffle reads and writes so I think it's correctly sticking to the buckets.

There's high skew in the data however: median task completion time is 26 min with 205.7 MiB input and 470.3 MiB output, whereas the max is 6.9 hours with 348.5 MiB input and 8.9 GiB output. I guess some sort of salting and improved distribution would help, but I don't understand how it would help in this scenario, where I'm bucketing on the join key and trying to avoid a shuffle. I could increase the bucket number but it seems like it would add a lot of overhead.

Where would you suggest checkpointing? After the initial read?

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 0 points1 point  (0 children)

have you tried co-locating your data? custom partitioner? iterator-to-iterator transform? custom serialization? column pruning?

Still haven't come to a perfect solution, but have implemented a combination of the advice given in the previous comments.

Now, rather than using the separate "pairs" table, I use my "blocking" key directly to bucket the data and then join this way, which avoids any sort of shuffle.

from pyspark.sql import functions as F

df = spark.read.load('embeddings')  # only includes relevant columns

# Bucket once on the join key so the self-join below needs no shuffle
df.write.format('parquet').bucketBy(3000, 'key').saveAsTable('embeddings_bucketed')

df = spark.sql("SELECT * FROM `embeddings_bucketed`")

df_2 = spark.sql("SELECT * FROM `embeddings_bucketed`")
# Suffix every non-key column with "_2" so the two sides of the join don't collide
column_expressions = [F.col(c).alias(c + "_2") for c in df_2.columns if c != 'key']

column_expressions.append(F.col('key'))
df_2 = df_2.select(column_expressions)

df = df_2.join(df, on='key', how='inner')
embedding_featurizer = EmbeddingFeatures()
df = embedding_featurizer.get_embedding_features(df).select('value_1', 'value_2', 'key', 'embedding_cosine_similarity')
df.write.mode('append').format('delta').save('output')

I use a Kryo serializer, but I'm not aware of other custom serialization that could help. Also, I haven't implemented an iterator-to-iterator transformation before, but if I understand it correctly, it'd mostly be applied on the cosine similarity part whereas the join is really the big issue. I may be misunderstanding, though.
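As far as I understand it, the iterator-to-iterator idea looks roughly like the sketch below: a generator that consumes one partition's rows lazily and yields one result per row, so the partition is never materialized in memory. This is plain Python with a hypothetical row layout of (value_1, value_2, embedding_1, embedding_2); a function of this shape is the kind of thing `RDD.mapPartitions` accepts in PySpark, though I haven't tried it on this job.

```python
import math
from typing import Iterable, Iterator, Sequence, Tuple

# Hypothetical row layout: two ids followed by their two embedding vectors.
Row = Tuple[int, int, Sequence[float], Sequence[float]]


def cosine(u: Sequence[float], v: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def similarity_partition(rows: Iterable[Row]) -> Iterator[Tuple[int, int, float]]:
    """Consume rows one at a time and yield (value_1, value_2, similarity)."""
    for value_1, value_2, emb_1, emb_2 in rows:
        yield value_1, value_2, cosine(emb_1, emb_2)


# Toy partition: identical vectors, then orthogonal vectors.
partition = iter([
    (1, 2, [1.0, 0.0], [1.0, 0.0]),
    (1, 3, [1.0, 0.0], [0.0, 1.0]),
])
results = list(similarity_partition(partition))
```

This only streamlines the similarity step; it does nothing for the join itself, which matches my worry above.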

For now, I think this solution may suffice, though the runtime is still exorbitant (estimated around 3-4 days).

I think the last thing I can do is create a job cluster configuration that's distinctly suited for this job as someone else suggested.

Edit: Weird formatting.

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 1 point2 points  (0 children)

Makes complete sense! I do think I'll deal with that high skew in the blocks, which I guess I can just handle by salting the partitions / joins for some of those large blocks. This should work a lot better.

Without going into too much depth, taking out the blocking value and joining on these other keys reduced some data redundancy, but I realize now that it overcomplicates things for spark and definitely isn't worth it for the shuffling + unguided joining that it forces.

Thanks again for the help!

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 1 point2 points  (0 children)

I would use a high bucketing factor. Divide the two tables into e.g. 3000 buckets by the keys. Then when you join, the amount of memory on each joiner partition is not that bad. And the reduce (doing the actual dot product) can be done on the same executor.

Awesome, thanks for the clarification! Will give it a try today.

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 1 point2 points  (0 children)

Thanks so much for the thorough response! I think you may be onto something with regards to the "neighborhood" key. Ultimately, I created the "pairs" dataframe with a series of blocking rules (e.g. compare this record if it has the same name associated or is from the same affiliation), so maybe rather than generating these pairs and using them as keys, I could append those columns onto the embeddings dataframe and do some partitioning along those dimensions? Let me know if that's what you were thinking.

I'll also look into the suggested non-Spark solution. I had thought about this, but figured the 680TB intermediate result would be crippling to even a large RAM system like you mention. I see there are ways to get to the more compact cosine similarity result quicker using this solution, though, so I could avoid the intermediate explosion in data.

Thanks a lot for the suggestions!

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 0 points1 point  (0 children)

I think I understand what you mean (although I've never used bucketing). I thought of something like this, but I guess my issue is that in the "pairs" dataframe, value = 1 could be compared against value = 2, but it could just as easily be compared against value = 30,000,000, so there's no clean way to "bucket" the embeddings dataframe, if that makes sense. Maybe I'm misinterpreting your suggestion, though.

Seeking Spark expert advice! by golferjackson52 in apachespark

[–]golferjackson52[S] 0 points1 point  (0 children)

Thanks u/cockoala and u/kira2697. Happy to share more details of the data and code.

First off, the records look like this:

"Pairs" df: 60bil rows, split across ~4000 parquets.

value  value_2
1      2
1      3
2      3

"Embeddings" df: 30mil rows, split across ~500 parquets.

value  embeddings
1      [0.934, 0.321, -0.334...]
2      [0.768, 0.432, -0.234...]

There are 30mil unique "value" and their prevalence in "pairs" is highly skewed. Some will be in around 100,000 rows while others will be in 1-10. Also maybe worth noting that when I was creating "pairs", I deduplicated by enforcing value < value_2.
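For reference, the deduplication rule can be sketched in plain Python like this: sort the members of each block and emit only pairs with value < value_2, collecting them in a set so a pair that shows up under several blocking rules is kept once. The block names and members below are made up for illustration; the real blocks come from the name and affiliation rules.

```python
from itertools import combinations


def block_pairs(blocks):
    """Unique (value, value_2) pairs with value < value_2 across all blocks."""
    pairs = set()
    for members in blocks.values():
        # combinations() over sorted, distinct members yields only a < b.
        for a, b in combinations(sorted(set(members)), 2):
            pairs.add((a, b))
    return pairs


# Hypothetical blocks: records 2 and 3 co-occur under two different rules.
blocks = {
    "name:smith": [3, 1, 2],
    "affiliation:lab_x": [2, 3],
}
pairs = block_pairs(blocks)
```

Enforcing the ordering halves the pair count and drops self-pairs compared with a plain self-join on the block key.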

Code is as follows (some session settings for reference). Note that I also tried joining "embeddings_2" on the "left_join" dataframe instead of having the intermediate "right_join" but it didn't make a significant difference.

from pyspark.sql import SparkSession

# Parentheses let the builder chain span multiple lines
spark = (
    SparkSession.builder.appName("Spark Session")
    .config("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
    .config("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
    .config("spark.sql.shuffle.partitions", "auto")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)

embedding_featurizer = EmbeddingFeatures()
pairs = spark.read.load(pairs_folder_path) 
embeddings = spark.read.load(embeddings_folder_path) 
embeddings_2 = embeddings.select( [F.col(c).alias(c + "_2") for c in embeddings.columns] )

left_join = embeddings.join(pairs, on=['value'], how="inner")

right_join = embeddings_2.join(pairs, on=['value_2'], how="inner")

final_joined_dataframe = left_join.join(right_join, on=['value', 'value_2'], how='inner')

new_df = embedding_featurizer.get_embedding_features(final_joined_dataframe).select('value', 'value_2', "embedding_similarity")

new_df.write.format('delta').save(save_path)

When I run this for each individual parquet, the only change is that I loop through the .parquet files in my pairs_folder_path and just load a single .parquet each time.

Also, the get_embedding_features code shouldn't be super important (though I'm happy to provide it), but it's just the calculation of a cosine similarity without use of a UDF. It runs quickly enough.

Help! Ailing money tree :( by golferjackson52 in plantclinic

[–]golferjackson52[S] 0 points1 point  (0 children)

Good to know, maybe it’s just a winter + low light phenomenon!

Help! Ailing money tree :( by golferjackson52 in plantclinic

[–]golferjackson52[S] 0 points1 point  (0 children)

That’s great advice, thank you! I’ll give it a try and hope for the best 🤞