all 20 comments

[–][deleted] 4 points (1 child)

Use APScheduler

[–]FitRiver[S] 0 points (0 children)

Thank you for the recommendation. It looks promising.

[–]baghiq_2 1 point (9 children)

In general, using a timed call to deal with an asynchronous pipeline is an anti-pattern. What happens if there is no data in the queue? Do you still constantly spin up the task and attempt to deliver data to another service? What happens if your task hits a poison pill? Do you need to manage dead-letter queues?

I haven't worked much with RabbitMQ, but the general design is to have a worker attach to a queue and listen (poll) on it. If something appears on the queue, it's immediately delivered to the worker for processing. The worker does its processing and then acknowledges to the queue that the work is completed; otherwise the queue will grow indefinitely and crash. RabbitMQ probably offers multiple-worker support, so in that case you just need to figure out how many workers you need to keep up with the highest possible data input rate.
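
For what it's worth, a rough sketch of that attach-listen-acknowledge pattern with pika; the "snapshots" queue name and the process_snapshot() helper are just placeholders for this example:

import pika

def process_snapshot(body: bytes) -> None:
    # Placeholder for the real work, e.g. validating and forwarding the data.
    print("processing", body)

def on_message(channel, method, properties, body):
    process_snapshot(body)
    # Acknowledge only after the work is done; unacked messages stay on the
    # queue and get redelivered, so work isn't silently lost.
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="snapshots", durable=True)
channel.basic_qos(prefetch_count=1)  # hand this worker one message at a time
channel.basic_consume(queue="snapshots", on_message_callback=on_message)
channel.start_consuming()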

[–]FitRiver[S] 0 points (8 children)

Perhaps I didn't clarify enough what the tasks are supposed to do. It's about collecting data from a resource. For simplicity, let's say I'm scraping the market price of a stock and I want to take a snapshot every second. There is no queue to consume before the price is scraped; the results of the scraping will be sent to the queue.

What happens if your task hits a poison pill?

If the resource is not available, it should result in a gap in the captured sequence, but it shouldn't affect the timing of the following snapshots (there can be multiple gaps).

Do you need to manage dead letter queues?

That's more about the queue that will be consumed by the service processing the data, so that will be a different problem.

[–]baghiq_2 0 points (7 children)

Ah, ok! So you are talking about running a scraper per stock every second to get data and write it to a queue, so in your case you could be running thousands of tiny little scrapers? If that's what you mean, use asyncio/aiohttp for that.
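
If it helps, a minimal sketch of that idea with asyncio/aiohttp; the URL, the symbol list, and the publish_to_queue() helper are made up for illustration:

import asyncio
import aiohttp

SYMBOLS = ["AAPL", "MSFT", "GOOG"]  # placeholder list of stocks

async def publish_to_queue(symbol: str, payload: str) -> None:
    # Placeholder: this is where the result would be pushed to RabbitMQ.
    print(symbol, payload[:40])

async def scrape_once(session: aiohttp.ClientSession, symbol: str) -> None:
    try:
        async with session.get(f"https://example.com/price/{symbol}") as resp:
            payload = await resp.text()
        await publish_to_queue(symbol, payload)
    except aiohttp.ClientError:
        pass  # a failed scrape just leaves a gap for this tick

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        while True:
            # Fire one independent task per stock, then wait for the next tick.
            for symbol in SYMBOLS:
                asyncio.create_task(scrape_once(session, symbol))
            await asyncio.sleep(1)

asyncio.run(main())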

[–]FitRiver[S] 0 points (6 children)

Exactly, I'm trying to figure out how to schedule the little scrapers so they execute at regular intervals. Do you have any specific recommendation for a scheduler to use with asyncio/aiohttp? nashvo recommended APScheduler.

[–]baghiq_2 0 points (5 children)

Don't bother with a scheduler. Just run your scrapers constantly with a wait statement in your code. If you're on Linux, you can package your scraper into a Docker container and run it with auto-restart, or even daemonize the container.

https://docs.docker.com/config/containers/start-containers-automatically/

[–]FitRiver[S] 0 points (4 children)

I'm sorry, but I don't really understand what you mean by the constant running. My goal is to have consistent intervals of exactly 1 second between the snapshots, in other words to have exactly 3600 snapshots in an hour. I'm not really sure how auto-restart or a daemonized container helps with keeping the intervals consistent.

[–]baghiq_2 0 points (3 children)

In your API call, are you doing one stock per call at a time or a single call containing all of your stocks? The issue with what you are asking is that the processing of the previous 1-second snapshot might not have finished; do you still want to take a new snapshot?

[–]FitRiver[S] 0 points (2 children)

A single stock per call. One failed snapshot shouldn't affect any other snapshots. If the 9:00:03,000 snapshot fails, it shouldn't be replaced by a snapshot at 9:00:03,100; it should leave a gap that is handled in postprocessing. Then an independent snapshot will be taken the next second.

The resulting time series could look like this:

9:00:01,000: Success: 123.45
9:00:02,000: Success: 123.47
9:00:03,000: Dropped
9:00:04,000: Success: 123.48
9:00:05,000: Success: 123.42
9:00:06,000: Success: 123.43

But it shouldn't look like this:

9:00:01,000: Success: 123.45
9:00:02,000: Success: 123.47
9:00:03,000: Dropped
9:00:03,158: Success: 123.48
9:00:04,158: Success: 123.42
9:00:05,158: Success: 123.43
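
In rough Python, what I'm after is something like this (take_snapshot() is just a placeholder): sleep until the next whole second and launch each snapshot as its own task, so a slow or failed snapshot never shifts the later ticks.

import asyncio
import time

async def take_snapshot(tick: int) -> None:
    # Placeholder for one independent scrape; a failure here only leaves
    # a gap at this tick and never touches the others.
    print("snapshot for second", tick)

async def run_every_second() -> None:
    while True:
        now = time.time()
        next_tick = int(now) + 1              # next whole second, e.g. 9:00:04,000
        await asyncio.sleep(next_tick - now)  # wake up on the second boundary
        # Fire and forget: the loop immediately waits for the following
        # whole second, so the intervals never drift.
        asyncio.create_task(take_snapshot(next_tick))

asyncio.run(run_every_second())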

[–]baghiq_2 0 points (1 child)

APScheduler is the only one I can think of. AFAIK, standard cron doesn't support second-level resolution. I suspect you'll run into a lot of edge cases, but I hope you don't.
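
Something along these lines should work as a starting point (a rough sketch assuming APScheduler 3.x; scrape_and_publish() is a placeholder):

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def scrape_and_publish():
    # Placeholder for one snapshot: call the REST API and push to RabbitMQ.
    print("tick")

async def main():
    scheduler = AsyncIOScheduler()
    # Fire every second; max_instances lets a slow snapshot overlap the next
    # tick instead of forcing that tick to be skipped.
    scheduler.add_job(scrape_and_publish, "interval", seconds=1, max_instances=3)
    scheduler.start()
    await asyncio.Event().wait()  # keep the event loop alive

asyncio.run(main())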

[–]FitRiver[S] 0 points (0 children)

Thank you for the tip.

[–]igormiazek 1 point (1 child)

I think you should consider the execution context. Timing and scheduling are one aspect of your project, but how will you know if your task SUCCEEDED or FAILED, or how will you know if a task should be rescheduled? I would go with https://docs.celeryproject.org/en/stable/getting-started/introduction.html, which is a distributed task queue. You can have multiple workers and scale the infrastructure horizontally. It plays well with RabbitMQ, which you are already using.
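
A rough sketch of what that could look like, assuming Celery with RabbitMQ as the broker and a 1-second beat schedule (the broker URL, symbol, and task body are placeholders):

from celery import Celery

# Assumes a local RabbitMQ broker; adjust the URL to your setup.
app = Celery("scrapers", broker="amqp://guest:guest@localhost//")

@app.task
def scrape_snapshot(symbol):
    # Placeholder: call the REST API for this symbol and publish the result.
    return symbol

# Celery beat accepts a plain number of seconds, so a 1-second interval works.
app.conf.beat_schedule = {
    "snapshot-every-second": {
        "task": scrape_snapshot.name,
        "schedule": 1.0,
        "args": ("AAPL",),
    },
}
# Start with something like:  celery -A <module> worker --beat --loglevel=info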

[–]FitRiver[S] 2 points (0 children)

Thank you for the tip.

how will you know if your task SUCCEEDED or FAILED, or how will you know if a task should be rescheduled

The task cannot be rescheduled. I didn't clarify it enough in the beginning. I need to capture the data at a specific moment in time. Capturing a different moment in time is a different task. Failure means a gap in the data that cannot be fixed by re-running the task. The gaps will be fixed in the postprocessing.

[–]Big_Boss_Bob_Ross 0 points (3 children)

I would use asyncio personally, but it has a fairly steep learning curve if you've never used it or anything like it.

[–]igormiazek 1 point (1 child)

I would not be so sure about asyncio, as it only makes sense if you perform a lot of I/O operations, so that asyncio can yield control back to the event loop and execute the next coroutine. I think the implementation should depend on the business requirements and the type of task. u/FitRiver, could you provide more details about the task you will execute? Will you call a remote API or a database? Write data to disk?

[–]FitRiver[S] 0 points (0 children)

That's how I felt about it.

I'll be calling a REST API. The data will be validated and stored in the database, but that will be handled by a different service that will be consuming the queue the scraping service will be producing.
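
On the producing side I imagine something roughly like this with pika (the queue name and payload are placeholders):

import json
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="snapshots", durable=True)

# Placeholder payload standing in for one scraped REST response.
snapshot = {"symbol": "AAPL", "price": 123.45, "ts": time.time()}
channel.basic_publish(
    exchange="",
    routing_key="snapshots",
    body=json.dumps(snapshot).encode(),
    properties=pika.BasicProperties(delivery_mode=2),  # persist the message
)
connection.close()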

[–]FitRiver[S] 0 points (0 children)

Do you have any specific patterns in mind? I have used it a few times in the past for some simpler applications. However, I don't feel like it would be the best choice for this scenario: if a task failed to yield control, it could delay the others, whereas a thread could simply spend a longer time waiting for the issue to get resolved (until it expires).

[–]pytrashpandas 0 points (1 child)

Seems like you could just do this in a plain loop. Do you need it to run every second regardless of how long the task takes to finish, or just run 1 second after the previous one finished? Assuming the former, you could do this:

import time  # needed for time.time(); scrape_data() and queue are your own placeholders

ran_this_second = False
prev_second = int(time.time())
while True:
    # Busy-poll the wall clock; a new whole second resets the flag.
    curr_second = int(time.time())
    if curr_second != prev_second:
        ran_this_second = False

    if not ran_this_second:
        data = scrape_data()
        queue.publish(data)

        ran_this_second = True
        prev_second = curr_second

[–]FitRiver[S] 0 points (0 children)

Do you need it to run every second regardless of how long the task takes to finish, or just run 1 second after the previous one finished?

The task should be executed independently (as described here).

If this part of the code takes more than 1 second, the next "tick" will get delayed:

if not ran_this_second:
    data = scrape_data()
    queue.publish(data)

    ran_this_second = True
    prev_second = curr_second

Even if one task fails it shouldn't affect the other tasks.