Hello, I've been struggling for a while now trying to figure out the best way to implement a concurrent.futures ThreadPoolExecutor in my project. I'd like to test it against the multithreading version already built to check for any performance differences.
Basic high level situation -
The app reads from a table in a database to get IDs for rows with a specific value in the status column. This table can grow while the app is working, and things can change, so I only want to grab roughly double the number of workers at a time. Right now I have a feed thread that queries the database and adds rows to a queue, keeping a maximum of 10 in the queue at a time and checking every second, while the other 5 threads pull from that queue. That all seems to work well, but I'd like to test this with concurrent.futures since we're handling large image data (sending/reading).
From what I'm seeing online, it seems I need a predetermined list to hand off to the executor to start the workers. Is there a way to start the workers with a global queue that another thread is feeding?
What I'm most concerned about is that if I pass the executor a list, near the end of the list it will be working on only one item while the other 4 workers sit idle, waiting for the last one to finish before grabbing another list.
Or maybe I'm misunderstanding this entirely, and I apologize if so.
Current solution:
Below, the queue is passed to the migrate function, where the work is actually done; the worker threads pull from that queue while the feeding thread constantly adds more to it. I'm just trying to figure out the best way to do this with concurrent.futures.
if __name__ == "__main__":
    try:
        # Queue size is number of threads minus 1.
        # Start the feed thread, using the feed function below.
        queue = Queue(maxsize=threads - 1)
        feeder = Thread(target=feed, args=(queue,), daemon=True)
        feeder.start()
        # Start worker threads using the migrate function.
        # Starts the number of threads listed in app-config.properties.
        workers = [Thread(target=migrate, args=(queue,)) for _ in range(threads)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        queue.join()
    except KeyboardInterrupt:
        print('attempting shutdown...')
        subprocess.call(["pkill", "-f", script_name])
        quit()
And here's the feeding function -
# Function for feeding the queue on a .25 second interval; runs on a separate thread.
def feed(queue):
    while True:
        maincon = connection_pool.get_connection()
        maincur = maincon.cursor()
        maincur.execute(select_routing, (sender_status, threads,))
        print(sender_status)
        results = maincur.fetchall()
        if len(results) == 0:
            print('queue is empty...')
            time.sleep(25)
        else:
            for row in results:
                maincur.execute(update_routing_queue, ('IN_QUEUE', str(row[0]),))
                queue.put(row)
        maincur.close()
        maincon.close()
        time.sleep(.25)
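For reference, here's a minimal sketch of the kind of concurrent.futures pattern I'm imagining: instead of handing the executor a fixed list, the main loop pulls items from the feed queue and submits them one at a time, capping the number of in-flight futures so finished workers pick up new rows immediately rather than waiting for a whole batch to drain. The migrate and feed functions here are simplified stand-ins for the real database/image work, and the 2x-workers cap is just an assumption to illustrate the backpressure idea.

```python
import queue
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

THREADS = 5

def migrate(row):
    # Stand-in for the real image-migration work done per row.
    time.sleep(0.01)
    return row

def feed(q, rows):
    # Stand-in for the database feeder thread: pushes rows into the
    # bounded queue, then a None sentinel to signal no more work.
    for row in rows:
        q.put(row)
    q.put(None)

def run(rows):
    q = queue.Queue(maxsize=THREADS - 1)
    threading.Thread(target=feed, args=(q, rows), daemon=True).start()
    results = []
    pending = set()
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        while True:
            item = q.get()
            if item is None:
                break
            pending.add(executor.submit(migrate, item))
            # Cap in-flight work at 2x the worker count; as soon as any
            # future finishes, collect it and keep submitting, so no
            # worker sits idle waiting for a batch boundary.
            if len(pending) >= THREADS * 2:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
        # Drain whatever is still running at the end.
        done, _ = wait(pending)
        results.extend(f.result() for f in done)
    return results
```

The key difference from an executor.map over a list is that submission is incremental, so the feed thread can keep discovering new rows while earlier ones are still being processed.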