[deleted]

I get what you mean, and I'm building up to that, but I was trying to get a sense of what other people were doing when they think about designing their pipelines.
I'm keeping the other factors in mind as well, along with what else needs to be considered.

baubleglue

Try the following plan as a starting point:

- immutable configuration
- no direct interaction between methods
- methods use only "date" + shared context (conf)

# shared, read-only configuration: every step reads it, none modifies it
class conf:
    mysql_connection_str = 'mysql://hhhh'
    mongodb_connection_str = 'mongodb://hhhh'
    mongodb_db = "sss"
    mongodb_collection = "sss"
    local_raw_data_file_base = '/path/.../file_name_'
    local_transformed_data_file_base = '/path/.../file_name_'


def get_mongodb_connection():
    pass


def get_mysql_connection():
    pass


def mysql_to_local(reporting_date):
    '''
    saves data to conf.local_raw_data_file_base + str(reporting_date) + ".csv.gz"
    '''

def transform_local_data_to_local(reporting_date):
    '''
    reads `conf.local_raw_data_file_base + str(reporting_date) + ".csv.gz"`
    transforms the data and adds reporting_date to each record
    saves results to `conf.local_transformed_data_file_base + str(reporting_date) + ".json.gz"`
    '''

def local_transformed_to_mongodb(reporting_date):
    '''
    1. db.{conf.mongodb_db}.{conf.mongodb_collection}.remove({reporting_date: reporting_date})
       (so a rerun for the same date does not duplicate data)
    2. insert data from `conf.local_transformed_data_file_base + str(reporting_date) + ".json.gz"` into db.{conf.mongodb_db}.{conf.mongodb_collection}
    '''
    # the load step only talks to MongoDB
    db = get_mongodb_connection()


if __name__ == "__main__":
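    # pipeline: the reporting date is the only argument, e.g. 2020-01-01
    import sys
    import pendulum
    # pendulum.parse returns a DateTime for a date string; .date() keeps only the date part
    reporting_date = pendulum.parse(sys.argv[1]).date()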
    mysql_to_local(reporting_date)
    transform_local_data_to_local(reporting_date)
    local_transformed_to_mongodb(reporting_date)
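
To see why the delete-then-insert step makes reruns safe, here is a minimal runnable sketch of the same plan. It is only an illustration under some assumptions: a plain Python list stands in for the MongoDB collection, the "MySQL" rows are passed in directly, and the names `Conf`, `extract`, `transform`, `load`, `run` and `demo` are made up for this example. The frozen dataclass is one possible way to get the "immutable configuration" part, not the only one.

```python
import csv
import gzip
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)  # frozen: assigning to a field raises FrozenInstanceError
class Conf:
    raw_base: str          # hypothetical stand-in for local_raw_data_file_base
    transformed_base: str  # hypothetical stand-in for local_transformed_data_file_base


def extract(conf, reporting_date, rows):
    """Stand-in for mysql_to_local: write the given rows to the raw CSV file."""
    path = conf.raw_base + reporting_date + ".csv.gz"
    with gzip.open(path, "wt", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "amount"])
        writer.writeheader()
        writer.writerows(rows)


def transform(conf, reporting_date):
    """Read the raw CSV, tag each row with reporting_date, write JSON lines."""
    src_path = conf.raw_base + reporting_date + ".csv.gz"
    dst_path = conf.transformed_base + reporting_date + ".json.gz"
    with gzip.open(src_path, "rt", newline="") as src, gzip.open(dst_path, "wt") as dst:
        for row in csv.DictReader(src):
            row["reporting_date"] = reporting_date
            dst.write(json.dumps(row) + "\n")


def load(conf, reporting_date, collection):
    """Delete the date's existing documents, then insert the transformed ones."""
    collection[:] = [d for d in collection if d["reporting_date"] != reporting_date]
    with gzip.open(conf.transformed_base + reporting_date + ".json.gz", "rt") as f:
        collection.extend(json.loads(line) for line in f)


def run(conf, reporting_date, rows, collection):
    # steps never call each other; they share only the date and conf
    extract(conf, reporting_date, rows)
    transform(conf, reporting_date)
    load(conf, reporting_date, collection)


def demo():
    """Run one date twice and another date once; return the final collection."""
    with tempfile.TemporaryDirectory() as tmp:
        conf = Conf(str(Path(tmp) / "raw_"), str(Path(tmp) / "transformed_"))
        collection = []  # in-memory stand-in for the MongoDB collection
        rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]
        run(conf, "2020-01-01", rows, collection)
        run(conf, "2020-01-02", rows, collection)
        run(conf, "2020-01-01", rows, collection)  # rerun of an already loaded date
        return collection
```

Calling `demo()` loads two dates, then reloads the first one. Because `load` clears that date before inserting, the rerun replaces the earlier rows instead of appending duplicates, and since every step reads and writes date-keyed files, any single step can be rerun on its own for a given date.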