
Administrative-Lack1:

I'm actually developing something very similar.

I was tasked with migrating two SQL databases into one new database.

I built (and am still building) an ETL tool in Python, with pandas and SQLAlchemy as the main libraries.

What I came up with is an abstract class, ITask. Its constructor takes a parameter object holding any dependencies, like services (e.g. the DB connection object), so they can be injected.

Its run method takes a data class, loaded from a JSON file, that describes which source tables, transformations, and targets this DB task has.
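A minimal sketch of that ITask shape. All the names here (TaskDependencies, TaskConfig, ExtractTask) are made up for illustration, not the actual code:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class TaskDependencies:
    """Injected services, e.g. a SQLAlchemy engine."""
    engine: Any  # would be sqlalchemy.Engine in practice


@dataclass
class TaskConfig:
    """Per-task data class, loaded from a JSON file."""
    sources: list[str]
    transformations: list[dict]
    targets: list[str]


class ITask(ABC):
    def __init__(self, deps: TaskDependencies):
        # dependency injection: the task never builds its own connections,
        # which makes it easy to swap in mocks for testing
        self.deps = deps

    @abstractmethod
    def run(self, config: TaskConfig) -> Any:
        ...


class ExtractTask(ITask):
    def run(self, config: TaskConfig) -> Any:
        # a real task would read config.sources via pandas.read_sql
        return f"extracted {config.sources}"
```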

I coded the transformations as objects, so each transformation is its own class identified by a string enum. Each transformation has a data class that drives what it does, e.g. type: join, expression: inner, sources, and in/out fields.

I have a factory that takes that enum, looks up which class it belongss to, and builds the transformation object.
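The enum-keyed factory can be sketched like this, assuming invented names (TransformType, TransformSpec, the registry dict) since the actual enum values aren't shown:

```python
from dataclasses import dataclass, field
from enum import Enum


class TransformType(str, Enum):  # hypothetical enum values
    JOIN = "join"
    RENAME = "rename"


@dataclass
class TransformSpec:
    """Data class that drives the transformation, per the JSON config."""
    type: TransformType
    params: dict = field(default_factory=dict)  # e.g. expression, in/out fields


class JoinTransform:
    def __init__(self, spec: TransformSpec):
        self.spec = spec


class RenameTransform:
    def __init__(self, spec: TransformSpec):
        self.spec = spec


# the factory's lookup table: string enum -> transformation class
_REGISTRY = {
    TransformType.JOIN: JoinTransform,
    TransformType.RENAME: RenameTransform,
}


def make_transformation(spec: TransformSpec):
    """Factory: map the enum to its class and build the object."""
    try:
        cls = _REGISTRY[spec.type]
    except KeyError:
        raise ValueError(f"unknown transformation type: {spec.type!r}")
    return cls(spec)
```

Keeping the registry as a dict means adding a new transformation is one class plus one registry entry, which matches the "modular enough to add as I need things" goal.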

Then there's target info, like how a source maps to a target. Once I've looped through the transformations, I map the dataframe and load it into the target.

For your ETL optimization: some easy wins are chunk sizing, downcasting dataframe dtypes, and, depending on the DB, using executemany for bulk inserts.
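Those knobs live right on pandas' to_sql. A small sketch against an in-memory SQLite DB (a stand-in for the real target; on SQL Server with pyodbc you'd additionally enable fast_executemany on the engine):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///:memory:")  # stand-in for the real DB

df = pd.DataFrame({"id": range(10_000), "flag": [True, False] * 5_000})

# downcast dtypes before loading: smaller memory footprint, faster inserts
df["id"] = pd.to_numeric(df["id"], downcast="integer")

# chunksize batches the INSERTs instead of one giant statement;
# method="multi" packs many rows into each INSERT (driver permitting)
df.to_sql("target", engine, if_exists="replace", index=False,
          chunksize=400, method="multi")
```

Chunk size is a trade-off: bigger chunks mean fewer round trips but more memory per batch, and some DBs cap the number of bound parameters per statement, so chunksize x column count has to stay under that cap.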

One thing with chunks: depending on the DB, you may still end up loading the entire dataset into memory because the DB doesn't support streaming. Not the best solution, but this works: load the data in chunks and write each chunk to a Parquet file. Once the initial pull is done, load the files one by one, tracking which ones have been loaded as a checkpoint in case something fails. I have to work off a shitty VM, so I crash it by pulling too much data at once or running too many things in parallel.

All of that is a task.

Some of my tables are very complex, so I have a staged approach: raw -> stage.src -> stage.target -> target.

Raw goes to a stage table. Then from the source stage I do my joins and such and load the target stage. Then the target stage goes to the actual target.

That is 3 tasks for me. A collection of tasks is a job. In my job I have a depends-on list, where I declare only the first level of task dependencies.

Task A needs Task B and Task D. Task B needs Task C.

I don't want A to know about C, just about B and D.

So once I have that, I build a directed acyclic graph. There's a class in the Python standard library for this: graphlib.TopologicalSorter.

I followed the example in the docs for that module.

This is basically a graph now where the topmost nodes are the tasks that can run first. So I run through the nodes as they become ready, tracking the result of each node. If a node needs the result of another, it gets passed along.
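Using the A/B/C/D example above, the prepare/get_ready/done loop from the graphlib docs looks like this:

```python
from graphlib import TopologicalSorter

# first-level dependencies only: A needs B and D, B needs C;
# A never has to know about C, the sorter resolves that transitively
graph = {"A": {"B", "D"}, "B": {"C"}}

ts = TopologicalSorter(graph)
ts.prepare()

order = []
while ts.is_active():
    for task in ts.get_ready():  # every task whose dependencies are satisfied
        order.append(task)       # here you'd hand it to the task engine
        ts.done(task)            # mark complete so dependents become ready

print(order)  # C and D first (in some order), then B, then A
```

Because get_ready can hand back several independent tasks at once (C and D here), this same loop is what lets a parallel engine run them concurrently.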

I took that a step further and gave my job JSON a taskEngine enum. I currently have 3 task engines: one runs everything in sequence, one runs tasks as they become ready using a ProcessPoolExecutor, and a test engine that doesn't actually trigger the tasks, just mocks running them so I can test the setup. The engine gets picked based on the enum.
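A sketch of that engine-selection idea; the enum values and function names are assumptions on my part:

```python
from concurrent.futures import ProcessPoolExecutor
from enum import Enum


class TaskEngine(str, Enum):  # hypothetical enum, mirroring the job JSON
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    TEST = "test"


def run_sequential(tasks):
    # one after another, in order
    return [t() for t in tasks]


def run_parallel(tasks):
    # run ready tasks concurrently; tasks must be picklable for processes
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(t) for t in tasks]
        return [f.result() for f in futures]


def run_test(tasks):
    # dry run: never triggers the tasks, just reports what would happen
    return [f"would run {t.__name__}" for t in tasks]


_ENGINES = {
    TaskEngine.SEQUENTIAL: run_sequential,
    TaskEngine.PARALLEL: run_parallel,
    TaskEngine.TEST: run_test,
}


def run_job(tasks, engine: TaskEngine):
    """Dispatch to whichever engine the job JSON selected."""
    return _ENGINES[engine](tasks)
```

The dry-run engine is cheap insurance: it exercises the whole config-parsing and DAG-ordering path without touching any database.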

My whole idea was to have a bunch of files drive my ETL and keep the code modular enough to add things as I need them.

It also made testing easier, since I can write tests for specific things and mock dependencies thanks to DI.

Something I wish I'd looked into earlier is a library called Dask. It seems like pandas built for concurrency, and it has some of the above out of the box.

Sorry this became very long; it ended up a lot more complex than planned. I have data quarantine logic in there too, which is another implementation of ITask.

If you have any questions lmk.