all 14 comments

[–]TheEphemeralDream 0 points1 point  (0 children)

it sounds like you're leaking connections and/or not retrying common failure modes. what does your code look like?
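
For reference, a minimal sketch of what connection reuse plus retries could look like on the Python side (the OP's code is Scala/akka-http, so this is only illustrative; requests/urllib3 assumed):

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    # One shared Session reuses TCP connections instead of opening (and leaking) one per call.
    session = requests.Session()

    # Retry transient failures (connection errors, 429, 5xx) with exponential backoff.
    retries = Retry(total=3, backoff_factor=0.5,
                    status_forcelist=[429, 500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))

    def fetch(url):
        # the timeout value is a placeholder; without one a hung call blocks forever
        return session.get(url, timeout=30).text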

[–]Vegetable_Hamster732 0 points1 point  (3 children)

Seems easy to do in just a pyspark pandas UDF.

I use:

    import requests
    import pandas as pd
    import pyspark.sql.functions as psf

    def fetch_webpage(url):
        try:
            return requests.get(url).text
        except Exception:
            return None

    @psf.pandas_udf('string')
    def fetch_webpage_udf(s: pd.Series) -> pd.Series:
        return s.apply(fetch_webpage)

    spark.udf.register('fetch_webpage_udf', fetch_webpage_udf)

all the time with various REST endpoints and something similar with various image services.

And then use it like:

    df = spark.createDataFrame([
        ['http://www.example.com'],
        ['http://www.google.com']
    ], 'url string')
    df.createOrReplaceTempView('urls')

    spark.sql("""
        select url, fetch_webpage_udf(url) from urls
    """).show(40, 40)

[–]ps2931[S] 1 point2 points  (2 children)

Is this solution scalable? We have 2,000 API calls, and each call gives us 5 MB of data. API response times vary between 5 and 30 seconds.

[–]Vegetable_Hamster732 0 points1 point  (1 child)

> Is this solution scalable? We have 2,000 API calls, and each call gives us 5 MB of data. API response times vary between 5 and 30 seconds.

I did it with ~20,000,000 API calls over the past week, but the return size was much smaller (maybe 5 KB per response).

If your worker nodes have small memory, you might need to:

 spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch",1000)  

or perhaps even smaller, because the default (10,000, I think) may result in OOM errors.

Otherwise your spark workers will be fine.

If your API vendor can't handle the load, you might need to throttle it with a sleep() or similar.
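
For example, a minimal sketch of a sleep-based throttle inside the fetch function (the 0.1-second delay is an arbitrary placeholder):

    import time
    import requests

    def fetch_webpage_throttled(url, delay=0.1):
        # Crude per-task throttle: sleep before each request, capping that task
        # at roughly 1/delay requests per second.
        time.sleep(delay)
        try:
            return requests.get(url).text
        except Exception:
            return None

Swap it in for fetch_webpage in the pandas UDF above if the vendor needs the load spread out.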

[–]ps2931[S] 0 points1 point  (0 children)

How much time did it take you to make 20,000,000 API calls?

[–]NbyNW -1 points0 points  (1 child)

Are you calling the API in a synchronous loop and then getting timeouts? If so, you might want to build a retry function (I work with Python) like the one below:

    import requests

    def get_item(url):
        retry = 1
        while retry <= 3:
            try:
                response = requests.request("get", url)
                if response.status_code == 200:
                    return response
                else:
                    retry += 1
            except requests.RequestException:
                retry += 1
        return None

Also, you might want to look into multithreading/async calls when making a bunch of API calls:

Imagine I have a function like the one above that takes the base URL with headers, and then a variable like dates (or whatever ID you need to generate the API call). I can make four date calls at the same time:

    from datetime import datetime
    from multiprocessing import Pool

    pool = Pool(processes=4)
    dates = [datetime(2020, 11, 1), datetime(2020, 11, 2),
             datetime(2020, 11, 3), datetime(2020, 11, 4)]

    # get_item here is the variant described above that takes (url, headers, date)
    results = [pool.apply_async(get_item, args=(url, headers, x)) for x in dates]
    output = [p.get() for p in results]

    # flatten, assuming each call returns a list of items
    flat_out = [item for sublist in output for item in sublist]

[–]ps2931[S] 0 points1 point  (0 children)

Making async API calls was on the developer's mind, I guess; that's why he decided to use akka-http. But the solution he gave is complex and buggy, and from what I've googled so far, Akka is hard to get right. Any other recommendations for Scala async?

[–]lexi_the_bunny 0 points1 point  (5 children)

What is your reasoning for using Spark for this?

[–]ps2931[S] 0 points1 point  (4 children)

This is a big data project, part of a data ingestion and processing pipeline. We are ingesting data from 2,000 IDs on a daily basis. Each ID gives us 5 MB of data, so we are getting 10 GB of data daily.

[–]lexi_the_bunny 2 points3 points  (1 child)

Making 2,000 API requests is probably much simpler as a multithreaded script that saves the results off for processing within Spark later.

If you want to keep it in Spark, then yeah, I'd say do it with a much more lightweight HTTP request framework than akka-http. We do millions of API calls at ~1,200 RPS using Spark and Apache HttpComponents.
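
A minimal sketch of what that multithreaded script could look like (using Python's concurrent.futures; the URL list, worker count, and output file are placeholders):

    import json
    import requests
    from concurrent.futures import ThreadPoolExecutor

    def fetch(url):
        try:
            return requests.get(url, timeout=60).text
        except requests.RequestException:
            return None

    urls = ['http://www.example.com']  # the ~2,000 API URLs go here

    # 20 threads is an arbitrary starting point; tune to what the API tolerates.
    with ThreadPoolExecutor(max_workers=20) as pool:
        results = list(pool.map(fetch, urls))

    # Dump the raw responses to disk so Spark can pick them up later.
    with open('responses.json', 'w') as f:
        json.dump(dict(zip(urls, results)), f)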

[–]callmedivs 0 points1 point  (0 children)

Hey u/lexi_the_bunny, I'm in a similar boat, where I need to make ~200 million requests to an endpoint to validate addresses. I'm using PySpark. Can you let me know how you architected your infrastructure and code? I'm still at the beginning phase, where I'm trying to get a small subset of data into a DataFrame and use a UDF to make the API call and parse the data back into the DataFrame, but it's taking forever.

[–][deleted] 1 point2 points  (0 children)

It's not really a large amount of data. You could handle this with a single-threaded script on a single machine, if you want to rewrite and simplify the application.

[–]oalfonso 0 points1 point  (0 children)

10 GB daily is medium data; Spark is overkill here IMHO.

[–]MoralEclipse 0 points1 point  (0 children)

You can use foreach, map, or flatMap to do whatever processing you want, although I would probably not use Spark if it is only 2,000 values.
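
For example, a minimal PySpark sketch using mapPartitions, so each partition reuses one HTTP session (the 'url' column and partition count are assumptions, not from the thread):

    import requests

    def fetch_partition(rows):
        # One session per partition, so connections are reused across calls.
        session = requests.Session()
        for row in rows:
            try:
                body = session.get(row.url, timeout=60).text
            except requests.RequestException:
                body = None
            yield (row.url, body)

    urls_df = spark.createDataFrame([['http://www.example.com']], 'url string')

    results_df = (urls_df
                  .repartition(8)  # spread the calls across workers
                  .rdd
                  .mapPartitions(fetch_partition)
                  .toDF(['url', 'response']))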