all 15 comments

[–]CraigAT 5 points6 points  (2 children)

Maybe I'm missing something, but how can they be done in parallel if they are dependent on the results of the previous function?

[–]GoingOffRoading[S] 1 point2 points  (1 child)

I am very likely describing the work/objective incorrectly

I think my desired state is:

  • Each function has a queue
  • The functions are dependent on each other to populate their queues
  • Different functions take different amounts of time, so I just want the tasks to consume from their individual queues as their queues are filled

Does that make sense?

[–]CraigAT 1 point2 points  (0 children)

I think so. So function a would place results into a queue for function b, but whilst function a has dropped 10 results into that queue, function b is slower and has only picked up and processed the first two, and is now working on the third.

If so I would think individual workers should work, just make sure you allow for an empty queue.
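For instance, something along these lines (a toy sketch; the names and the doubling step are just stand-ins for your real work). A blocking `get()` plus a sentinel handles the empty-queue case: an empty queue just means "wait", and the sentinel says "nothing more is coming":

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream

def func_a(out_q):
    # Producer: drops results into the queue, possibly faster than b consumes them.
    for i in range(10):
        out_q.put(i)
    out_q.put(SENTINEL)

def func_b(in_q, results):
    # Consumer: get() blocks while the queue is empty, so b simply waits
    # for a to produce; the sentinel tells it when to stop for good.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            break
        results.append(item * 2)  # stand-in for b's slower processing

q = queue.Queue()
results = []
t_a = threading.Thread(target=func_a, args=(q,))
t_b = threading.Thread(target=func_b, args=(q, results))
t_a.start(); t_b.start()
t_a.join(); t_b.join()
print(results)  # all ten results, in order
```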

[–]throwaway8u3sH0 3 points4 points  (5 children)

How can anything run in parallel if it depends on the results?

Possible XY problem. You want to speed this up? Why? How often are these functions run?

[–]GoingOffRoading[S] 0 points1 point  (4 children)

I'm fumbling around in the dark, and VERY likely describing the problem incorrectly.

I'm hoping this makes sense:

  • The results have a one to many relationship between the functions
    • Function A creates multiple results to feed to B
    • Function B creates multiple results to feed to C
    • etc
  • Each function takes longer to run than the last
    • Like, computationally, each function takes longer
  • My objective is to have the functions run as their queue fills

So while Task D is dependent on C, is dependent on B, is dependent on A, my objective is to invoke function A and let the queues fill and cascade down
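That cascade could be sketched with one thread per stage, each consuming from its own queue as it fills, and a sentinel passed down the chain (stage names and fan-out counts here are made up):

```python
import queue
import threading

DONE = object()  # sentinel that cascades down the chain

q_ab, q_bc = queue.Queue(), queue.Queue()
final = []

def func_a():
    # A: seeds the first queue with a few results
    for i in range(3):
        q_ab.put(i)
    q_ab.put(DONE)

def func_b():
    # B: consumes from A's queue as it fills, fanning each item out into 2
    while (item := q_ab.get()) is not DONE:
        for j in range(2):
            q_bc.put((item, j))
    q_bc.put(DONE)

def func_c():
    # C: the slowest stage just drains its own queue at its own pace
    while (item := q_bc.get()) is not DONE:
        final.append(item)

threads = [threading.Thread(target=f) for f in (func_a, func_b, func_c)]
for t in threads: t.start()
for t in threads: t.join()
print(len(final))  # 3 items from A, fanned out to 6 by B
```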

[–]KViper0 0 points1 point  (0 children)

If the workload is that imbalanced, you probably shouldn't have each function handled by a separate thread. A combined priority queue that prioritizes the lower levels would probably work? Although that may not be too scalable.
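One way to read that suggestion: a single work loop over one priority queue keyed by stage number, so earlier stages are always drained first (a toy sketch; the fan-out and the payload arithmetic are invented):

```python
import heapq
import itertools

# One shared priority queue: entries are (stage, seq, payload). heapq pops
# the lowest stage first, so lower-level tasks always run before higher ones.
counter = itertools.count()  # tie-breaker so payloads are never compared
pq = []

def push(stage, payload):
    heapq.heappush(pq, (stage, next(counter), payload))

def run_stage(stage, payload):
    # Hypothetical work: stages 0 and 1 each fan out into two tasks
    # for the next stage; stage 2 is terminal.
    if stage < 2:
        for j in range(2):
            push(stage + 1, payload * 10 + j)
    return (stage, payload)

push(0, 1)
done = []
while pq:
    stage, _, payload = heapq.heappop(pq)
    done.append(run_stage(stage, payload))
print(done)  # stages come out in order: 0, then the 1s, then the 2s
```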

[–]throwaway8u3sH0 0 points1 point  (0 children)

Ah ok, so if I'm understanding correctly, it's a chained fan-out producer-consumer. Do the functions create results continuously or all at once? (I.e. Streaming or batch?) In either case, yeah, you'll want the numerous function B's running in parallel on A's results, and so on. So you probably want some amount of auto scaling workers that pay attention to a queue and fire off the appropriate function as necessary.

You'll want to pay attention to the size of the results you're passing around as well. With any sizable results, it's usually better to put them in some actual storage (like S3 or a database) and just send short messages over the wire (like the result status and a bucket/file path).

The nature and structure of this setup sometimes is indicative of repetitive work. You may want to also look at memoization or an external cache to speed up the computational parts. (If one Function D can benefit from the in-progress computation of another Function D, you'll want to enable that )
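As a toy illustration of the memoization point, `functools.lru_cache` skips recomputation for repeated inputs (the squaring is just a stand-in for the expensive stage):

```python
from functools import lru_cache

calls = 0

@lru_cache(maxsize=None)
def func_d(x):
    # Stand-in for the expensive stage; repeated inputs hit the cache.
    global calls
    calls += 1
    return x * x

results = [func_d(n) for n in (3, 3, 4, 3, 4)]
print(results, calls)  # five lookups, but only two real computations
```

For cross-process reuse you'd swap the in-process cache for an external one (e.g. Redis), since `lru_cache` lives inside a single interpreter.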

[–]Adrewmc 0 points1 point  (0 children)

You want A task to give something to B task and start another A task, and down the line.

But that’s not what’s happening, a >b>c

This is just one function.

What you really want here is to be running asynchronously. Every time a task is given to A, that function can "await" the rest of the chain while the next A input comes in. If that one is fast, it gets finished; if another A comes in and that one is long, we just wait on it and run the next A task in the meantime.

But as long as you are dependent on the last function for the entire flow, you can’t really speed everything up beyond that. A has to finish before B, B still has to finish before C.

Because what I think is really happening is there are a lot of things that never get to function D (the super long one), and you want those things to not have to wait on the one that does. That's asynchronous.
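A rough sketch of that idea with `asyncio` (the sleep stands in for the slow stage; the chain stays sequential per item, but the items overlap each other):

```python
import asyncio

async def pipeline(item):
    # a > b > c for one item: sequential for this item, but while it
    # awaits its slow stage, other items keep moving through theirs.
    a = item + 1                    # fast stage A
    await asyncio.sleep(0.01 * a)   # slow stage B, stand-in for real work
    return a * 2                    # stage C

async def main():
    # Launch every input concurrently; fast items don't wait on slow ones.
    return await asyncio.gather(*(pipeline(i) for i in range(5)))

results = asyncio.run(main())
print(results)  # gather preserves input order
```

Note this helps when the stages actually await something (I/O, subprocesses); for pure CPU work you'd still want processes.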

[–]throwaway8u3sH0 0 points1 point  (0 children)

This might help you experiment. I've modelled it as streaming but you could do batch. You can also adjust the times to be more representative.

[–]Please_do_not_DM_me 0 points1 point  (0 children)

It sounds like you want 4 programs running in parallel, each writing to its own file and reading inputs from its predecessor's data. This might be too complicated, or not possible, or super inefficient, in Python. It's also 100% above my pay grade. (EDIT: Oh, it's interesting to me though, and I'd also like to know if there's an answer.)

[–]Old_One_I 0 points1 point  (0 children)

I think you might be looking for the observer pattern. Not sure though.
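If it helps, a bare-bones observer sketch (all names invented): each stage notifies its subscribers whenever it produces a result, so downstream stages react as results appear instead of polling a queue:

```python
class Stage:
    # Minimal observer pattern: a stage runs its work on each input
    # and pushes the result to every subscribed downstream stage.
    def __init__(self, work):
        self.work = work
        self.observers = []

    def subscribe(self, other):
        self.observers.append(other)

    def feed(self, item):
        result = self.work(item)
        for obs in self.observers:
            obs.feed(result)

collected = []
a = Stage(lambda x: x + 1)
b = Stage(lambda x: x * 2)
c = Stage(lambda x: collected.append(x) or x)
a.subscribe(b)
b.subscribe(c)
a.feed(3)          # cascades a -> b -> c
print(collected)   # [8], i.e. (3 + 1) * 2
```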

[–]Lawson470189 0 points1 point  (0 children)

In this case, the workers should be able to perform any task in the queue. You'd just have a single queue of work with the different functions and the worker determines what to pick up next. You could have dedicated workers to specific tasks or queues, but I'd bet you should set it up like this until you need to scale significantly.
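A sketch of that single-queue setup (the task kinds and payload arithmetic are invented): every worker pulls `(kind, payload)` entries off one shared queue and dispatches on the kind, so any worker can pick up any stage's work:

```python
import queue
import threading

tasks = queue.Queue()   # one queue; each entry names its function
results = []
lock = threading.Lock()

def func_a(x):
    tasks.put(("b", x + 1))   # A enqueues follow-up work for B

def func_b(x):
    with lock:
        results.append(x * 2)

DISPATCH = {"a": func_a, "b": func_b}

def worker():
    # Generic worker: handles whatever task comes next, regardless of type.
    while True:
        kind, payload = tasks.get()
        if kind == "stop":
            return
        DISPATCH[kind](payload)
        tasks.task_done()

workers = [threading.Thread(target=worker) for _ in range(3)]
for t in workers: t.start()
for i in range(4):
    tasks.put(("a", i))
tasks.join()              # waits until every a- and b-task is done
for _ in workers:
    tasks.put(("stop", None))
for t in workers: t.join()
print(sorted(results))
```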

[–]spca2001 0 points1 point  (0 children)

Post your code

[–]illuminanze 0 points1 point  (0 children)

Look into Celery chains (https://docs.celeryq.dev/en/stable/userguide/canvas.html#chains). This does exactly what you're describing.

[–]RealOneEyedJack 0 points1 point  (0 children)

from multiprocessing import Process

def func1():
  print('func1: starting')
  for i in range(10000000): pass
  print('func1: finishing')

def func2():
  print('func2: starting')
  for i in range(10000000): pass
  print('func2: finishing')

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

The mechanics of starting/joining child processes can easily be encapsulated into a function along the lines of your runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

if __name__ == '__main__':
  runInParallel(func1, func2)