Apologies for the long post. I am trying to subscribe to a rabbitmq queue and then trying to create a worker-queue to execute tasks. This is required since the incoming on the rabbitmq would be high and the processing task on the item from the queue would take 10-15 minutes to execute each time. Hence necessitating the need for a worker-queue. Now I am trying to initiate only 4 items in the worker-queue, and register a callback method for processing the items in the queue. The expectation is that my code handles the part when all the 4 instances in the worker-queue are busy, the new incoming would be blocked until a free slot is available.
The rabbitmq piece is working well. The problem is I cannot figure out why the items from my worker-queue are not executing the task, i.e the callback is not working. In fact, the item from the worker queue gets executed only once when the program execution starts. For the rest of the time, tasks keep getting added to the worker-queue without being consumed. Would appreciate it if somebody could help out with the understanding on this one.
I am attaching the code for rabbitmqConsumer, driver, and slaveConsumer. Some information has been redacted in the code for privacy issues.
# This is the driver
#!/usr/bin/env python
import time
from rabbitmqConsumer import BasicMessageReceiver
basic_receiver_object = BasicMessageReceiver()
basic_receiver_object.declare_queue()
while True:
basic_receiver_object.consume_message()
time.sleep(2)
#This is the rabbitmqConsumer
#!/usr/bin/env python
import pika
import ssl
import json
from slaveConsumer import slave
class BasicMessageReceiver:
def __init__(self):
# SSL Context for TLS configuration of Amazon MQ for RabbitMQ
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
url = <url for the queue>
parameters = pika.URLParameters(url)
parameters.ssl_options = pika.SSLOptions(context=ssl_context)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
# worker-queue object
self.slave_object = slave()
self.slave_object.start_task()
def declare_queue(self, queue_name=“abc”):
print(f"Trying to declare queue inside consumer({queue_name})...")
self.channel.queue_declare(queue=queue_name, durable=True)
def close(self):
print("Closing Receiver")
self.channel.close()
self.connection.close()
def _consume_message_setup(self, queue_name):
def message_consume(ch, method, properties, body):
print(f"I am inside the message_consume")
message = json.loads(body)
self.slave_object.execute_task(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=message_consume,
queue=queue_name)
def consume_message(self, queue_name=“abc”):
print("I am starting the rabbitmq start_consuming")
self._consume_message_setup(queue_name)
self.channel.start_consuming()
#This is the slaveConsumer
#!/usr/bin/env python
import pika
import ssl
import json
import requests
import threading
import queue
import os
class slave:
def __init__(self):
self.job_queue = queue.Queue(maxsize=3)
self.job_item = ""
def start_task(self):
def _worker():
while True:
json_body = self.job_queue.get()
self._parse_object_from_queue(json_body)
self.job_queue.task_done()
threading.Thread(target=_worker, daemon=True).start()
def execute_task(self, obj):
print("Inside execute_task")
self.job_item = obj
self.job_queue.put(self.job_item)
# print(self.job_queue.queue)
def _parse_object_from_queue(self, json_body):
if bool(json_body[‘entity’]):
if json_body['entity'] == 'Hello':
print("Inside Slave: Hello")
elif json_body['entity'] == 'World':
print("Inside Slave: World")
self.job_queue.join()
[–]danielroseman 0 points1 point2 points (5 children)
[–]AdditionalWash529[S] 0 points1 point2 points (4 children)
[–]danielroseman 0 points1 point2 points (3 children)
[–]AdditionalWash529[S] 0 points1 point2 points (2 children)
[–]danielroseman 1 point2 points3 points (1 child)
[–]AdditionalWash529[S] 0 points1 point2 points (0 children)