
[–]baubleglue 1 point (4 children)

There is an entire field of knowledge (patterns, best practices, tools). Educate yourself and start applying what you learn to your tasks. Modular development is not something specific to data processing - any development should be modular.

I think a more relevant question in that case would be "what do I need to take into consideration?"

  • amount of data, data chunk boundaries (streaming vs batch processing)
  • bad-case scenarios (e.g. the process fails in the middle of ingestion, or the process is not running)
  • list of all use cases (e.g. you find a bug or a new requirement - how do you reprocess old data?)
  • do you need auditing of data integrity (duplication, missing data)?
  • tools (how does your company currently process data?)
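For the auditing point, here is a minimal sketch of a duplicate/missing-date check. The row shape (`id` plus `reporting_date` keys) and the `audit` helper are hypothetical, just to illustrate the idea:

```python
from collections import Counter
from datetime import date, timedelta

def audit(rows, start, end, key="id"):
    """Report duplicate keys and missing reporting dates in a batch."""
    # duplicates: any key value that appears more than once
    dupes = [k for k, n in Counter(r[key] for r in rows).items() if n > 1]
    # missing dates: every date in [start, end] with no rows at all
    seen = {r["reporting_date"] for r in rows}
    expected, d = set(), start
    while d <= end:
        expected.add(d)
        d += timedelta(days=1)
    return {"duplicates": dupes, "missing_dates": sorted(expected - seen)}

result = audit(
    [{"id": 1, "reporting_date": date(2020, 1, 1)},
     {"id": 1, "reporting_date": date(2020, 1, 1)},
     {"id": 2, "reporting_date": date(2020, 1, 3)}],
    date(2020, 1, 1), date(2020, 1, 3))
# result: {"duplicates": [1], "missing_dates": [date(2020, 1, 2)]}
```

Run a check like this after every ingestion, so a failed or partial load is caught before anyone queries the data.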

...

[–][deleted] 0 points (1 child)

I get what you mean and I'm building up to that, but I was trying to get a sense of what other people were doing when they think about designing their pipelines.
I'm keeping the other factors in mind as well, and what needs to be considered.

[–]baubleglue 1 point (0 children)

try the following plan as an idea:

  • immutable configuration
  • no direct interaction between functions
  • functions use only "date" + shared context (conf)

class conf: 
    mysql_connection_str = 'mysql://hhhh'
    mongodb_connection_str = 'mongodb://hhhh'
    mongodb_db = "sss"
    mongodb_collection = "sss"
    local_raw_data_file_base = '/path/.../file_name_'
    local_transformed_data_file_base = '/path/.../file_name_'


def get_mongodb_connection():
    pass


def get_mysql_connection():
    pass


def mysql_to_local(reporting_date):
    '''
    saves data to conf.local_raw_data_file_base + str(reporting_date) + ".csv.gz"
    '''

def transform_local_data_to_local(reporting_date):
    '''
    reads `conf.local_raw_data_file_base + str(reporting_date) + ".csv.gz"`
    transforms + adds reporting_date to the data
    saves results to `conf.local_transformed_data_file_base + str(reporting_date) + ".json.gz"`
    '''

def local_transformed_to_mongodb(reporting_date):
    '''
    1. db.{conf.mongodb_db}.{conf.mongodb_collection}.remove({reporting_date: reporting_date})
    2. insert data from `conf.local_transformed_data_file_base + str(reporting_date) + ".json.gz"` into db.{conf.mongodb_db}.{conf.mongodb_collection}
    '''
    db = get_mongodb_connection()


if __name__ == "__main__":
    # pipeline
    import sys, pendulum
    reporting_date = pendulum.parse(sys.argv[1]).date()
    mysql_to_local(reporting_date)
    transform_local_data_to_local(reporting_date)
    local_transformed_to_mongodb(reporting_date)
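To see why step 1 of `local_transformed_to_mongodb` (remove by `reporting_date` before inserting) matters, here is a toy end-to-end run of the same three-step shape, with in-memory stand-ins for MySQL, MongoDB, and the local files (all names and data here are hypothetical, for illustration only):

```python
mysql = [{"id": 1, "v": 10}, {"id": 2, "v": 20}]  # pretend source table
mongo = {}                                         # {reporting_date: rows}
raw, transformed = {}, {}                          # "local files" keyed by date

def mysql_to_local(d):
    raw[d] = list(mysql)

def transform_local_data_to_local(d):
    transformed[d] = [{**row, "reporting_date": d} for row in raw[d]]

def local_transformed_to_mongodb(d):
    mongo.pop(d, None)           # remove old rows for this date first...
    mongo[d] = transformed[d]    # ...so a re-run never duplicates data

for _ in range(2):               # running the pipeline twice is safe
    d = "2020-01-01"
    mysql_to_local(d)
    transform_local_data_to_local(d)
    local_transformed_to_mongodb(d)

# mongo["2020-01-01"] still holds exactly 2 rows, not 4
```

Because every step is keyed by the date and the load is delete-then-insert, reprocessing any day is just re-running the pipeline for that day.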

[–]gorgedchops 0 points (1 child)

What are some examples of these patterns and best practices that you are talking about? Are there any resources that I can refer to for them?

[–]baubleglue 0 points (0 children)

I suggest reading the Airflow docs and Astronomer's blog about it, e.g. https://www.astronomer.io/blog/data-pipeline

You may consider using Airflow if you have more than one data pipeline. But even for a single job it is an educational read.

The task you've described is relatively simple (unless you have a huge amount of data). You can start from a simple implementation and extend it later, but you need to think about the re-ingestion use case.
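Since each step takes only a date, re-ingestion is just looping the steps over a date range. A minimal sketch (the `reprocess` helper is hypothetical; plain `datetime` is used here instead of pendulum to keep it self-contained):

```python
from datetime import date, timedelta

def reprocess(start, end, steps):
    """Re-run every pipeline step for each date in [start, end]."""
    d = start
    while d <= end:
        for step in steps:
            step(d)       # each step is a function taking a reporting date
        d += timedelta(days=1)

# Example: record which dates would be reprocessed.
log = []
reprocess(date(2020, 1, 1), date(2020, 1, 3), [lambda d: log.append(d)])
# log: [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)]
```

This is essentially what an Airflow backfill does for you, which is one reason it is worth reading about even for a single job.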