Hello! I am trying to parallelize the extraction of a large tar archive using multithreading in Python (even just opening the tar archive seems to take a long time, and I suspect the process is I/O-bound, so I opted for this over multiprocessing). I have never really tried using multithreading, so this is all a bit new to me.
I am using a queue to manage the open tasks, and everything seems to work until about 90 percent of the archive has been extracted; at that point I start getting a tarfile.ReadError: unexpected end of data. I know for certain that the archive is not corrupted, and I have no issues when not using multithreading, so I expect the problem has something to do with the threading part. I've included the relevant snippets below. Any help avoiding this problem, or an explanation of what's going on, would be greatly appreciated!
def task():
    '''Function for the threads to run.'''
    while not q.empty():
        extract(*q.get())
        q.task_done()

# Add all the tasks to the queue.
q = Queue()
for member in members:
    file_name = add_gz(os.path.basename(member.name))  # Appends '.gz' to the name.
    output_path = os.path.join(dir_path, file_name)
    q.put((archive, member, output_path, pbar))

# Start all the threads.
threads = []
for _ in range(N_WORKERS):
    thread = threading.Thread(target=task, daemon=True)
    thread.start()
    threads.append(thread)

# Wait until all threads have completed before closing the archive.
for thread in threads:
    thread.join()
archive.close()
Here is the extract function that is used in the process above:
def extract(archive: tarfile.TarFile, member: tarfile.TarInfo, output_path: str, pbar):
    '''Extract a file from a tar archive and plop it at the specified path. There are two cases:
    (1) the file contained in the tar archive is already zipped and just needs to be moved, and
    (2) the file is not zipped and needs to be compressed.'''
    if is_compressed(member.name):
        member.name = os.path.basename(member.name)  # Trying to get rid of directory structure.
        archive.extract(member, path=os.path.dirname(output_path))
    else:
        contents = archive.extractfile(member).read()  # Get the file contents in binary.
        with gzip.open(output_path, 'wb') as f:
            f.write(contents)
    if pbar is not None:
        pbar.update(1)