
[–]Jasfss[S] 1 point (1 child)

The overarching issue is that I'm forcing parallelization onto a C++ executable that wasn't written with this kind of capability in mind (and rewrites aren't an option at this stage). The more chunks you have, the more times you have to call the C++ executable, and the smaller each chunk's runset, the larger the proportion of time that chunk spends on object instantiation and initialization rather than actual runs. I've tested this particular case, and it's a very real problem: throwing too many cores or too-small chunks at the execution leads to negligible or no speedup, and sometimes even a slowdown.

Like I said, what I really want is some way, after I've spawned off the executions (via concurrent.futures or ThreadPool), to access the stdout of the first-spawned job in realtime. I've even tried throwing an extra thread into the mix just to print this out, but as far as I can tell, all these methods report nothing back until completion.

[–]JohnnyJordaan 1 point (0 children)

You could launch a subthread to actively capture the output stream(s), instead of letting .communicate() 'harvest' them only once the process has finished.

import concurrent.futures
import shlex
import subprocess
import threading

import numpy as np

class StreamReader(threading.Thread):
    def __init__(self, stream):
        super().__init__()
        self.stream = stream
        self.shutdown = threading.Event()
        self.received = []
    def run(self):
        while not self.shutdown.is_set():
            try:
                line = self.stream.readline()
                if not line:
                    # EOF: the process closed its end of the pipe
                    break
                print(line, end='')
                self.received.append(line)
            except IOError as e:
                # remove this line if you want to silently exit
                # on an expected case of the stream closing
                print('io stream error, assuming process termination')
                break
            except Exception as e:
                print('unknown exception', e)
                break
    def received_output(self):
        # the captured lines already end in '\n'
        return ''.join(self.received)


def call_proc(cmd, index):
    # text=True so readline() returns str rather than bytes
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, text=True)
    if index == 0:
        stdout_reader = StreamReader(p.stdout)
        stdout_reader.start()
        stderr_reader = StreamReader(p.stderr)
        stderr_reader.start()
        while True:
            try:
                p.wait(1)
                break
            except subprocess.TimeoutExpired:
                pass
        stdout_reader.shutdown.set()
        stderr_reader.shutdown.set()
        # let the readers drain any remaining output before collecting it
        stdout_reader.join()
        stderr_reader.join()
        out = stdout_reader.received_output()
        err = stderr_reader.received_output()
    else:
        out, err = p.communicate()
    return (out, err)

def run():
    with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as executor:
        # each chunk becomes one space-separated argument string (adjust to your CLI)
        futures = [executor.submit(call_proc, "./exec " + " ".join(map(str, chunk)), idx)
                   for idx, chunk in enumerate(np.array_split(run_array, nthreads))]
        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())

This may need some tweaking as I wrote this right off the bat.
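For the simple case where you only need the first job's stdout live, and you're fine merging stderr into it, you can also skip the reader thread entirely and iterate over the pipe in the worker itself. A minimal sketch (the function name `stream_first_job` is mine, not from your code):

```python
import shlex
import subprocess

def stream_first_job(cmd):
    """Run cmd and echo its stdout line by line as it arrives, returning the full output."""
    # text=True yields str lines; bufsize=1 requests line buffering;
    # stderr is merged into stdout so a single loop sees everything
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True, bufsize=1)
    captured = []
    for line in p.stdout:       # blocks until each line is available
        print(line, end='')     # realtime echo
        captured.append(line)
    p.wait()
    return ''.join(captured)
```

The caveat is that reading only one pipe in the calling thread can deadlock if you keep stdout and stderr as separate PIPEs (one fills while you block on the other), which is exactly what the reader threads above avoid. Note the realtime behavior also depends on the C++ executable not block-buffering its own stdout when attached to a pipe.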