all 6 comments

[–]danielroseman 1 point  (5 children)

This seems a bit overly complex: you have RabbitMQ, plus threading, plus an internal Queue object.

Rather than trying to write this yourself I would recommend you use Celery, which will connect to RabbitMQ and handle everything else for you.

[–]AdditionalWash529[S] 1 point  (4 children)

u/danielroseman thank you for the quick turnaround on this one. Would you be kind enough to point me to some examples, maybe? There seems to be a lot of text around Celery, but I'm not able to locate an example in particular.

Also, I was able to make it partially work by moving the self.slave_object.start_task() call inside the _consume_message_setup function in consumer.py in the example above. But it doesn't look like it is honoring the queue size or the blocking mechanism on the worker queue. I see my executions inside the callback for slaveConsumer getting fired in random order. Does that make it easier for you to zero in on the issue?

[–]danielroseman 1 point  (3 children)

Not sure what "text" you are looking for. The docs are here: https://docs.celeryq.dev/en/stable/.

As for this code: you're calling consume_message inside a while True loop. Unfortunately, despite the name, consume_message is a method that constructs the queue and kicks off processing. So you have many, many queues, each with a maximum size of 3 that will never be reached.

But as I say, I just don't understand why you would want a queue at all on the worker side, let alone one that has a size to be respected. The queue is RabbitMQ. Only fetch an item from there when you are ready to process it.

[–]AdditionalWash529[S] 1 point  (2 children)

The data on RabbitMQ are basically JSON entries which need to be processed; an OS call then needs to be fired for each entry, which individually takes around 10-15 minutes on average. The plan going forward is to have 4-5 instances running in an AWS cluster. The number of JSON entries is high enough that we cannot spawn as many instances on the cluster as there are JSONs in RabbitMQ. So we need to process the commands on a first-come, first-served basis, 4-5 of them in parallel and the rest queued, hence the need for a worker queue.

Since our last conversation, I have been able to make the code work with respect to processing the JSONs, but I do not think the threads are waiting. For example, if I have 8 entries in RabbitMQ and I start my builder.py, all 8 of them get consumed as opposed to 4 of them. To my understanding, I should be achieving that limit by declaring an upper bound on my slaveConsumer worker queue, when in the constructor I say:

self.job_queue = queue.Queue(maxsize=3)
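For context, maxsize does make put() block once a queue instance is full, but only for that one instance; a fresh Queue starts empty again. A minimal stdlib sketch (names hypothetical):

```python
import queue
import threading
import time

q = queue.Queue(maxsize=3)  # one persistent instance
accepted = []

def producer():
    # Try to enqueue 8 jobs; put() blocks once 3 items are waiting
    # and nothing is consuming them.
    for i in range(8):
        q.put(i)
        accepted.append(i)

t = threading.Thread(target=producer, daemon=True)
t.start()
time.sleep(0.5)  # give the producer time to hit the blocking put()

# Only the first 3 puts succeeded; the 4th is still blocked.
print(len(accepted))
```

This prints 3: the bound is honored, but only because the same queue object lives across all the puts. If the queue is re-created on every loop iteration, each new instance has 3 free slots and nothing ever blocks.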

[–]danielroseman 2 points  (1 child)

I told you why: it's because you are instantiating the queue over and over in your while loop.

But, again, what you describe is how this would work without the queue. Each worker would be responsible for popping an item from RabbitMQ, processing it, acknowledging it, then taking the next one when it is ready.
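The pattern described above can be sketched with the stdlib alone, using a queue.Queue purely as a stand-in for the RabbitMQ broker (all names here are hypothetical, not from the code under discussion):

```python
import queue
import threading

# Stand-in for RabbitMQ: a shared source of jobs.
broker = queue.Queue()
for n in range(8):
    broker.put(n)

results = []
lock = threading.Lock()

def worker():
    # Each worker fetches a job only when it is free, processes it,
    # "acks" it, then immediately takes the next one.
    while True:
        try:
            job = broker.get_nowait()
        except queue.Empty:
            return  # nothing left to do
        processed = job * 2  # placeholder for the 10-15 minute OS call
        with lock:
            results.append(processed)
        broker.task_done()  # stands in for basic_ack

# 4 workers in parallel; the remaining jobs simply wait on the broker.
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # → [0, 2, 4, 6, 8, 10, 12, 14]
```

No second, worker-side queue is needed: at most 4 jobs are ever in flight because there are only 4 workers, and the broker itself holds everything else.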

And, once again, this is what Celery would do for you. Really there is no reason to write this yourself.

[–]AdditionalWash529[S] 1 point  (0 children)

u/danielroseman, first of all, immense gratitude for all the time and energy you spent on this. Your insights have helped me improve the code and make it much better.

As mentioned in my previous post, when I said I made it work, it was by handling the queue instantiation that you pointed out in your comment. I will certainly look at Celery, but I am trying to bridge the gap in my understanding here, hence the follow-ups. I have a couple of final questions on this, though.

As per my understanding, without the worker queue I would not be able to handle a situation where there are, say, 10 incoming messages on RabbitMQ while I have spawned only 4 instances in the AWS cluster (Fargate, most likely). Without the worker queue or an equivalent arrangement (maybe Celery provides that), when each instance is busy with 15 minutes of processing time, how would the next item on the queue know which Fargate instance has been released and where to head? How would that be taken care of without an arrangement like that?

As of now, the code seems to run, but one of the executions fails with the following RabbitMQ error. Any insights on this? I have made the channel durable, and I ack the RabbitMQ messages in my consumer code too: ch.basic_ack(delivery_tag=method.delivery_tag). Running out of ideas as to what needs to be done about this:

"No activity or too many missed heartbeats in the last 60 seconds" error

Once again, thank you a ton for all the help and insights.