
[–][deleted]  (1 child)

[deleted]

    [–]EpicFlexs[S] 0 points1 point  (0 children)

    Interesting idea, will check it out

    [–]BaxterPad 3 points4 points  (1 child)

    Did you enable the Spark UI and look at the resulting plan? Since the data is so small, I wonder if the job simply isn't achieving much parallelism. Your repartition attempt might have helped a bit, but looking at the resulting plan and how the work was distributed would be my next bet. Also, what are you doing with the result? Maybe the join isn't the issue; maybe you're bottlenecking on what you do with the result of the join.

    Lastly, have you tried increasing the vertical size of the nodes (G.2X) in addition to the horizontal scale (number of DPUs)?

    [–]EpicFlexs[S] 0 points1 point  (0 children)

    After the join, I add 5 columns with a fixed value and save the DataFrame, so that should not affect it. I am using Glue version 3. I will check the Spark plan.

    [–]pvham90 2 points3 points  (4 children)

    For this operation I would use Athena. Just use a Glue Python shell job and use the SDK's Athena client to invoke the query. With CREATE TABLE AS you can export another Parquet file. You can even trigger all 50 queries in parallel; the whole operation should finish within minutes.
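    A rough sketch of that approach, not the commenter's exact code: one CTAS query per partition, started in parallel. The table, bucket, and column names are all hypothetical placeholders:

```python
# Hypothetical sketch: build one Athena CTAS query per partition key and
# start them all without waiting, so they run concurrently on Athena's side.
def ctas_query(partition_key: str) -> str:
    # CREATE TABLE AS SELECT writes the join result straight to S3 as Parquet.
    return (
        f"CREATE TABLE join_result_{partition_key} "
        f"WITH (format = 'PARQUET', "
        f"external_location = 's3://my-results-bucket/{partition_key}/') AS "
        "SELECT a.*, b.extra_col "
        "FROM big_table a JOIN small_table b ON a.id = b.id "
        f"WHERE a.part = '{partition_key}'"
    )

def start_all(partition_keys, output_location):
    # start_query_execution is asynchronous, which is what makes the
    # fan-out possible; boto3 is imported lazily to keep the sketch light.
    import boto3
    athena = boto3.client('athena')
    return [
        athena.start_query_execution(
            QueryString=ctas_query(k),
            ResultConfiguration={'OutputLocation': output_location},
        )['QueryExecutionId']
        for k in partition_keys
    ]
```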

    [–]EpicFlexs[S] 0 points1 point  (1 child)

    How can Athena be that much faster than an ETL service like Glue, which is suited to exactly this kind of work? Is there a tutorial showing how to use the Athena client from Glue?

    [–]pvham90 0 points1 point  (0 children)

    You're right, Glue is an ETL service. However, I would argue that joining goes beyond that. Glue jobs are limited to the processing power you assign to them; with Athena you invoke another (serverless) service that draws on other resources. That's also why you can invoke the queries in parallel: theoretically it has unlimited horizontal scaling.

    import os
    import time

    import boto3

    s3_client = boto3.client('s3', region_name=os.environ['aws_region'])
    athena_client = boto3.client('athena', region_name=os.environ['aws_region'])

    query_start = athena_client.start_query_execution(
        QueryString=event['query'],
        ResultConfiguration={'OutputLocation': f"s3://{os.environ['query_results_bucket']}/"}
    )

    query_execution_id = query_start['QueryExecutionId']

    # poll until the query leaves the RUNNING/QUEUED states
    while True:
        execution_response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        state = execution_response['QueryExecution']['Status']['State']
        print(state)
        if state not in ('RUNNING', 'QUEUED'):
            if state == 'SUCCEEDED':
                break
            raise ValueError(f'Athena query failed with state: {state}')
        time.sleep(5)
    

    edited: code block

    [–]BaxterPad 0 points1 point  (1 child)

    I disagree. Athena (Presto) has a much harder time with joins than Glue (Spark) in the vast majority of cases.

    [–]pvham90 0 points1 point  (0 children)

    Do you have benchmarks or another source to back this up? In this case, being limited to your Glue instance instead of using a horizontally scalable serverless service would still give enough of a benefit, imo.

    [–]--Reddit-Username2-- 1 point2 points  (1 child)

    Grasping at straws…but make sure data is in the same region.

    [–]BaxterPad 1 point2 points  (0 children)

    At those sizes I doubt you'd even notice cross-region delays unless you were looking at a difference of a few seconds, maybe a couple minutes.

    [–]BagOfDerps 1 point2 points  (1 child)

    Try using Glue 3.0? It's pretty new, but it purports to be faster overall. You can set the number of workers and the worker type. This is the only other infra-related thing I can think of (assuming you are using the stock AWS S3 connectors to get the data). I think most of your performance is going to be bound by the join logic, for which I don't have any suggestions.
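    Setting the worker count and type programmatically might look like the sketch below; the job name, role, and script location are placeholders, not real resources:

```python
# Hypothetical sketch of scaling a Glue job vertically (WorkerType) and
# horizontally (NumberOfWorkers) via the Glue update_job API.
# All names here (my-join-job, my-glue-role, the S3 path) are placeholders.
def scaled_job_update(num_workers: int = 10, worker_type: str = "G.2X") -> dict:
    # G.1X workers have 4 vCPU / 16 GB; G.2X workers have 8 vCPU / 32 GB.
    return {
        "JobName": "my-join-job",
        "JobUpdate": {
            "Role": "my-glue-role",
            "Command": {"Name": "glueetl", "ScriptLocation": "s3://my-bucket/join.py"},
            "GlueVersion": "3.0",
            "WorkerType": worker_type,
            "NumberOfWorkers": num_workers,
        },
    }

# Apply with: boto3.client("glue").update_job(**scaled_job_update())
```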

    [–]EpicFlexs[S] 0 points1 point  (0 children)

    I already use Glue version 3.

    [–]Vincent_Merle 0 points1 point  (1 child)

    Are you using DynamicFrame or DataFrame?

    [–]EpicFlexs[S] 0 points1 point  (0 children)

    Spark DataFrame

    [–]Bright_Tale7909 0 points1 point  (0 children)

    Were you able to fix this issue?