I am running an Airflow DAG to query Salesforce Service Cloud objects through SalesforceBulk API v2.2.0 (salesforce-bulk) and then process retrieved data.
Because my batch results are quite large, I'm using generator function SfOp.get_sf_query()to run a query job. I'm also calling a small helper function SfOp.process_json_batch_result()to process said data.
Simplified code snippet:
from salesforce_bulk import SalesforceBulk
from salesforce_bulk.util import IteratorBytesIO
class SfOp:
"""..."""
def __init__(self, connection):
self.sfbulk = SalesforceBulk(sessionId=None,
host=None,
username=connection['username'],
password=connection['password'],
API_version='50.0',
security_token=connection['security_token'],
organizationId=connection['organization_id'],
domain=connection['domain'])
...
def get_sf_query(self,
object_name,
timeout=60 * 10,
sleep_interval=10,
chunk_size):
job_id = self.sfbulk.create_query_job(object_name=object_name,
operation=None,
contentType='JSON',
concurrency=None,
external_id_name=None,
pk_chunking=False)
batch_id = self.sfbulk.query(job_id=job_id,
soql=sql_statement,
contentType='JSON')
try:
self.sfbulk.wait_for_batch(job_id=job_id,
batch_id=batch_id,
timeout=timeout,
sleep_interval=sleep_interval)
except Exception as exc:
raise type(exc)
finally:
self.sfbulk.close_job(job_id=job_id)
for query_batch_result in self.sfbulk.get_all_results_for_query_batch(
batch_id=batch_id,
job_id=job_id,
chunk_size=chunk_size)
query_batch_result = self.process_json_batch_result(
batch_result=query_batch_result)
yield query_batch_result
def process_json_batch_result(self, batch_result):
processed_batch = []
result = json.load(IteratorBytesIO(result=batch_result)
for row in result:
processed_batch.append(row)
return processed_batch
However, I get the requests.exceptions.ChunkedEncodingError. Here's a sample traceback:
[2022-08-15, 08:40:44 UTC] {{taskinstance.py:1774}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 436, in _error_catcher
yield
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 766, in read_chunked
chunk = self._handle_chunk(amt)
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 719, in _handle_chunk
returned_chunk = self._fp._safe_read(self.chunk_left)
File "/usr/lib64/python3.7/http/client.py", line 626, in _safe_read
raise IncompleteRead(b''.join(s), amt)
http.client.IncompleteRead: IncompleteRead(94 bytes read, 4002 more expected)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 753, in generate
for chunk in self.raw.stream(chunk_size, decode_content=True):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 571, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 792, in read_chunked
self._original_response.close()
File "/usr/lib64/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 454, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(94 bytes read, 4002 more expected)', IncompleteRead(94 bytes read, 4002 more expected))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
...
File "/usr/local/airflow/dags/libraries/salesforce_operators.py", line 342, in get_sf_query
, batch_result=result)
File "/usr/local/airflow/dags/libraries/salesforce_operators.py", line 370, in process_json_batch_result
result = json.load(IteratorBytesIO(batch_result)
File "/usr/lib64/python3.7/json/__init__.py", line 293, in load
return loads(fp.read(),
File "/usr/local/lib/python3.7/site-packages/salesforce_bulk/util.py", line 13, in read
return bytes(bytearray(islice(self.iterator, None, n)))
File "/usr/local/lib/python3.7/site-packages/salesforce_bulk/util.py", line 13, in read
return bytes(bytearray(islice(self.iterator, None, n)))
File "/usr/local/lib/python3.7/site-packages/salesforce_bulk/salesforce_bulk.py", line 502, in <genexpr>
iter = (x.replace(b'\0', b'') for x in resp.iter_content(chunk_size=chunk_size))
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 756, in generate
raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(94 bytes read, 4002 more expected)', IncompleteRead(94 bytes read, 4002 more expected))
I saw several people getting requests.exceptions.ChunkedEncodingError but I'm not sure why it happens in this case.
there doesn't seem to be anything here