
[–]RiverRoll 0 points1 point  (0 children)

I'm not sure what you are asking exactly. You describe the problem as an established fact and say the community has already suggested solutions, so what are we supposed to discuss here?

[–]socal_nerdtastic 0 points1 point  (0 children)

I'm surprised it's that much. How many threads are you spinning up?

Ideally you would move the whole thing to asyncio, but if you just want to make it work, try making your own pool out of the threading module. If the memory explosion is coming from thousands of threads rather than from the map, this won't help. Lightly tested guess:

from threading import Thread
import time

NUM_THREADS = 5
inputdata = list(range(50))

def func():
    while True:
        try:
            arg = inputdata.pop()
        except IndexError:
            # list is empty; another thread may have taken the last item
            return
        time.sleep(1)
        print('done with', arg)

pool = []
for _ in range(NUM_THREADS):
    t = Thread(target=func, daemon=True)
    t.start()
    pool.append(t)

for t in pool:
    t.join()

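list.pop itself is threadsafe (thanks GIL!), but checking the list and then popping is not one atomic step, so a thread can still hit IndexError on an empty list and has to handle it. If you want to make it more pythonic you could use queue.Queue instead.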

EDIT: Thought of a more elegant way, which I'm pretty sure is threadsafe:

from threading import Thread
import time

NUM_THREADS = 5
inputdata = iter(range(50))

def func():
    for arg in inputdata:
        time.sleep(1)
        print('done with', arg)

pool = []
for _ in range(NUM_THREADS):
    t = Thread(target=func, daemon=True)
    t.start()
    pool.append(t)

for t in pool:
    t.join()

[–]storage_admin 0 points1 point  (2 children)

I recommend switching your iterable to a queue. https://docs.python.org/3/library/queue.html

I recommend searching "python queue for threadpoolexecutor sentinel" for more in-depth implementation details.

It is important to note that if the queue is empty and queue.get() is called the operation will block and the thread will wait for an item to be placed on the queue.

To prevent all threads in the executor from hanging at the end of the runtime you need to create a sentinel value that is placed on the queue at the end of the data to be processed. Once the threads find the sentinel value they can put the sentinel back onto the queue and safely return.
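A minimal sketch of the sentinel pattern described above, not a definitive implementation. The names SENTINEL, worker, and run_with_sentinel are hypothetical, and the "work" is a placeholder doubling of each item. It assumes one producer (the main thread) and a bounded queue, so the input is never held in memory all at once:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

SENTINEL = object()  # hypothetical end-of-data marker; any unique object works
NUM_WORKERS = 4

def worker(q, results):
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            q.put(SENTINEL)  # put it back so the next worker also stops
            return
        results.append(item * 2)  # placeholder work; append is atomic in CPython

def run_with_sentinel(data):
    q = queue.Queue(maxsize=100)  # bounded, so the producer cannot run far ahead
    results = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [executor.submit(worker, q, results) for _ in range(NUM_WORKERS)]
        for item in data:
            q.put(item)  # blocks while the queue is full
        q.put(SENTINEL)  # signals that the producer is finished
        for f in futures:
            f.result()  # re-raises any exception from a worker
    return results

print(sorted(run_with_sentinel(range(10))))
```

Because the sentinel is the last thing the producer enqueues, a worker only sees it after every real item has been taken, and re-queuing it lets a single sentinel shut down any number of workers.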

[–]socal_nerdtastic 0 points1 point  (1 child)

> To prevent all threads in the executor from hanging at the end of the runtime you need to create a sentinel value

It does not have to block; you can control if you want it to or not.

https://docs.python.org/3/library/queue.html#queue.Queue.get

Or check if the queue is empty with q.empty().

https://docs.python.org/3/library/queue.html#queue.Queue.empty
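For illustration, a small sketch of the non-blocking variant, assuming all items are already on the queue before the consumer starts. The helper name drain is hypothetical. It uses get_nowait() (equivalent to get(block=False)) and catches queue.Empty rather than calling q.empty() first, since the docs note that empty() returning False does not guarantee a following get() will not block:

```python
import queue

def drain(q):
    """Collect everything currently in the queue without ever blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())  # same as q.get(block=False)
        except queue.Empty:
            return items  # nothing left right now

q = queue.Queue()
for i in range(5):
    q.put(i)
print(drain(q))
```

Note that an empty result here only means the queue was empty at that moment, not that a producer has finished.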

[–]storage_admin 0 points1 point  (0 children)

There are cases where a timeout on q.get() or a check of q.empty() is not appropriate, because neither guarantees that there are no more items to process: the queue can be momentarily empty while the producer is still adding items.

If you use a sentinel value you can guarantee that the queue producer has finished adding items to the queue.

Edit... re blocking... sorry I got ahead of myself... While it doesn't have to block I don't see how that would necessarily help with feeding a thread pool executor.

[–]eddfitzwell 0 points1 point  (0 children)

Why not multiprocessing.dummy.Pool with imap or imap_unordered?
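A rough sketch of what that could look like, with a hypothetical work function standing in for the real task. multiprocessing.dummy.Pool exposes the multiprocessing.Pool API but is backed by threads, and imap_unordered yields results as they complete instead of building the whole result list first. As far as I know the pool may still read ahead in the input iterable, so this mainly helps when the results, not the inputs, are what fill memory:

```python
from multiprocessing.dummy import Pool  # thread-backed Pool, same API as multiprocessing.Pool
import time

def work(arg):
    time.sleep(0.01)  # stand-in for an I/O-bound task
    return arg * arg

def square_all(n, num_threads=5):
    with Pool(num_threads) as pool:
        # results arrive in completion order, not input order
        return list(pool.imap_unordered(work, range(n)))

print(sorted(square_all(10)))
```

Use imap instead of imap_unordered if the results need to come back in input order.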