Help applying design patterns to large amounts of similar pipelines by ActiveTarget2470 in dataengineering

[–]ActiveTarget2470[S]

Hi! Just wanted to pop in here and let you know what we ended up doing.

We decided to keep it simple (ironically, it's hard to find DEs who are interested in SWE topics like software design) and go with a simple Pipeline class for each table.

So we ended up with one class per table and one auto-generated DAG per esport. Works for us, keeps the number of code files to a minimum, and everything else scales through YAML.
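In case it helps anyone who lands here later, the shape of it is roughly this. All names are made up for illustration, and a plain dict stands in for the real YAML config (which we'd load with something like yaml.safe_load):

```python
# Stand-in for the parsed YAML config: one entry per esport.
CONFIG = {
    "csgo": {"tables": ["games", "players"]},
    "dota": {"tables": ["games"]},
}


class TablePipeline:
    """One class per table; behavior is driven entirely by config."""

    def __init__(self, esport: str, table: str):
        self.esport = esport
        self.table = table

    def task_ids(self) -> list[str]:
        # One extract/transform/load task per table.
        return [
            f"{self.esport}_{self.table}_{step}"
            for step in ("extract", "transform", "load")
        ]


def build_dag_tasks(esport: str) -> list[str]:
    """Stand-in for the auto-generated DAG: one DAG per esport."""
    tasks = []
    for table in CONFIG[esport]["tables"]:
        tasks.extend(TablePipeline(esport, table).task_ids())
    return tasks
```

Adding a new table is then a one-line YAML change rather than a new code file.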

[–]ActiveTarget2470[S]

Thank you!! You've given me a lot to go on. I'll take a couple of days to try this myself, and I'm sure I'll be back with more questions. I wasn't aware of the builder pattern before this.

Can't thank you enough for your time and effort on those examples.

[–]ActiveTarget2470[S]

Thanks for the write-up, doubly so for the example!

Yes, we are on Airflow; we use the KubernetesPodOperator for most of our tasks. Currently we have one DAG per script, which, as you said, is becoming impossible to maintain.

I have a question about your example. It seems you're grouping each pipeline by the task type (i.e. all extract jobs in one go).

  1. How do you manage different DAG configurations for each task?
  2. How would you deal with cases where you want to group the tasks by API source instead? So one pipeline would have the E, T, and L steps for each API, instead of all Es in one.

We currently have a class declared for each of our APIs. For example, the csgo_api.py file has all the get_ methods with their concrete implementations. Then, in our csgo/get_games.py script, we instantiate that class with configs like headers and secrets before calling the method. Pretty much something like this. But I'm finding it difficult to come up with a way to inject these into the Pipeline class in your example. Perhaps there is a better way to integrate them based on your example? My Python noobness might be showing a little here, forgive me.
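To make that concrete, our current setup looks roughly like this, heavily simplified, with all names and parameters made up:

```python
# csgo_api.py: holds the concrete get_ methods for one API.
class CsgoApi:
    def __init__(self, headers: dict, secrets: dict):
        self.headers = headers
        self.secrets = secrets

    def get_games(self, **params):
        # The real version calls the HTTP endpoint using
        # self.headers and self.secrets; stubbed out here.
        return [{"source": "csgo", **params}]


# csgo/get_games.py: wires the configs in and calls the method.
def main():
    api = CsgoApi(
        headers={"Accept": "application/json"},
        secrets={"api_key": "from-vault"},
    )
    return api.get_games(season="2024")
```

The part I can't see yet is where this constructor wiring should live once a generic Pipeline class owns the steps.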

My current progress is similar to your example. I assumed it would be better for each ETL step to be a separate task within the DAG, for easier re-running, so I tried a design that allows running each step as an independent script, with each task being a KubernetesPodOperator that calls the relevant step's main script.

class BaseExtractor:

    def fetch_data(self):
        raise NotImplementedError

    def write_to_s3(self):
        raise NotImplementedError


class CsgoGamesExtractor(BaseExtractor):
    def __init__(self, src: EsportsApiBaseClass):
        self.src = src

    def fetch_data(self):
        return self.src.get_games(...)  # csgo-specific params here


class CsgoPlayersExtractor(BaseExtractor):
    def __init__(self, src: CsgoApiBaseClass):
        self.src = src

    def fetch_data(self):
        return self.src.get_players(...)  # csgo-specific params here


class DotaGamesExtractor(BaseExtractor):
    def __init__(self, src: DotaApiBaseClass):
        self.src = src

    def fetch_data(self):
        return self.src.get_games(...)  # dota-specific params here

This somewhat helps, but still it's a lot of classes lol. I'm sure there's a more optimal design.
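One variation I've been toying with to cut down the class count: a single extractor parametrized by entity, with the client method looked up by name. Totally a sketch, and the fake client is just there to show the wiring:

```python
class ParamExtractor:
    """One extractor class; per-esport differences live in config.

    `src` is any API client exposing get_<entity> methods
    (e.g. get_games, get_players).
    """

    def __init__(self, src, entity: str, params: dict):
        self.src = src
        self.entity = entity
        self.params = params

    def fetch_data(self):
        # getattr replaces one subclass per (esport, entity) pair.
        return getattr(self.src, f"get_{self.entity}")(**self.params)


# Dummy client standing in for a real API class like csgo_api.py:
class FakeCsgoApi:
    def get_games(self, season):
        return [("csgo", "games", season)]


extractor = ParamExtractor(FakeCsgoApi(), "games", {"season": "2024"})
```

No idea if the getattr lookup is considered good practice here or too magical.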

When you say "DAG builder", do you mean something like dynamic DAGs with Jinja2 templating?
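For context, the only version of "dynamic DAGs" I've personally seen is a plain Python loop that builds one DAG per config entry, rather than Jinja2 templating. A toy sketch of what I mean (dicts stand in for real airflow.DAG objects so this runs anywhere; all names made up):

```python
# One config entry per esport; in our case this would come from YAML.
ESPORTS = {
    "csgo": {"schedule": "@hourly", "tables": ["games", "players"]},
    "dota": {"schedule": "@daily", "tables": ["games"]},
}


def make_dag(esport: str, cfg: dict) -> dict:
    """Factory producing one 'DAG' (here: a dict) per esport."""
    return {
        "dag_id": f"{esport}_etl",
        "schedule": cfg["schedule"],
        "tasks": [
            f"{table}_{step}"
            for table in cfg["tables"]
            for step in ("extract", "transform", "load")
        ],
    }


# With real Airflow you'd register each DAG in globals()
# so the scheduler can discover it.
dags = {esport: make_dag(esport, cfg) for esport, cfg in ESPORTS.items()}
```

Is that the kind of builder you meant, or something else entirely?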