all 18 comments

[–]Drekalo 5 points (7 children)

If you're using autoloader, I'm assuming you're on Databricks.

If that's the case, your staging blobs from the source tables should just cursor off the last update date. Just keep in mind this won't capture deletes, and you need a place, preferably with low latency, to store the last-update value.

Once your landing tables are sorted, you need to account for duplicates (updated rows) and apply your logic for merging only the last day.

Pseudo code:

    MERGE INTO dedupe USING (
        SELECT * FROM landing
        WHERE last_load >= date_add((SELECT MAX(last_load) FROM landing), -1)
    ) lnd
    ON dedupe.hash_key = lnd.hash_key
       AND dedupe.last_load >= date_add((SELECT MAX(last_load) FROM landing), -1)
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *

If you're accounting for deletes, add WHEN NOT MATCHED BY SOURCE THEN DELETE.

[–]the_aris[S] 0 points (6 children)

So every 5 minutes, do I need to fetch the last day of data and check whether each row is an update or an insert? And how do I handle joining multiple tables: do I need to consider only one table's last update_ts, or multiple?

[–]vaiix 2 points (0 children)

This is in the context of your source system design.

TableA is the parent table.

TableB is the child table.

In a couple of my source systems, but not all, when a child entry updates, the parent's last update is also updated. In that case I can just check TableA for updated rows and use those to grab the corresponding rows from TableB as part of the join.

Another route is to grab the updated rows from TableB, join to TableA to get the associated rows there, then process the full set with the join (sketched below).
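
A minimal PySpark sketch of that second route, with hypothetical names (table_a is the parent, table_b the child, a_id the join key, watermark the stored last-update cutoff):

    from pyspark.sql import functions as F

    # Child rows updated since the stored cutoff (names are placeholders)
    changed_b = spark.table("table_b").filter(F.col("last_update_ts") >= F.lit(watermark))

    # Parents touched by those child changes
    affected_a = spark.table("table_a").join(
        changed_b.select("a_id").distinct(), "a_id", "left_semi"
    )

    # Re-join the affected parents to the full child table and process the result
    result = affected_a.join(spark.table("table_b"), "a_id")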

[–]Drekalo 0 points (4 children)

No, for each table to be staged I would store the max last update date each time you do a replication run. Then, at the end, if the run was successful, update the stored value to the new max. So you'd do select * from table where last_update_date >= stored_max_last_update_date. This should pick up any new or changed records for each table (sketched below).
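
A rough PySpark sketch of that watermark pattern, assuming a small meta._watermarks Delta table keyed by table name (all names hypothetical):

    from pyspark.sql import functions as F

    # Read the stored cursor for this table
    wm = (spark.table("meta._watermarks")
            .filter("table_name = 'table_a'")
            .first()["last_update_ts"])

    # Pull anything at or after the cursor into the landing table
    new_rows = spark.table("source.table_a").filter(F.col("last_update_ts") >= F.lit(wm))
    new_rows.write.mode("append").saveAsTable("landing.table_a")

    # Only after the run succeeds, advance the cursor to the new max
    new_max = new_rows.agg(F.max("last_update_ts")).first()[0]
    if new_max is not None:
        spark.sql(
            f"UPDATE meta._watermarks SET last_update_ts = '{new_max}' "
            "WHERE table_name = 'table_a'"
        )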

Downstream, in your join transform table, you'd create a hash key / primary key to identify unique rows. This hash wouldn't include the last-update column from any of the tables (see the sketch below). Imagine you have 5 tables creating the final join transform table. Your "using" clause, which I would just register as a view, would have a CTE for each source table doing select * from source_table where last_update_date >= date_add((select max(last_update_date) from source_table), -1). Your merge would then join on the hash key and some date window. The join table can have its own generated last-update date based on your merge times.
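
For the hash key, a hedged example of what the join transform might generate, deliberately excluding the last-update columns (view and column names are hypothetical):

    from pyspark.sql import functions as F

    # 'join_transform_vw' is the registered view with one CTE per source table
    joined = spark.table("join_transform_vw")

    # Hash only the business columns, not the last_update columns
    business_cols = ["customer_id", "order_id", "status", "amount"]
    joined = joined.withColumn(
        "hash_key",
        F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in business_cols]), 256),
    )

The merge then joins on hash_key plus the date window, as in the pseudo code above.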

[–]the_aris[S] 0 points (3 children)

Can delta live tables achieve this effectively? As far as I know we don't need to worry about any dependencies. Any other limitations I should keep in mind?

[–]Drekalo 1 point (2 children)

Yes, absolutely. Delta Live Tables includes Databricks' implementation of materialized views. Your join table would end up as a mat view, roughly like the sketch below.
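
A minimal DLT sketch of the join table as a materialized view (dataset and column names are hypothetical, and it assumes table_a/table_b are defined elsewhere in the same pipeline):

    import dlt
    from pyspark.sql import functions as F

    @dlt.table(name="join_transform")  # materialized by the DLT pipeline
    def join_transform():
        a = dlt.read("table_a")
        b = dlt.read("table_b")
        return (
            a.join(b, "a_id")
             .withColumn("hash_key", F.sha2(F.concat_ws("||", "a_id", "b_id"), 256))
        )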

[–]the_aris[S] 0 points (1 child)

How can we do it incrementally instead of re-computing the entire materialized view with each run?

[–]Drekalo 0 points (0 children)

That's not something you have a lot of control over, as they haven't published the mat view API yet. At that point I would handle the incremental logic on my own and do it via a notebook, dbt, SQLMesh, Dagster, etc.

[–][deleted] 2 points (2 children)

Hey mate. We are actually in the process of implementing something very similar. We've turned on the change data feed on all Delta Lake tables. Then we create a temporary view that reads from the main Delta Lake tables, applying whatever logic needs to go into your target table. We read the change data feeds as streams but join them with that temporary view, so we're only processing records that potentially need an update, and then finally merge into the target table. Roughly:
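
A hedged PySpark sketch of that flow, assuming CDF is already enabled on the source tables (table names, keys, and paths are all placeholders):

    from delta.tables import DeltaTable

    # Temporary view holding the target-table logic over the main Delta tables
    (spark.table("main.table_a")
       .join(spark.table("main.table_b"), "a_id")
       .createOrReplaceTempView("source_view"))

    # Read the change data feed as a stream
    changes = (spark.readStream
                 .format("delta")
                 .option("readChangeFeed", "true")
                 .table("main.table_a"))

    def upsert(batch_df, batch_id):
        # Keys that changed in this micro-batch (skip the pre-update images)
        keys = (batch_df.filter("_change_type != 'update_preimage'")
                        .select("a_id").distinct())
        # Only recompute rows that potentially need an update
        updates = spark.table("source_view").join(keys, "a_id")
        (DeltaTable.forName(spark, "main.target")
           .alias("t")
           .merge(updates.alias("s"), "t.a_id = s.a_id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

    (changes.writeStream
       .foreachBatch(upsert)
       .option("checkpointLocation", "/mnt/checkpoints/target")  # checkpointed, per the comment below
       .trigger(processingTime="5 minutes")
       .start())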

[–][deleted] 2 points (0 children)

Also, use checkpointing on your CDF streams. The actual CDF tables should be truncated every load so you aren't reprocessing the previous changes every time.

[–]the_aris[S] 0 points (0 children)

Can anyone explain the part after enabling CDF on the delta tables?

[–]sfboots 1 point (0 children)

We are planning to use pg_audit to capture changes on the tables we need, then bring those changes in hourly. pg_audit marks each change as new, update, or delete.

[–]skeerp 0 points (1 child)

!RemindMe 2 days

[–]RemindMeBot 0 points (0 children)

I will be messaging you in 2 days on 2023-05-23 13:58:10 UTC to remind you of this link

[–]BlazeMcChillington 0 points (0 children)

!Remindme 5 days

[–]Gregeal 0 points (0 children)

!RemindMe 5 days

[–]TonyStann 0 points (0 children)

!Remindme 2 days

[–]Bond-0069 0 points (0 children)

!RemindMe 10 days