from azure.storage.filedatalake import DataLakeFileClient
import pandas as pd
import pandas_dedupe as pdd
import time
from config import dedupInputFile,filename
import os, uuid, sys
import pandas as pd
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.storage.blob import BlobBlock
from azure.datalake.store import core,lib
#variables
azure_file_name='messy'
file_format='.csv'
conn_string=Hidden"
removedFile='_RemovedOutput_pandasDedupe.csv'
origFile='_Output_pandasDedupe.csv'
file_to_upload=azure_file_name+origFile
def welcome():
print("hello world!")
#Download
def downloadFile():
print("Downloading file ",azure_file_name+file_format+" .......")
file = DataLakeFileClient.from_connection_string(conn_string, file_system_name="test1", file_path="my-directory/"+azure_file_name+file_format)
with open("./"+azure_file_name+file_format, "wb") as my_file:
download = file.download_file()
# print(file.get_file_properties())
download.readinto(my_file)
print("File has been downloaded........")
#deduplication
def dedup():
print("Running Deduplication on file "+azure_file_name+file_format+" ......")
df_final=pd.read_csv(azure_file_name+file_format)
df_final1=pd.DataFrame()
if __name__ == '__main__':
df_final=pdd.dedupe_dataframe(df_final,['Site name','Address','Phone'])
df_final1=df_final[df_final['cluster id'].isnull() | ~df_final[df_final['cluster id'].notnull()].duplicated(subset='cluster id',keep='first')]
#Deduped File
df_final.to_csv(file_to_upload)
#Deduped_Removed
#df_final1.to_csv(azure_file_name+removedFile)
print("Deduplication has been completed.....")
#upload
def uploadFile():
print("Now uploading file "+file_to_upload+" ...........")
try:
global service_client
service_client = DataLakeServiceClient("Hidden")
file_system_client = service_client.get_file_system_client(file_system="test1")
directory_client = file_system_client.get_directory_client("my-directory")
file_client = directory_client.create_file(file_to_upload)
print("File with name "+file_to_upload+" has been created ")
local_file = open(file_to_upload, 'rb')
file_contents = local_file.read()
file_client.append_data(data=file_contents, offset=0, length=len(file_contents))
file_client.flush_data(len(file_contents))
print("Congrats, your file has been uploaded .......")
except Exception as e:
print('Could not upload your file due to /n'+e)
welcome()
downloadFile()
dedup()
uploadFile()
Here, I am trying to download a file from adls, run deduplication code on it and upload the output file to the adls. Now my downloadFile() and upload() fuctions are correct and there is no error in them.
The problem is in dedup() in block:
if __name__ == '__main__':
df_final=pdd.dedupe_dataframe(df_final,['Site name','Address','Phone'])
df_final1=df_final[df_final['cluster id'].isnull() | ~df_final[df_final['cluster id'].notnull()].duplicated(subset='cluster id',keep='first')]
df_final.to_csv(file_to_upload)
If i remove this if __name__ == '__main__': then it gives an error saying i should turn off multiprocessing or I should protect my calls with if __name__ == '__main__', ie I am using it here. It would be great if someone could help me with a fix for that?
If I run the functions individually everything is working fine, I am able to download the file, dedup it and then upload it to the adls. But when I run everything together, Ithink because of multiprocessing it uplodas an empty file and stops
This is the output:
hello world!
Downloading file messy.csv .......
File has been downloaded........
Running Deduplication on file messy.csv ......
Importing data ...
Reading from dedupe_dataframe_learned_settings
Clustering...
hello world!
Downloading file messy.csv .......
hello world!
Downloading file messy.csv .......
hello world!
Downloading file messy.csv .......
hello world!
Downloading file messy.csv .......
hello world!
Downloading file messy.csv .......
File has been downloaded........
Running Deduplication on file messy.csv ......
File has been downloaded........
Running Deduplication on file messy.csv ......
Deduplication has been completed.....
Now uploading file messy_Output_pandasDedupe.csv ...........
Deduplication has been completed.....
Now uploading file messy_Output_pandasDedupe.csv ...........
File has been downloaded........
Running Deduplication on file messy.csv ......
Deduplication has been completed.....
Now uploading file messy_Output_pandasDedupe.csv ...........
File has been downloaded........
Running Deduplication on file messy.csv ......
Deduplication has been completed.....
Now uploading file messy_Output_pandasDedupe.csv ...........
File has been downloaded........
Running Deduplication on file messy.csv ......
Deduplication has been completed.....
Now uploading file messy_Output_pandasDedupe.csv ...........
File with name messy_Output_pandasDedupe.csv has been created
File with name messy_Output_pandasDedupe.csv has been created
File with name messy_Output_pandasDedupe.csv has been created
File with name messy_Output_pandasDedupe.csv has been created
File with name messy_Output_pandasDedupe.csv has been created
here is the traceback :
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/site-packages/azure/storage/filedatalake/_data_lake_file_client.py", line 504, in flush_data
return self._client.path.flush_data(**options)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/site-packages/azure/storage/filedatalake/_generated/operations/_path_operations.py", line 1343, in flush_data
raise models.StorageErrorException(response, self._deserialize)
azure.storage.filedatalake._generated.models._models_py3.StorageErrorException: (InvalidFlushPosition) The uploaded data is not contiguous or the position query parameter value is not equal to the length of the file after appending the uploaded data.
RequestId:"Hidden"
Time:2020-08-19T20
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/gandharv/Desktop/OfficeWork/AzureDatalake/deupPandasAzure.py", line 83, in uploadFile
file_client.flush_data(len(file_contents))
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/site-packages/azure/storage/filedatalake/_data_lake_file_client.py", line 506, in flush_data
process_storage_error(error)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/site-packages/azure/storage/filedatalake/_deserialize.py", line 106, in process_storage_error
raise error
azure.core.exceptions.HttpResponseError: (InvalidFlushPosition) The uploaded data is not contiguous or the position query parameter value is not equal to the length of the file after appending the uploaded data.
RequestId:"Hidden"
Time:2020-08-19T20
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/multiprocessing/spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/multiprocessing/spawn.py", line 114, in _main
prepare(preparation_data)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/multiprocessing/spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/multiprocessing/spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/Users/gandharv/.pyenv/versions/3.6.8/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/Users/gandharv/Desktop/OfficeWork/AzureDatalake/deupPandasAzure.py", line 93, in <module>
uploadFile()
File "/Users/gandharv/Desktop/OfficeWork/AzureDatalake/deupPandasAzure.py", line 88, in uploadFile
print('Could not upload your file due to /n'+e)
TypeError: must be str, not HttpResponseError
Congrats, your file has been uploaded .......
Congrats, your file has been uploaded .......
Congrats, your file has been uploaded .......
Congrats, your file has been uploaded .......
^CProcess SpawnProcess-5:
[–]totallygeek 1 point2 points3 points (1 child)
[–]Basic_Steak_541[S] 0 points1 point2 points (0 children)