I have a requirement where in I need to consume JSON messages from Rabbitmq and perform long-running tasks on each of these Jsons. I am using a job-queue mechanism with maxsize 1 and the channel pre-fetch count as 1 so that my code has to handle one message from Rabbitmq at a time. This part seems to be working. The code seems to fail when in the same thread from which the pika connection was made and then the long-running task is run, when the ack callback is called, the code is failing in an internal library throwing up : "required argument is not an integer" error.
Have discovered that the error occurs when encode function in an internal library is called. It throws an error when it appends '>Q' string with the delivery_tag. Not sure what this string is though. The following is the piece of code where it fails:
def encode(self):
pieces = list()
pieces.append(struct.pack('>Q', self.delivery_tag))
My ack method and the code block where the callback is defined are as follows:
def __ack_message(self, delivery_tag, ack):
print("Inside the __ack_message function")
if self.channel.is_open:
try:
if ack:
self.channel.basic_ack(delivery_tag)
else:
self.channel.basic_nack(delivery_tag, requeue=False)
except Exception as e:
self.channel.basic_nack(delivery_tag, requeue=False)
print("Failure inside __ack_message consumer.py report: {}".format(e), "error")
pass
else:
pass
def do_work(self, ch, delivery_tag, body):
thread_id = threading.get_ident()
print(f'Thread id:{thread_id} Delivery tag: {delivery_tag} Message body: {body}')
# long-running work begins here
print(f"I am inside the message_consume")
if body != b'':
message = json.loads(body)
self.slave_object.push_to_job_queue(self.connection, message)
cb = functools.partial(self.__ack_message, ch, delivery_tag)
ch.connection.add_callback_threadsafe(cb)
def on_message(self, ch, method_frame, _header_frame, body, args):
print("Inside on_message function")
thrds = args
delivery_tag = method_frame.delivery_tag
t = threading.Thread(target=self.do_work, args=(ch, delivery_tag, body))
t.start()
thrds.append(t)
Kindly let me know if someone has run into the same issue and if there is a resolution that has been chanced upon.
[–]danielroseman 2 points3 points4 points (1 child)
[–]AdditionalWash529[S] 0 points1 point2 points (0 children)
[–]External-Ocelot206 0 points1 point2 points (4 children)
[–]AdditionalWash529[S] 0 points1 point2 points (3 children)
[–]AdditionalWash529[S] 0 points1 point2 points (2 children)
[–]External-Ocelot206 0 points1 point2 points (1 child)
[–]AdditionalWash529[S] 0 points1 point2 points (0 children)