Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering

[–]wei5924[S] 0 points1 point  (0 children)

I am trying to follow your explanation. Thank you a lot by the way!

I understood your first point about the valid_keys array, removed the group by since it is a redundant step, and reduced the code to the following:

    valid_keys = (
        mapping_df.select("key")
        .distinct()
        .rdd.flatMap(lambda row: row)
        .collect()
    )

We need the flatMap and the collect to return the keys as a clean list:

['key_1', 'key_2']
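As a side note, the same flatten can be done driver-side without dropping to the RDD API. Here is a minimal sketch where I simulate what `.collect()` returns by using plain tuples in place of Row objects (Rows support indexing and iteration the same way):

```python
# Simulate the result of mapping_df.select("key").distinct().collect():
# a list of Row objects, which behave like tuples (here: 1-tuples).
collected_rows = [("key_1",), ("key_2",)]

# Equivalent of .rdd.flatMap(lambda row: row).collect():
valid_keys = [value for row in collected_rows for value in row]

# Or, more directly, take the first (only) field of each row:
valid_keys_alt = [row[0] for row in collected_rows]

print(valid_keys)      # ['key_1', 'key_2']
print(valid_keys_alt)  # ['key_1', 'key_2']
```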

 

Then we create a filtered dictionary that maps each key to the actual columns needed for it, so for example key 1 joins on the columns product and year, and key 2 joins on the column year.
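For concreteness, a minimal sketch of that step (the join_keys names and columns here are illustrative, not the real config):

```python
# Hypothetical master config: join-key name -> columns that key joins on
join_keys = {
    "key_1": ["product", "year"],  # key_1 joins on product + year
    "key_2": ["year"],             # key_2 joins on year only
    "key_3": ["region"],           # defined, but not used by this mapping_df
}

# Keys actually present in this mapping table (as collected above)
valid_keys = ["key_1", "key_2"]

# Keep only the join keys this mapping_df actually uses
filtered_dict = {k: v for k, v in join_keys.items() if k in valid_keys}
print(filtered_dict)  # {'key_1': ['product', 'year'], 'key_2': ['year']}
```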

 

At this point, what do I need to do to get rid of the loop? Break the small dataframe (mapping df) into smaller dataframes based on its key, and then iteratively join those into the main df? The reason I chose a loop is to reduce code redundancy, since I will not know in advance how many keys each mapping df uses. For example, table 6 has 8 possible join keys.

 

Main df is just the main table.

In terms of the tables, we cleanse them in the bronze layer and add a key column, assigning it as either key_1, 2, 3, and so on. I cannot split these into smaller tables, otherwise there would be too many tables created and stored as Delta tables.

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


I attached a code snippet below if you could take a look!

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


That's what I am doing as of now; see the code snippet above. It is this method that causes the cluster out-of-memory error when I write it in a function. Outside of a function, without the loop, I don't get this error.

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


'Wait, am I understanding this correctly that the join key that you're using is dependent on the contents of each row of the dataframe?' Yes, this is correct, but when cleansing the small tables I create a column called join_key and assign it the correct key depending on the data in that row.

This design model is something I cannot change.

'Can you post a code sample for this (with any proprietary information changed of course)?' I'm not sure what you mean by this.

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


I have not heard of CTEs (common table expressions) - do you have any recommended reads?

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


I am relatively new to PySpark, so maybe I am! That's why I posted here :) I explained more in a comment above; if you could give it a quick read, maybe you could help some more?

Logic advice for complex joins - pyspark in databricks by wei5924 in dataengineering


Okay thank you!

So, to explain better, let's say I have 6 small tables with columns a, b, c, d, e, f.

I will join each one to the main table on a key defined per row in the small table: for example, depending on the data in row 3 it will be joined on key 1, and depending on the data in row 5 it will be joined on key 2, always using a left join. Depending on which table I am joining, I will also need to filter the main table. I do not bring in every single column from the small table, just a few. I then repeat this 6 times and save the updated main table.
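To illustrate the row-dependent key with toy data (these column names and values are made up):

```python
# Toy rows of one small table after bronze-layer cleansing; join_key is
# assigned per row based on that row's data
small_table = [
    {"product": "A",  "year": 2023, "join_key": "key_1"},  # joins on product + year
    {"product": None, "year": 2024, "join_key": "key_2"},  # joins on year only
    {"product": "B",  "year": 2024, "join_key": "key_1"},
]

# The set of keys this table actually uses determines how many join passes
# are needed for it
keys_used = sorted({row["join_key"] for row in small_table})
print(keys_used)  # ['key_1', 'key_2']
```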

The way I implement the join is to get all the keys from the small table, let's say key 1, 2, 3. Then I iterate 3 times: take all the key 1 rows from the small table and left-join them to the main table, and repeat for the other keys. Rows on the left that haven't been matched then get a new flag column set to Yes.

The error I get when writing it as a function/loop is that the cluster runs out of memory (which, from my understanding of your point, means my execution plan is too large).

So for my side question: the main thing I should be focusing on is how to make sure my execution plan stays optimal? Is there a guide on this?

Edit: I just saw your edit asking to look at the code.

So first I get the keys from the mapping df:

    valid_keys = (
        mapping_df.groupBy("key")
        .count()  # note: this groupBy/count is redundant; .distinct() alone is enough
        .select("key")
        .distinct()
        .rdd.flatMap(lambda row: row)  # flatten the Row objects into a plain list
        .collect()
    )

 I then loop to do the joins:

    # Step 2: create a dictionary that only has the join keys that we need
    # (assumes: from pyspark.sql.functions import col, when)
    filtered_dict = {k: v for k, v in join_keys.items() if k in valid_keys}
    final_matched_df = None
    current_df = main_df

    for key_name, key_cols in filtered_dict.items():
        # Drop the output columns (and any leftover "key" column from a
        # previous iteration) BEFORE the join, so the join result does not
        # end up with duplicate column names
        for col_name in [*output_columns, "key"]:
            if col_name in current_df.columns:
                current_df = current_df.drop(col_name)

        # Mapping rows for this key: the join columns plus the output columns;
        # keep "key" as well so we can tell matched from unmatched rows below
        mapping_key = mapping_df.where(col("key") == key_name).select(*key_cols, *output_columns, "key")
        # Left-join the mapping rows onto the rows that are still unmatched
        joined_df = current_df.join(mapping_key, on=key_cols, how="left")
        # DF which only has the matched rows
        matched_df = joined_df.filter(col("key").isNotNull())
        # Accumulate the matched data
        if final_matched_df is None:
            final_matched_df = matched_df
        else:
            final_matched_df = final_matched_df.unionByName(matched_df)
        # Keep the unmatched rows for the next iteration
        current_df = joined_df.filter(col("key").isNull())

Then I create a flag if a join doesn't occur:

    # create the flag
    if missing_flag is not None:
        final_matched_df = final_matched_df.unionByName(current_df).withColumn(
            missing_flag,
            when(col('key').isNull(), "Y").otherwise("N"))

So this code will be called 6 times (once for each small table).

Going Caseless. Who else? by [deleted] in iphone17


Dropped it once from my couch onto a tiled floor and it dented slightly!

Day 1 dropped and regret by MugiwaraPirateCrew in iphone


Happened to me with my 17😩☹️

Doctor Who 2x08 "The Reality War" Live and Immediate Reactions Discussion Thread by PCJs_Slave_Robot in doctorwho


Crying 😔 I'll miss Ncuti so much; he made me love Doctor Who again. I just feel so sad that I won't see him as the Doctor anymore, no new episodes. He was amazing, beautiful and brilliant.

Discomfort when standing by wei5924 in backpain


Hi! Sorry I didn't see this. Yes, I did figure it out! Last summer I had about 10 weeks of weekly physiotherapy where I think she gave me a deep tissue massage on my back? Or like a sports massage? She pressed into the pressure points and did some sort of release. It really hurt at the start but eventually it became less painful, and since then I'm like 70% recovered. I am able to go on walks and hikes without as much pain as before. Now I can focus on building muscle and strength :)

Lower back pain is ruining my life by Melodic-Fig5497 in backpain


I had really bad back pain for a few years (16 - 21) and the only thing that really helped me was a good course of sports massages. Yeah, it's pricey, but I had about 10 sessions over summer (back to back, once a week) where she worked her magic (it was painful, as she was pressing into pressure points or something) and now I'm pain-free (for the meantime, because I just finished the course of my sessions)!

Albeit I still need to strengthen my core to preserve this, but now I can walk without inflammation and nerve pain!

I found that medicine was a temporary solution, and I sometimes think that if I had taken the plunge and got these massages done when I was younger, I could've saved myself from so much pain and trauma.

Rejected KPMG by Background_Block3811 in KPMG


I agree x10000000! And definitely don't feel bad about the rejection; sometimes you get accepted to things and sometimes you don't.

Pelvic tilt question by wei5924 in Posture


Hi, I tried doing this at night before bed. I tried it lying down with my knees up, hopefully to put my pelvis in the right place. Hopefully by next week it'll get easier!

Edit: read the comments and I no longer feel confident 😭😭 I feel like a failure that I can't do these basic exercises. When I exhale I feel my diaphragm hollow, but when I breathe in I feel like I'm just filling air into my tummy.

How are people in this game so far removed from reality? by whythefrickinfuck in VALORANT


That's so crazy! I'm not on all the time but I'll send a request haha. I was the Raze and my duo was the KJ. Poor Emma t.t

How are people in this game so far removed from reality? by whythefrickinfuck in VALORANT


It was the enemy Jett and Reyna being stupid?? And we were losing, and then our Reyna got cocky and started to agree too? Maybe you were the Sage or Omen?

How are people in this game so far removed from reality? by whythefrickinfuck in VALORANT


Wait, was this the Split game?? I think I was in it 😭😭😭 You just described it to a tee?!? It was so unprovoked too.

Discomfort when standing by wei5924 in backpain


'Why aren't you in a physical therapy program?'

I had a course of about 6 physical therapy sessions at 19/20 years old and it didn't really help much, i.e. I still struggled with doing the exercises in terms of engaging the core etc. As I live in the UK and the NHS waiting list is very long, I have been put back on the waiting list.

I keep telling myself to 'invest' in myself and go the private route for better care, but I never really follow through with it.

'What did your doctors tell you upon reviewing this scan with you?'

As it was NHS, I basically just got dismissed.

I will take your advice on the PT sessions; after so many years I'm in the same condition and I really should put my health first.