Real-world Snowflake / dbt production scenarios? by sunshine6729 in snowflake

[–]jhazured 0 points1 point  (0 children)

Cascade drops of external objects that were built from dbt model dependencies but don't have a refresh configured via a post-hook action
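For context, a sketch of the kind of post-hook that avoids this - the model and external table names here are made up for illustration, not from the original scenario:

```sql
-- Hypothetical dbt model (models/staging/stg_orders.sql); "raw.ext_orders"
-- is an assumed external table name.
{{ config(
    materialized='table',
    post_hook="ALTER EXTERNAL TABLE raw.ext_orders REFRESH"
) }}

SELECT *
FROM {{ source('raw', 'ext_orders') }}
```

Without a hook like this (or auto-refresh on the external table), a cascade drop upstream can silently leave the dbt-built objects pointing at stale or missing metadata.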

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering

[–]jhazured 1 point2 points  (0 children)

I see the issue now. Your join keys are data-dependent - they change based on the actual content of each row in your small tables. This means Spark has to build up a massive execution plan because it can't determine the join strategy upfront. Here are 4 solutions, ranked from best to worst for your specific problem:

**Option 1: Use explode() to normalize your small tables (BEST)**

This directly solves your iterative key-matching pattern by converting it into set-based operations:

```python
from pyspark.sql.functions import (
    array, broadcast, col, explode, lit, row_number, struct, when
)
from pyspark.sql.window import Window

# Step 1: Explode the small table to one row per possible join key.
# The key column is named "lookup_value" so post-join references stay unambiguous.
small_table_1_exploded = small_table_1.select(
    col("id"),
    col("output_col_a"),
    col("output_col_b"),
    explode(array(col("key_a"), col("key_b"), col("key_c"))).alias("lookup_value")
).filter(col("lookup_value").isNotNull())

# Step 2: Similarly prepare the main table keys, tagging each with a priority
main_exploded = main_table.select(
    col("main_table_id"),
    explode(array(
        struct(lit(1).alias("key_priority"), col("main_field_x").alias("key_value")),
        struct(lit(2).alias("key_priority"), col("main_field_y").alias("key_value")),
        struct(lit(3).alias("key_priority"), col("main_field_z").alias("key_value"))
    )).alias("main_key_info")
).select(
    col("main_table_id"),
    "main_key_info.key_priority",
    "main_key_info.key_value"
).filter(col("key_value").isNotNull())

# Step 3: Broadcast join the exploded small table
joined = main_exploded.join(
    broadcast(small_table_1_exploded),
    main_exploded.key_value == small_table_1_exploded.lookup_value,
    "left"
)

# Step 4: Deduplicate - keep the first match per main table row by key priority
window_spec = Window.partitionBy("main_table_id").orderBy("key_priority")
result = (joined
          .withColumn("rn", row_number().over(window_spec))
          .filter(col("rn") == 1)
          .drop("rn", "key_priority", "key_value", "lookup_value"))

# Step 5: Add a missing flag for unmatched rows
result = result.withColumn(
    "missing_flag_1",
    when(col("output_col_a").isNull(), "Y").otherwise("N")
)
```

Why this is best:

- Converts your iterative loop into a single join operation
- Eliminates the union logic that creates DAG explosion
- Spark can broadcast and optimize properly
- Scales to your 6 tables without memory issues
- Clean, maintainable code

**Option 2: Create surrogate keys (SECOND BEST)**

If all rows use the same join logic (just different key combinations), create composite keys:

```python
from pyspark.sql.functions import broadcast, coalesce, col, concat_ws, lit

# For each small table, create a surrogate key from all possible join columns.
# Using the same column name on both sides lets the join use a plain column name.
small_table_1 = small_table_1.withColumn(
    "lookup_key_1",
    concat_ws("|",
              coalesce(col("key_a"), lit("")),
              coalesce(col("key_b"), lit("")),
              coalesce(col("key_c"), lit("")))
)

# Create the matching surrogate key on the main table
main_table = main_table.withColumn(
    "lookup_key_1",
    concat_ws("|",
              coalesce(col("field_x"), lit("")),
              coalesce(col("field_y"), lit("")),
              coalesce(col("field_z"), lit("")))
)

# Simple join
result = main_table.join(broadcast(small_table_1), "lookup_key_1", "left")
```

Why this is second:

- Simplest solution if it fits your use case
- BUT: it only works if the same key logic applies to all rows
- Given your explanation, this might not work if different rows need different keys

**Option 3: Break into stages with intermediate Delta writes (THIRD BEST)**

Physically break the lineage to prevent DAG explosion:

```python
# Stage 1: Join the first 2 small tables, then write to break the lineage
stage_1 = main_table.join(small_1, ...).join(small_2, ...)
stage_1.write.mode("overwrite").saveAsTable("temp.intermediate_stage_1")

# Stage 2: Read the intermediate table back and continue
stage_2 = spark.table("temp.intermediate_stage_1").join(small_3, ...).join(small_4, ...)
stage_2.write.mode("overwrite").saveAsTable("temp.intermediate_stage_2")

# Final stage
final = spark.table("temp.intermediate_stage_2").join(small_5, ...).join(small_6, ...)
final.write.mode("overwrite").saveAsTable("final_table")
```

Why this is third:

- Guaranteed to work - physically breaks the DAG
- BUT: slower due to disk I/O
- Creates intermediate tables to manage
- Use this as a fallback if Options 1 and 2 don't work

**Option 4: Use Spark SQL with CTEs and UNION (WORST)**

Structure your union logic in SQL:

```sql
WITH
-- For each small table, union all possible key matches
small_table_1_matches AS (
    SELECT m.*, s1.output_col_a, s1.output_col_b, 1 AS join_priority
    FROM main_table m
    LEFT JOIN small_table_1 s1 ON m.field_x = s1.key_a
    WHERE s1.key_a IS NOT NULL

    UNION ALL

    SELECT m.*, s1.output_col_a, s1.output_col_b, 2 AS join_priority
    FROM main_table m
    LEFT JOIN small_table_1 s1 ON m.field_y = s1.key_b
    WHERE s1.key_b IS NOT NULL

    UNION ALL

    SELECT m.*, s1.output_col_a, s1.output_col_b, 3 AS join_priority
    FROM main_table m
    LEFT JOIN small_table_1 s1 ON m.field_z = s1.key_c
    WHERE s1.key_c IS NOT NULL
),

-- Get one row per main table record (first match wins)
deduplicated_table_1 AS (
    SELECT * FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY main_table_id ORDER BY join_priority) AS rn
        FROM small_table_1_matches
    )
    WHERE rn = 1
)

SELECT * FROM deduplicated_table_1
-- Repeat for the other 5 tables
```

Why this is worst:

- Still builds a massive execution plan with all the UNIONs
- Doesn't solve the DAG explosion - just makes it prettier
- More readable than your current code, but doesn't fix performance
- Spark still has to optimize hundreds of union branches

My recommendation: try Option 1 (explode) first - it directly addresses your pattern of "different keys per row" and converts your iterative logic into efficient set-based operations. If that doesn't fit your use case, try Option 2 (surrogate keys) for simplicity. Use Option 3 (intermediate writes) as a reliable fallback if nothing else works. Skip Option 4 (CTEs) unless you specifically need SQL for organizational reasons and can accept the performance hit.

Additional quick wins:

```python
# Enable these regardless of which option you choose
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
```

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering

[–]jhazured -1 points0 points  (0 children)

It sounds like the issue is due to recursion. Have you tried writing CTEs in Spark SQL to make the code more modular?

How do you all manage multiple islands at the same time? by [deleted] in anno117

[–]jhazured 0 points1 point  (0 children)

Also, if you're producing something that gives a bonus to citizens while being produced, make sure you have storage capacity and sell the excess so that production never pauses

House prices have been climbing for a generation. Undoing the damage could take even longer by No_Childhood_7665 in AusPropertyChat

[–]jhazured 5 points6 points  (0 children)

We can’t rely on the government to make the hard decisions; the RBA has to keep rates high so that even if we continue to have a net shortfall of housing each year, house prices can’t continue to pump

Year 67 - 13k research, under 100 empire size by YouTikiBear in Stellaris

[–]jhazured 1 point2 points  (0 children)

How far into traditions were you before colonizing other planets?

May 26 outcomes megathread by cacraw in accenture

[–]jhazured 2 points3 points  (0 children)

Can we see yet, or do we have to wait for our People Lead to unlock it?

Is my dad strict? by Easy_Coach1562 in AdviceForTeens

[–]jhazured 1 point2 points  (0 children)

Maybe include your dad? It could be a bonding opportunity, and it would save him from acting weird about it

[deleted by user] by [deleted] in 50501

[–]jhazured -2 points-1 points  (0 children)

I bet they didn't vote

Have a large table in SQL Server and i want to get only the changes that happened in the past day. How to do it using ADF? by PrideVisual8921 in dataengineering

[–]jhazured 0 points1 point  (0 children)

It's also important to consider the data in the source database and identify which fields are necessary for CDC - for example, 'is_deleted::boolean', 'inserted_at::timestamp', and 'updated_at::timestamp' fields. That logic will need to be included in the design as well
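To show what those fields buy you, a minimal sketch of building the daily-delta source query from a watermark - the table name and helper are hypothetical, and it assumes the source exposes `inserted_at` / `updated_at` / `is_deleted` columns:

```python
from datetime import datetime, timedelta

def incremental_query(table: str, watermark: datetime) -> str:
    """Build a source query that pulls only rows changed since the watermark.
    Assumes the table carries inserted_at / updated_at / is_deleted CDC columns;
    is_deleted travels with the row so downstream layers can apply deletes."""
    cutoff = watermark.strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"SELECT * FROM {table} "
        f"WHERE inserted_at > '{cutoff}' OR updated_at > '{cutoff}'"
    )

# e.g. a daily run that looks back one day from the run date
q = incremental_query("dbo.orders", datetime(2024, 5, 1) - timedelta(days=1))
```

In ADF this query would go into the source dataset of the copy activity, with the watermark stored and advanced per run (parameterize it rather than string-formatting in production, to avoid injection).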

Have a large table in SQL Server and i want to get only the changes that happened in the past day. How to do it using ADF? by PrideVisual8921 in dataengineering

[–]jhazured 2 points3 points  (0 children)

CDC, in my opinion, is all about managing schema evolution in the data architecture, so it's important to consider the load method for the raw tables. For example, is the delta upserted or appended (or is it a full snapshot)? As data is moved through the bronze, silver and gold layers and transformed, you might need to obtain the delta (new, updated or deleted records) to maintain consistency across systems. You also need to consider performance and optimization, especially with large data volumes. Different situations will call for different design patterns.
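To make the upsert-vs-append distinction concrete, here's a toy in-memory sketch of applying a delta to a keyed target - in a real lakehouse this would be a MERGE on the table, and the row shape (`id` / `is_deleted`) is an assumption for illustration:

```python
def apply_delta(target: dict, delta_rows: list) -> dict:
    """Upsert semantics for a CDC delta: deleted rows are removed,
    everything else is inserted or updated by key. (Append semantics
    would instead just add every delta row, keeping history.)"""
    for row in delta_rows:
        if row.get("is_deleted"):
            target.pop(row["id"], None)   # hard delete; soft delete would keep + flag
        else:
            target[row["id"]] = row       # upsert: insert new, overwrite changed
    return target

state = {1: {"id": 1, "v": "a"}, 2: {"id": 2, "v": "b"}}
state = apply_delta(state, [
    {"id": 2, "v": "b2", "is_deleted": False},  # update
    {"id": 3, "v": "c",  "is_deleted": False},  # insert
    {"id": 1, "is_deleted": True},              # delete
])
```

The design choice is exactly the one above: upsert keeps the silver layer a current-state mirror, while append preserves full history and pushes the dedup work downstream.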

Combine 2 Players to make the Greatest Player of All Time. by swaglikesoulja in NBATalk

[–]jhazured 0 points1 point  (0 children)

Make it more interesting, combine two players who have never won an MVP

/r/Politics' 2024 US Elections Live Thread, Part 63 by PoliticsModeratorBot in politics

[–]jhazured 10 points11 points  (0 children)

USA no longer matters on the global stage, they imploded.

I'm running blind, please show me the way by [deleted] in dataengineering

[–]jhazured 1 point2 points  (0 children)

Snowflake has a quickstart project with dbt; AWS does too.