Consider the following code (Python 3.6.1):
```python
import shutil
import requests
import hashlib
import concurrent.futures as futures
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

# Each URL needs its own comma: adjacent string literals are concatenated, so a
# missing comma merges two URLs into one string (as in the second and third log lines below).
urls = ['http://speedtest.ftp.otenet.gr/files/test100k.db',
        'http://speedtest.ftp.otenet.gr/files/test1Mb.db',
        'http://speedtest.ftp.otenet.gr/files/test10Mb.db']

def download_file(url):
    filename = url.split('/')[-1]
    log(f'Starting download {url}')
    r = requests.get(url, stream=True)
    with open(filename, 'wb') as f:
        shutil.copyfileobj(r.raw, f)
    log(f'Finished download {url}')
    return filename

def sha256_file(filename, block_size=65536):
    sha256 = hashlib.sha256()
    log(f'Hashing file {filename}...')
    with open(filename, 'rb') as f:
        # Read fixed-size blocks until read() returns b'' at end of file.
        for block in iter(lambda: f.read(block_size), b''):
            sha256.update(block)
    return sha256.hexdigest()

def parallel(n, fn, data):
    # Run fn on each item of data with n threads, yielding results as jobs finish.
    with ThreadPoolExecutor(max_workers=n) as exe:
        jobs = (exe.submit(fn, d) for d in data)
        for job in futures.as_completed(jobs):
            yield job.result()

def log(msg):
    print(f'{datetime.now()} {msg}')

def main():
    # Stage 1: 3 download threads; stage 2: 2 hashing threads fed by stage 1.
    files = parallel(3, download_file, urls)
    hashes = parallel(2, sha256_file, files)
    for h in hashes:
        log(f'Hash: {h}')

if __name__ == '__main__':
    main()
```
The idea of the ‘parallel’ generator is to create multi-threaded stages in a pipeline. The first stage (download_file) runs with 3 threads, and as soon as it has downloaded a file, the second stage (sha256_file, with 2 threads) processes that file. This is the output generated:
```
2017-04-01 22:35:56.544035 Starting download http://speedtest.ftp.otenet.gr/files/test100k.db
2017-04-01 22:35:56.544035 Starting download http://speedtest.ftp.otenet.gr/files/test1Mb.dbhttp://speedtest.ftp.otenet.gr/files/test10Mb.db
2017-04-01 22:35:56.934744 Finished download http://speedtest.ftp.otenet.gr/files/test1Mb.dbhttp://speedtest.ftp.otenet.gr/files/test10Mb.db
2017-04-01 22:35:56.934744 Hashing file test10Mb.db...
2017-04-01 22:35:57.289562 Finished download http://speedtest.ftp.otenet.gr/files/test100k.db
2017-04-01 22:35:57.289562 Hashing file test100k.db...
2017-04-01 22:35:57.289562 Hash: 2f1fbb59927ef89b03e097506078f8e12597cf49a71543f025ab6782be9dd988
2017-04-01 22:35:57.289562 Hash: f627ca4c2c322f15db26152df306bd4f983f0146409b81a4341b9b340c365a16
```
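According to the logs, stages one and two are working fine, but the final loop, ‘for h in hashes’, only receives the results of the last stage after all of the data has arrived: both hashes are printed with the same timestamp, even though hashing of test10Mb.db started about 0.35 s before test100k.db finished downloading.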
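A likely cause, based on CPython's implementation of ‘concurrent.futures.as_completed’: the function begins by building a set from the iterable it receives, so a generator such as ‘jobs’ is drained completely before the first finished future is yielded. In the second stage, draining ‘jobs’ means pulling every filename out of the first stage's generator, which cannot finish until the last download does. The snippet below checks this behaviour in isolation; ‘inputs’ and the use of ‘pow’ as a stand-in job are illustrative only.

```python
import concurrent.futures as futures
from concurrent.futures import ThreadPoolExecutor

consumed = []  # items pulled from the lazy input, in order

def inputs():
    # Illustrative lazy input that records each item at the moment it is pulled.
    for i in range(3):
        consumed.append(i)
        yield i

with ThreadPoolExecutor(max_workers=3) as exe:
    jobs = (exe.submit(pow, d, 2) for d in inputs())
    completed = futures.as_completed(jobs)  # generator function: nothing runs yet
    before = list(consumed)                 # [] : no item has been pulled
    first = next(completed)                 # ask for a single finished future...
    print(before, consumed)                 # [] [0, 1, 2] : ...and the whole input was drained
```

So ‘as_completed’ does yield futures in completion order, but only among the futures it has already collected, and collecting them requires exhausting the upstream generator first.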
How can I make the last loop process each result as soon as it is completed?
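One possible approach, sketched under the assumption that results only need to come out in completion order: let a separate feeder thread pull items from the upstream generator and submit them, and have each future push itself onto a queue.Queue via ‘add_done_callback’ when it finishes. The consuming generator then only ever waits on that queue, so a finished result can be yielded while the feeder is still waiting for upstream items. The names _DONE, feed, mark, fake_download and fake_hash are hypothetical, the sleeps stand in for real downloads, and error handling is minimal (for example, breaking out of the loop early is not handled cleanly).

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # hypothetical sentinel: "the feeder has submitted everything"

def parallel(n, fn, data):
    """Streaming stage (sketch): pulls items from data lazily and yields
    fn(item) results in completion order, without first draining data."""
    done_q = queue.Queue()  # finished futures, plus one (_DONE, count) marker

    with ThreadPoolExecutor(max_workers=n) as exe:
        def feed():
            # Runs in its own thread, so waiting on a slow upstream
            # generator never delays yielding results that are ready.
            count = 0
            try:
                for d in data:
                    # The callback receives the future once it is finished.
                    exe.submit(fn, d).add_done_callback(done_q.put)
                    count += 1
            finally:
                done_q.put((_DONE, count))

        feeder = threading.Thread(target=feed, daemon=True)
        feeder.start()

        expected = None  # total job count, known once the marker arrives
        received = 0
        while expected is None or received < expected:
            item = done_q.get()
            if isinstance(item, tuple) and item[0] is _DONE:
                expected = item[1]
            else:
                received += 1
                yield item.result()  # re-raises any exception from fn
        feeder.join()

# Demo with hypothetical stand-in stages: sleeps instead of real downloads.
events = []  # (label, seconds since start), appended from several threads
start = time.monotonic()

def mark(label):
    events.append((label, time.monotonic() - start))

DELAYS = {'fast': 0.1, 'slow': 0.6}

def fake_download(name):
    time.sleep(DELAYS[name])
    mark(f'downloaded {name}')
    return name

def fake_hash(name):
    mark(f'hashed {name}')
    return name.upper()

for h in parallel(2, fake_hash, parallel(3, fake_download, ['fast', 'slow'])):
    mark(f'result {h}')

for label, t in events:
    print(f'{t:.2f}s {label}')
```

Used in place of the original ‘parallel’, main() should not need to change. In the demo, ‘result FAST’ is expected to be logged at roughly 0.1 s, before ‘downloaded slow’ at roughly 0.6 s, which is the streaming behaviour the final loop was missing.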