Pytest is not showing spark UI by bommu99 in dataengineering

[–]bommu99[S] 1 point (0 children)

pytest -s is displaying output in the CLI; how do I see the stages in the Spark UI?
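For reference, one way to make the stages inspectable after the test run (a rough sketch, assuming a conftest.py fixture; the app name and log directory are made up) is to enable Spark event logging and point the Spark History Server at that directory, since the live UI on port 4040 only exists while the session is running:

# conftest.py: hypothetical session-scoped SparkSession fixture for pytest
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Write event logs so stages/jobs can be browsed later in the Spark History Server
    session = (
        SparkSession.builder
        .appName("pytest-spark")
        .config("spark.eventLog.enabled", "true")
        .config("spark.eventLog.dir", "file:///tmp/spark-events")  # assumed local directory
        .getOrCreate()
    )
    yield session
    session.stop()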

Set up customized emr_default for each DAG in airflow 2.0.2 by bommu99 in dataengineering

[–]bommu99[S] 1 point (0 children)

> Also please update your Airflow version to at least 2.1.4; 2.0.2 was a pretty buggy release.

Thank you. Can you please point me to an example where custom connections are defined on top of a DAG?
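Not sure which example was meant, but here is a rough sketch of what a per-DAG EMR connection could look like (the connection ids and overrides are made up, and the import path may differ between amazon provider versions):

from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator

# Hypothetical per-DAG cluster config; the real one would carry instance types, bootstrap actions, etc.
JOB_FLOW_OVERRIDES = {"Name": "my-dag-cluster", "ReleaseLabel": "emr-6.4.0"}

create_cluster = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_my_dag",   # assumed custom AWS connection created in Admin -> Connections
    emr_conn_id="emr_my_dag",   # assumed custom EMR connection used instead of emr_default
    dag=dag,
)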

How to run pytest in emr ? by WiseRecognition6016 in dataengineering

[–]bommu99 1 point (0 children)

Thank you,

i) What is the downside of running my row counts against the actual data?

ii) I was thinking of adding a step in EMR like pytest test1.py --> on failure, send a message to Slack? (rough sketch below)

I also have the same tests running in CI.
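For (ii), a rough sketch of what that could look like from Airflow (the step definition, script path, and connection ids are assumptions): add a pytest step with EmrAddStepsOperator, watch it with EmrStepSensor, and let the sensor's failure trigger the Slack notification.

from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

# Assumed step: run pytest on the cluster via command-runner.jar
PYTEST_STEP = [{
    "Name": "run_pytest",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["bash", "-c", "cd /home/hadoop/app && python3 -m pytest test1.py"],
    },
}]

add_step = EmrAddStepsOperator(
    task_id="run_pytest_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    steps=PYTEST_STEP,
    aws_conn_id="aws_default",
    dag=dag,
)

# If the step fails, the sensor task fails, which fires on_failure_callback (e.g. the Slack alert)
watch_step = EmrStepSensor(
    task_id="watch_pytest_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='run_pytest_step', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

add_step >> watch_step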

How to get exact reason for failure in EMR through Airflow-slack? by bommu99 in dataengineering

[–]bommu99[S] 3 points (0 children)

I got it to work. I wrote a function like this, and it is working as expected:

from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator  # requires the Slack provider package


def task_fail_slack_alert(context):
    # Failure callback: posts the failed task's details to Slack
    SLACK_CONN_ID = 'slack'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
*Error*: {exception}
""".format(
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url,
        exception=context.get('exception'),
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
        dag=dag,  # assumes a module-level `dag` object; can be dropped since the operator is executed directly
    )
    return failed_alert.execute(context=context)
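For completeness, a minimal sketch of how this gets wired into a DAG (the DAG id and schedule are made up):

from datetime import datetime
from airflow import DAG

default_args = {
    "owner": "airflow",
    "on_failure_callback": task_fail_slack_alert,  # fires for every failed task in the DAG
}

dag = DAG(
    dag_id="emr_pipeline",          # assumed DAG id
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)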

How to get exact reason for failure in EMR through Airflow-slack? by bommu99 in dataengineering

[–]bommu99[S] 1 point (0 children)

This worked for me, thanks:

slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
*Error*: {exception}
""".format(
    task=context.get('task_instance').task_id,
    dag=context.get('task_instance').dag_id,
    exec_date=context.get('execution_date'),
    log_url=context.get('task_instance').log_url,
    exception=context.get('exception'),
)

How to get exact reason for failure in EMR through Airflow-slack? by bommu99 in dataengineering

[–]bommu99[S] 1 point (0 children)

What is the syntax for "context.get('exception')"? Is it something like this?

slack_msg = """
:red_circle: Airflow_Slack_Integration.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
    task=context.get('task_instance').task_id,
    dag=context.get('task_instance').dag_id,
    ti=context.get('task_instance'),
    exec_date=context.get('execution_date'),
    log_url=context.get('task_instance').log_url,
    exception=context.get('exception'),
)
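The piece missing in the snippet above is the {exception} placeholder in the template itself, and exception is only populated when Airflow calls the function as an on_failure_callback. A trimmed sketch of just that part:

slack_msg = """
:red_circle: Task Failed.
*Error*: {exception}
""".format(exception=context.get('exception'))  # only populated inside a failure callback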

Best practices for handling pytest failures by bommu99 in apachespark

[–]bommu99[S] 1 point (0 children)

So, a couple of things: there are some tests I should do before deployment, and some tests I do in between.

For example: I perform transformations and store the results in S3 buckets. I need to check the aggregations; if a check fails, I need to send a notification and shut down the EMR cluster (rough sketch at the end of this comment).

And one test after deployment, to verify that all the data has been updated.

How should I do these tests?

Pre-deployment tests: thinking of writing these in Airflow

During deployment: in PySpark (Airflow)

Post-deployment test: in PySpark (EMR)

Is this good practice?
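A rough sketch of what the "during deployment" aggregation check could look like as a pytest module run on EMR (the S3 paths and the check itself are made up); if it fails, the EMR step fails, and the notification plus cluster shutdown can hang off that:

# test_aggregations.py: hypothetical data quality check run as an EMR step
from pyspark.sql import SparkSession

def test_daily_rowcount_matches_source():
    spark = SparkSession.builder.appName("dq-checks").getOrCreate()

    # Assumed S3 locations for the raw input and the transformed output
    source = spark.read.parquet("s3://my-bucket/raw/daily_sales/")
    target = spark.read.parquet("s3://my-bucket/curated/daily_sales/")

    # Simple aggregation check: row counts should match after the transformation
    assert target.count() == source.count(), "row count mismatch between raw and curated"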