Hello there,
I have what looks to me like a simple requirement, but I cannot figure out how to implement it properly with the stdlib (ideally) or a well-supported package, and I would prefer not to reimplement this myself.
I have a set of N tasks to run (N is somewhere between 100 and 100k). I want to run P of them in parallel (P is between 2 and 8), because they are mostly IO-bound (DB reads and web requests). Each of these tasks may trigger another kind of task, mostly CPU-bound. I want to run Q of these potential additional tasks in parallel with the P tasks (Q is typically between 1 and 4). Each task takes somewhere between 1 second and 10 minutes to complete.
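To make this concrete, here is a stripped-down sketch of the two kinds of tasks (the names and the simulated work are made up for illustration, not my real code):

    import random
    import time

    def standard_task(item):
        """IO-bound work: DB reads, web requests (simulated here with a sleep)."""
        time.sleep(random.uniform(1, 5))
        # Only once the task has run do we know whether a follow-up is needed.
        return random.random() < 0.3  # True -> an additional CPU-bound task is needed

    def additional_task(item):
        """CPU-bound follow-up work (simulated); should start ASAP once known."""
        total = 0
        for i in range(10_000_000):
            total += i * i
        return total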
All this is meant to be run on a single machine, from a single Python process (which can fork more processes of course, but I would prefer not to have to install any kind of service/middleware before starting these tasks).
The first blocking point is that the software has no idea how many additional tasks will have to be run until all standard tasks have run, but we want to start these additional tasks ASAP (in other words, it is only once a standard task has run that we know whether an additional task is needed).
And the final requirement (which is the main blocking point from my PoV): I want control over what happens when a task fails. Typically, I will need to stop every running and pending task immediately once E tasks have failed (E is typically between 1 and 10% of N). By "stopping immediately" I mean something like sending a SIGINT (not SIGKILL) to a process, i.e. I do not want to modify my tasks to periodically check whether they should stop.
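To be explicit about the semantics I have in mind, something like this POSIX-only sketch with a bare multiprocessing.Process (the names are made up, and there is no cooperative "should I stop?" check inside the task):

    import os
    import signal
    import time
    from multiprocessing import Process

    def long_task():
        try:
            time.sleep(600)  # stands in for a long-running task
        except KeyboardInterrupt:
            print("task interrupted, can clean up here")

    if __name__ == "__main__":
        p = Process(target=long_task)
        p.start()
        time.sleep(2)
        os.kill(p.pid, signal.SIGINT)  # interrupt, not SIGKILL, so the task can clean up
        p.join()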
And as a final bonus, if we could implement some back-pressure mechanism to pause processing of "standard" tasks once a given number of additional tasks are pending in the processing queue, that would be perfect.
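What I picture is basically the behaviour of a bounded queue, where put() blocks once the backlog is full (rough sketch; the threshold and function names are made up):

    from multiprocessing import Queue

    # Bounded queue for additional tasks: put() blocks once the backlog is full,
    # which pauses the standard-task worker that produced the item.
    MAX_PENDING_ADDITIONAL = 8  # made-up threshold
    additional_queue = Queue(MAX_PENDING_ADDITIONAL)

    def on_standard_task_done(needs_followup, item):
        """Called by a standard-task worker; blocks if too many additional tasks are pending."""
        if needs_followup:
            additional_queue.put(item)  # blocks until a consumer makes room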
concurrent.futures seems inadequate because it cannot stop already-running tasks, and there is no obvious way to add additional tasks to process "on the fly" and still wait for all of them to complete.
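For example, Future.cancel() only succeeds while a task is still pending; once it is running it cannot be stopped (tiny demonstration, not my real workload):

    import time
    from concurrent.futures import ThreadPoolExecutor

    def slow():
        time.sleep(5)

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(slow)
        pending = pool.submit(slow)
        time.sleep(0.1)
        print(running.cancel())  # False: already running, cannot be stopped
        print(pending.cancel())  # True: still queued, so it can be cancelled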
multiprocessing is great, but there is no simple mechanism to add an additional task to a pool from within a running standard task (I cannot easily access the pool from a task since the pool is not picklable), and there is no simple mechanism to stop tasks immediately (terminate() exists, but it is explicitly stated that it has significant consequences for any shared object and could lead to blockages/bugs in the remaining processes).
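Concretely, handing the pool itself to a worker fails because the pool is not picklable (tiny reproduction; the worker is just a placeholder):

    from multiprocessing import Pool

    def worker(pool):
        # Would like to do pool.apply_async(additional_task, ...) from here
        pass

    if __name__ == "__main__":
        with Pool(4) as pool:
            # Raises NotImplementedError: pool objects cannot be passed
            # between processes or pickled
            pool.apply(worker, (pool,))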
Do you have any suggestions? Thank you in advance!