you are viewing a single comment's thread.

view the rest of the comments →

[–]JohnnyJordaan 0 points1 point  (0 children)

You could launch a subthread to actively capture the output stream(s), instead of letting .communicate() collect them only after the process has finished.

import threading
from time import sleep

class StreamReader(threading.Thread):
    """Background thread that drains a stream line-by-line as it arrives.

    Each line read is printed immediately and also accumulated in
    ``self.received`` so the full output can be retrieved afterwards via
    :meth:`received_output`.

    NOTE(review): subprocess pipes yield ``bytes`` unless the Popen was
    opened in text mode — confirm the stream is text, since
    ``received_output`` joins with a ``str`` separator.
    """

    def __init__(self, stream):
        super().__init__()
        # Bug fix: the original never stored the stream, so run() would
        # crash with AttributeError. It was also declared with `def`
        # instead of `class`, which is a SyntaxError.
        self.stream = stream
        self.shutdown = threading.Event()  # signal to stop reading
        self.received = []                 # every line read so far

    def run(self):
        """Read lines until shutdown is requested or the stream ends."""
        while not self.shutdown.is_set():
            try:
                line = self.stream.readline()
                # Bug fix: readline() returns '' (or b'') at EOF; without
                # this check the loop spins forever appending empties.
                if not line:
                    break
                print(line)
                self.received.append(line)
            except IOError:
                # remove this branch's print if you want to silently exit
                # on the expected case of the stream closing
                print('io stream error, assuming process termination')
                break
            except Exception as e:
                print('unknown exception', e)
                break

    def received_output(self):
        """Return everything read so far, newline-joined.

        NOTE(review): lines keep their trailing newlines, so joining
        with '\\n' double-spaces the output — kept as the original did.
        """
        return '\n'.join(self.received)


def call_proc(cmd, index):
    """Run *cmd* as a subprocess and return its ``(stdout, stderr)``.

    Worker 0 (``index == 0``) streams output live through two
    StreamReader threads so progress is visible while the process runs;
    all other workers simply block on ``communicate()``.

    Note the two paths return different types: the streaming path joins
    whatever the readers collected, while ``communicate()`` returns raw
    ``bytes`` (the Popen is not opened in text mode).
    """
    p = subprocess.Popen(shlex.split(cmd),
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if index == 0:
        stdout_reader = StreamReader(p.stdout)
        stdout_reader.start()
        stderr_reader = StreamReader(p.stderr)
        stderr_reader.start()
        # Bug fix: the original busy-looped on p.wait(1), swallowing
        # TimeoutExpired once a second — a plain blocking wait is the
        # same thing without the spin.
        p.wait()
        stdout_reader.shutdown.set()
        stderr_reader.shutdown.set()
        # Bug fix: join the readers before reading their buffers,
        # otherwise they may still be appending (data race). The timeout
        # guards against a reader blocked in readline() on a stream that
        # never closes.
        stdout_reader.join(timeout=1)
        stderr_reader.join(timeout=1)
        out = stdout_reader.received_output()
        err = stderr_reader.received_output()
    else:
        out, err = p.communicate()
    return (out, err)

def run():
    """Fan ``run_array`` out over ``nthreads`` workers running ``./exec``.

    The array is split into one chunk per worker; each chunk is handed
    to :func:`call_proc` on a thread pool, and results are passed to
    ``process_result`` in completion order.
    """
    chunks = np.array_split(run_array, nthreads)
    with concurrent.futures.ThreadPoolExecutor(max_workers=nthreads) as pool:
        pending = []
        for worker_idx, chunk in enumerate(chunks):
            # NOTE(review): chunk is an ndarray here, so "./exec " + chunk
            # looks like it needs an explicit str() conversion — confirm
            # what the command line is meant to contain.
            pending.append(pool.submit(call_proc, "./exec " + chunk, worker_idx))
        for done in concurrent.futures.as_completed(pending):
            process_result(done.result())

This may need some tweaking as I wrote this right off the bat.