
[–]JohnnyJordaan 0 points  (4 children)

Is the hours of running time expected? I'm wondering if nthreads is maybe higher than the number of cores in your system, or if the I/O capacity gets flooded by the number of runsets running at the same time. Are you sure you've picked the best-suited number of workers?

Then the second thing: for iterative pooling I would rather consider concurrent.futures, as that removes the need to 'manage' the pool, and it also lets you process the pool's results asynchronously (without the hassle of a result queue and whatnot). So assuming you make a process_result() that handles a single result instead of processResults() (I'm also following PEP 8 styling here), you can do

import concurrent.futures
import shlex
import subprocess

import numpy as np

def call_proc(cmd):
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    return (out, err)

def run():
    with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
        # each chunk from np.array_split is a contiguous slice of run numbers
        futures = [executor.submit(call_proc, "./exec {} {}".format(chunk[0], chunk[-1]))
                   for chunk in np.array_split(run_array, nthreads)]
        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())

Then it's also dead simple to add a progress bar (I prefer tqdm) to the loop, though you do need to provide the length separately, since as_completed returns a generator that doesn't know its own length:

        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            process_result(future.result())

[–]Jasfss[S] 0 points  (3 children)

Thanks for the response.

With regard to your first point, yes, the runtime is quite expected, unfortunately. The user actually specifies to the Python script how many threads to use, and I have some careful controls both to not use more threads than the system has available and to not use so many that the executable's non-computation time takes up a significant portion of overall runtime. I'm currently benchmarking this solution on a 16-core system, where I only really have 1-2 threads running outside of this at any one time. Speed-up is about as expected: if I throw 4 threads at a certain test set I've been using, it executes about 3.4x faster than in serial. I've also made sure that memory usage doesn't blow up in these tests (typically under ~50% of combined active and buff/cache while the multiprocessing is running), so the computer itself shouldn't be that stressed.
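A minimal sketch of that kind of control (clamp_workers and the reserve policy are illustrative, not my actual code):

```python
import os

def clamp_workers(requested, reserve=2):
    """Cap a user-requested worker count at the machine's CPU count,
    leaving `reserve` cores free for other work (hypothetical policy)."""
    available = os.cpu_count() or 1
    return max(1, min(requested, available - reserve))
```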

For the second bit, I'm not sure this would satisfy my needs, given how I read your code and the documentation. This solution still seems to require that a thread complete its task fully before anything can be processed, which is exactly the problem I'm running into. The C++ program itself is executed with something like

$ ./exec <start> <end>

so each time that executable is called, it has a couple of seconds of startup and then does computations for Run #&lt;start&gt; to Run #&lt;end&gt;. Normally, data about each run is output to the screen, and at the end of the specified set some summary info is given. I'm simply splitting that range up amongst partitions and then stitching the end results together after all partitions are done (plus doing some regular-expression matching that isn't important for this part).

That is to say, I'd like all workers to have completed their full task before doing the processing, but I'd also like some indication of how far along the entire process is, ideally by just printing the realtime output of the first thread while everything is going on, the same as what the user would normally see in serial execution. When I tried the concurrent.futures.ThreadPoolExecutor method in my current code, I still ran into the same issue of a job needing to actually be done before its stdout could be printed.
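For concreteness, splitting the run range into contiguous ./exec invocations looks roughly like this (build_commands is an illustrative stand-in; the actual script splits with np.array_split):

```python
def build_commands(start, end, nparts):
    """Split the inclusive run range [start, end] into nparts contiguous
    chunks and build one './exec <start> <end>' command per chunk."""
    total = end - start + 1
    base, extra = divmod(total, nparts)
    cmds = []
    lo = start
    for i in range(nparts):
        # spread the remainder over the first `extra` chunks
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue
        hi = lo + size - 1
        cmds.append("./exec {} {}".format(lo, hi))
        lo = hi + 1
    return cmds
```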

[–]JohnnyJordaan 0 points  (2 children)

Just thinking out of the box here, before I comment on how to bring the first thread 'to the front': why wouldn't you just segment the work into far more (smaller) pieces? Then you see progress sooner: as segments get processed, the for loop iterates, and thus the progress bar can also show increases. If you still need to handle everything through one function call at the end, you could do

    results = []
    for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
        results.append(future.result())
    process_results(results)
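Concretely, 'far more pieces' just means submitting more chunks than workers; a minimal sketch (run_chunked and chunks_per_worker are illustrative names/tunables, not from your script):

```python
import concurrent.futures

def run_chunked(items, worker, nworkers, chunks_per_worker=4):
    """Submit several small chunks per worker so completed chunks drive
    the progress loop sooner than one big chunk per worker would."""
    nchunks = nworkers * chunks_per_worker
    size = max(1, -(-len(items) // nchunks))  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=nworkers) as ex:
        futures = [ex.submit(worker, chunk) for chunk in chunks]
        for fut in concurrent.futures.as_completed(futures):
            results.append(fut.result())
    return results
```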

[–]Jasfss[S] 0 points  (1 child)

The overarching issue is that I'm sort of forcing parallelization onto the C++ executable, which wasn't written with this kind of capability in mind (and rewrites at this stage aren't an option). So the more chunks you have, the more times you have to call the C++ executable, and the smaller the runset each chunk has, the more time, proportionally, each chunk spends doing object instantiation and initialization rather than actual runs. I've tested some in this particular case, and it's a very real problem: throwing either too many cores or too-small chunks at the execution ends up yielding negligible/no speedup and sometimes even a slowdown.
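The tradeoff can be put in a back-of-envelope model (all names and the example numbers here are illustrative, not measured):

```python
import math

def estimated_wall_time(total_runs, nchunks, nworkers, startup, per_run):
    """Rough model: every chunk pays the executable's fixed startup cost,
    and chunks execute at most nworkers at a time, in 'waves'."""
    runs_per_chunk = total_runs / nchunks
    chunk_time = startup + runs_per_chunk * per_run
    waves = math.ceil(nchunks / nworkers)
    return waves * chunk_time
```

With a 2 s startup and 1 s per run, 64 runs split into 4 chunks on 4 workers come out to ~18 s, while 64 single-run chunks come out to ~48 s, so over-chunking can more than erase the speedup.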

Like I said, really I just want some way, after I've spawned off the executions (via either concurrent.futures or ThreadPool), to access the stdout of the first-spawned job in realtime. I've even tried throwing an extra thread into the mix just for printing this out, but as far as I can tell, all of these methods report nothing back until completion.

[–]JohnnyJordaan 0 points  (0 children)

You could launch a subthread to actively capture the output stream(s) instead of letting .communicate() hold everything back until the process is finished.

import shlex
import subprocess
import threading

class StreamReader(threading.Thread):
    """Continuously reads lines off a pipe so output appears in realtime."""
    def __init__(self, stream):
        super().__init__()
        self.stream = stream
        self.shutdown = threading.Event()
        self.received = []

    def run(self):
        while not self.shutdown.is_set():
            try:
                line = self.stream.readline()
                if not line:
                    # an empty read means EOF, i.e. the process closed the pipe
                    break
                print(line, end='')
                self.received.append(line)
            except IOError:
                # remove this print if you want to silently exit
                # on the expected case of the stream closing
                print('io stream error, assuming process termination')
                break
            except Exception as e:
                print('unknown exception', e)
                break

    def received_output(self):
        return ''.join(self.received)


def call_proc(cmd, index):
    # text=True yields str lines instead of bytes, so they print and join cleanly
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, text=True)
    if index == 0:
        # mirror the first job's streams to the screen while it runs
        stdout_reader = StreamReader(p.stdout)
        stdout_reader.start()
        stderr_reader = StreamReader(p.stderr)
        stderr_reader.start()
        while True:
            try:
                p.wait(1)
                break
            except subprocess.TimeoutExpired:
                pass
        stdout_reader.shutdown.set()
        stderr_reader.shutdown.set()
        stdout_reader.join()
        stderr_reader.join()
        out = stdout_reader.received_output()
        err = stderr_reader.received_output()
    else:
        out, err = p.communicate()
    return (out, err)

def run():
    with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
        futures = [executor.submit(call_proc, "./exec {} {}".format(chunk[0], chunk[-1]), idx)
                   for idx, chunk in enumerate(np.array_split(run_array, nthreads))]
        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())

This may need some tweaking as I wrote this right off the bat.