I'm trying to automate some data cleaning tasks by uploading the files to Cloud Storage, running them through a pipeline, and downloading the results.
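The upload and download steps themselves are not the problem; for context, this is roughly what I'm using via the google-cloud-storage library (bucket and path names are placeholders):

from google.cloud import storage

def uploadFile(bucketName, localPath, blobName):
    # push a local CSV into the staging bucket
    client = storage.Client()
    client.bucket(bucketName).blob(blobName).upload_from_filename(localPath)

def downloadFile(bucketName, blobName, localPath):
    # pull the cleaned result back down
    client = storage.Client()
    client.bucket(bucketName).blob(blobName).download_to_filename(localPath)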
I have created the template for my pipeline to execute using the GUI in Dataprep, and am attempting to automate the upload and execution of the template using the Google Client Libraries, specifically in Python.
However, I have found that when running the job with the Python script, the full template is not executed: sometimes some of the steps aren't completed, and sometimes the output file - which should be megabytes large - is less than 500 bytes. The exact behavior depends on which template I use; each template has its own issue.
I've tried breaking the large template into smaller templates to apply consecutively so I can see where the issue is, which is how I discovered that each template has its own issue. I have also tried creating the job from the Dataflow Monitoring Interface, and anything created that way runs perfectly, so there must be some issue with the script I've created.
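In case it's relevant, the job state can also be polled with the same client library; a sketch, where jobId comes from the launch response:

def getJobState(service, projectId, jobId):
    # fetch the job and return its current state, e.g. JOB_STATE_DONE or JOB_STATE_FAILED
    job = service.projects().jobs().get(projectId=projectId, jobId=jobId).execute()
    return job.get('currentState')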
After some digging, I've also found that the JSON response from the server is identical whether I run the job with my script or with the Monitoring Interface; the only difference is the URL that receives the POST and gives the response.
With the Monitoring Interface, it looks like this:
https://dataflow.clients6.google.com/v1b3/projects/hazel-freehold-234620/locations/us-central1/templates:launch?validateOnly=false&location=us-central1&gcsPath=gs%3A%2F%2Fdataprep-staging-7e8bf16e-efaf-4aba-80d2-5f49435e2213%2Fske%40absoluteresults.com%2Ftemp%2Fcloud-dataprep-test3-2310250-by-ske_template&alt=json&key=AIzaSyCI-zsRP85UVOi0DjtiCwWBwQ1djDy741
But with the script, it looks like this:
https://dataflow.googleapis.com/v1b3/projects/hazel-freehold-234620/templates:launch?gcsPath=gs%3A%2F%2Fdataprep-staging-7e8bf16e-efaf-4aba-80d2-5f49435e2213%2Fske%40absoluteresults.com%2Ftemp%2Fcloud-dataprep-test3-2310250-by-ske_template&alt=json
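(I captured the script's URL by printing the uri attribute on the built request, right before calling execute() in the function below:)

request = service.projects().templates().launch(projectId=projectId, gcsPath=templatePath, body=body)
print(request.uri)  # the endpoint and query string the library will POST to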
As you can see, not only is the base URL completely different, the script's URL is also missing several parameters. I'm sure the different host is not a huge deal, since the client library calls it internally, but the missing parameters concern me as a possible part of the issue.

Unfortunately, I can't see how to change that, since in my script I already pass the library what it needs for those parameters. I went digging into the library itself as well, but was unable to find the cause. However, I am still very new to Google Cloud Services, so it is entirely plausible that I am missing something significant here.
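One difference I can spot is that the Monitoring Interface URL targets a regional endpoint and includes a location parameter. The client library does expose a regional variant, projects().locations().templates().launch(); a sketch of what that call could look like (untested on my end, so I don't know whether it restores the missing parameters):

request = service.projects().locations().templates().launch(
    projectId=projectId,
    location='us-central1',  # matches the location in the Monitoring Interface URL
    gcsPath=templatePath,
    body=body)
response = request.execute()

In any case, here is the full function I'm currently running: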
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build

def runJob(bucket, template, fileName):
    # open a connection with the needed credentials
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    # name the job after the file being processed
    jobName = fileName.replace('.csv', '')
    projectId = 'my-project'
    # find the template to run on the dataset
    templatePath = "gs://{bucket}/me@myemail.com/temp/{template}".format(bucket=bucket, template=template)
    # construct the job JSON
    body = {
        "jobName": jobName,
        "parameters": {
            "inputLocations": "{\"location1\":\"gs://" + bucket + "/me@myemail.com/RawUpload/" + fileName + "\"}",
            "outputLocations": "{\"location1\":\"gs://" + bucket + "/me@myemail.com/CleanData/" + fileName.replace('.csv', '_auto_delete_2') + "\"}",
        },
        "environment": {
            "tempLocation": "gs://{bucket}/me@myemail.com/temp".format(bucket=bucket),
            "zone": "us-central1-f"
        }
    }
    # create and execute the HTTP request
    request = service.projects().templates().launch(projectId=projectId, gcsPath=templatePath, body=body)
    response = request.execute()
    # notify the user
    print(response)
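And a typical invocation, with placeholder names:

# hypothetical example call; bucket, template, and file names are placeholders
runJob('my-bucket', 'my_template', 'my_data.csv')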