Scalable alerting for Apache Airflow to improve data orchestration reliability and performance

4 months ago 38
News Banner

Looking for an Interim or Fractional CTO to support your business?

Read more

About

Apache Airflow is a popular tool for orchestrating data workflows. Google Cloud offers a managed Airflow service called Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow that enables you to author, schedule, and monitor pipelines. And when running Cloud Composer, it’s important to have a robust logging and alerting setup to monitor your DAGs (Directed Acyclic Graphs) and minimize downtime in your data pipelines. 

In this guide, we will review the hierarchy of alerting on Cloud Composer and the various alerting options available to Google Cloud engineers using Cloud Composer and Apache Airflow. 

Getting started

Hierarchy of alerting on Cloud Composer

Composer environment

Cloud Composer environments are self-contained Airflow deployments based on Google Kubernetes Engine. They work with other Google Cloud services using connectors built into Airflow. 

Cloud Composer provisions Google Cloud services that run your workflows and all Airflow components. The main components of an environment are GKE cluster, Airflow web server, Airflow database, and Cloud Storage bucket. For more information, check out Cloud Composer environment architecture

Alerts at this level primarily consist of cluster and Airflow component performance and health.

Airflow DAG Runs

A DAG Run is an object representing an instantiation of the DAG at a point in time. Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the task’s state. Each DAG Run is run separately from one another, meaning that you can have many runs of a DAG at the same time.

Alerts at this level primarily consist of DAG Run state changes such as Success and Failure, as well as SLA Misses. Airflow’s Callback functionality can trigger code to send these alerts.

Airflow Task instances

A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them in order to express the order they should run in. Airflow tasks include Operators and Sensors.

Like Airflow DAG Runs, Airflow Tasks can utilize Airflow Callbacks to trigger code to send alerts. 

Summary

To summarize Airflow’s alerting hierarchy: Google Cloud → Cloud Composer Service → Cloud Composer Environment → Airflow Components (Worker) → Airflow DAG Run → Airflow Task Instance.

Any production-level implementation of Cloud Composer should have alerting and monitoring capabilities at each level in the hierarchy. Our Cloud Composer engineering team has extensive documentation around monitoring and alerting at the service/environment level. 

Airflow Alerting on Google Cloud

Now, let’s consider three options for alerting at the Airflow DAG Run and Airflow Task level. 

Option 1: Log-based alerting policies

Google Cloud offers native tools for logging and alerting within your Airflow environment. Cloud Logging centralizes logs from various sources, including Airflow, while Cloud Monitoring lets you set up alerting policies based on specific log entries or metrics thresholds.

You can configure an alerting policy to notify you whenever a specific message appears in your included logs. For example, if you want to know when an audit log records a particular data-access message, you can get notified when the message appears. These types of alerting policies are called log-based alerting policies. Check out Configure log-based alerting policies | Cloud Logging to learn more.

These services combine nicely with Airflow’s Callback feature previously mentioned above. To accomplish this:

  1. Define a Callback function and set at the DAG or Task level.

  2. Use Python’s native logging library to write a specific log message to Cloud Logging.

  3. Define a log-based alerting policy triggered by the specific log message and sends alerts to a notification channel.

Pros and cons

Pros:

  • Lightweight, minimal setup: no third party tools, no email server set up, no additional Airflow providers required

  • Integration with Logs Explorer and Log-based metrics for deeper insights and historical analysis

  • Multiple notification channel options

Cons:

  • Email alerts contain minimal info

  • Learning curve and overhead for setting up log sinks and alerting policies

  • Costs associated with Cloud Logging and Cloud Monitoring usage

Sample code

Airflow DAG Callback:

code_block <ListValue: [StructValue([('code', 'import time\r\nimport random\r\nimport logging\r\nfrom datetime import datetime, timedelta\r\nfrom airflow import models\r\nfrom airflow.operators.python_operator import PythonOperator\r\nfrom airflow import AirflowException\r\n\r\nON_DAG_FAILURE_ALERT = "Airflow DAG Failure:"\r\nON_SLA_MISS_ALERT = "Airflow DAG SLA Miss:"\r\n\r\ndef log_on_dag_failure(context):\r\n """collect DAG information and send to console.log on failure."""\r\n dag = context.get(\'dag\')\r\n\r\n log_msg = f"""\r\n {ON_DAG_FAILURE_ALERT}\r\n *DAG*: {dag.dag_id}\r\n *DAG Description*: {dag.description}\r\n *DAG Tags*: {dag.tags}\r\n *Context*: {context}\r\n """\r\n\r\n logging.info(log_msg)\r\n\r\ndef log_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):\r\n """collect DAG information and send to console.log on sla miss."""\r\n\r\n log_msg = f"""\r\n {ON_SLA_MISS_ALERT}\r\n *DAG*: {dag.dag_id}\r\n *DAG Description*: {dag.description}\r\n *DAG Tags*: {dag.tags}\r\n *Task List*: {task_list}\r\n *Blocking*: {blocking_task_list}\r\n *slas*: {slas}\r\n *blocking task ids*: {blocking_tis}\r\n """\r\n\r\n logging.info(log_msg)\r\n\r\n\r\nwith models.DAG(\r\n f"log_alert_demo",\r\n schedule=f"*/5 * * * *", # every 5 minutes\r\ndefault_args={\r\n"owner": "Google",\r\n"depends_on_past": False,\r\n"retries": 1,\r\n"retry_delay": timedelta(minutes=1),\r\n"sla": timedelta(minutes=1), # intentionally trigger sla misses\r\n},\r\n is_paused_upon_creation=True,\r\n catchup=False,\r\n max_active_runs=1,\r\n dagrun_timeout=timedelta(minutes=60),\r\n on_failure_callback=log_on_dag_failure,\r\n sla_miss_callback=log_on_sla_miss,\r\n) as dag:\r\n\r\n def three_minute_run():\r\n """\r\n run for three minutes\r\n """\r\n time.sleep(180)\r\n\r\n # 30% chance to raise an exception\r\n if random.randint(0,9) % 3 == 0:\r\n raise AirflowException("Error msg")\r\n\r\n three_minute_task = PythonOperator(\r\n task_id=\'three_minute_task\', \r\n python_callable=three_minute_run,\r\n )\r\n\r\n three_minute_task'), ('language', ''), ('caption', <wagtail.rich_text.RichText object at 0x3e14c5f81fd0>)])]>

This Airflow DAG uses a Python operator to miss a defined SLA and/or raise an Airflow Exception. If the DAG Run enters a failed state it triggers the log_on_dag_failure callback function and if it misses an SLA it triggers the log_on_sla_miss callback function. Both of these callbacks log a specific message string "Airflow DAG Failure:" and "Airflow SLA Miss:" respectively. These are the messages that the log-based alerting catches and uses to send an alert to the defined notification channel.

Airflow Task callback:

code_block <ListValue: [StructValue([('code', 'import time\r\nimport random\r\nimport logging\r\nfrom datetime import timedelta\r\nfrom airflow import models\r\nfrom airflow.operators.python_operator import PythonOperator\r\nfrom airflow import AirflowException\r\n\r\nON_TASK_FAILURE_ALERT = "Airflow Task Failure:"\r\n\r\ndef log_on_task_failure(context):\r\n ti = context.get(\'task_instance\')\r\n log_msg = f"""\r\n {ON_TASK_FAILURE_ALERT}\r\n *DAG*: {ti.dag_id}\r\n *DAG Run*: {ti.run_id}\r\n *Task*: {ti.task_id}\r\n *state*: {ti.state}\r\n *operator*: {ti.operator}\r\n """\r\n\r\n logging.info(log_msg)\r\n\r\n\r\nwith models.DAG(\r\n f"log_alert_demo",\r\n schedule=f"*/5 * * * *", # every 5 minutes\r\n default_args={\r\n "owner": "Google",\r\n "depends_on_past": False,\r\n "retries": 1,\r\n "retry_delay": timedelta(minutes=1),\r\n },\r\n is_paused_upon_creation=True,\r\n catchup=False,\r\n max_active_runs=1,\r\n dagrun_timeout=timedelta(minutes=60),\r\n) as dag:\r\n\r\n def one_minute_run():\r\n """\r\n run for one minutes\r\n """\r\n time.sleep(60)\r\n\r\n # 50% chance to raise an exception\r\n if random.randint(0,8) % 2 == 0:\r\n raise AirflowException("Error msg")\r\n\r\n\r\n one_minute_task = PythonOperator(\r\n task_id=\'one_minute_task\', \r\n python_callable=one_minute_run,\r\n on_failure_callback=log_on_task_failure\r\n )\r\n\r\n one_minute_task'), ('language', ''), ('caption', <wagtail.rich_text.RichText object at 0x3e14c5f81850>)])]>

In this example, the task instance itself calls back to log_on_task_failure. Since you can set specific callback functions at the task-level, you have great flexibility on when and how you send alerts based on a given task.

Option 2: Email alerts via SendGrid

SendGrid is an SMTP service provider and Cloud Composer’s email notification service of choice. For more information, check out how to Configure email notifications on Cloud Composer.

Pros and cons

Pros:

  • Widely supported and reliable notification method

  • Detailed emails with formatted log snippets for analysis

  • Uses native Airflow EmailOperator

  • Flexible recipient lists on a per-task basis

Cons:

  • Can be overwhelming with a high volume of alerts

  • Requires configuring an external email provider (SendGrid) and managing email templates

  • Might get lost in inboxes if not prioritized or filtered correctly

  • Costs associated with SendGrid

Sample code

EmailOperator

code_block <ListValue: [StructValue([('code', 'from datetime import datetime\r\n\r\nimport airflow\r\nfrom airflow.operators.email import EmailOperator\r\n\r\nwith airflow.DAG(\r\n "demo_sendgrid_email",\r\n start_date=datetime(2024, 1, 1),\r\n default_args={\r\n "owner": "Google",\r\n "depends_on_past": False,\r\n "retries": 1,\r\n "retry_delay": timedelta(minutes=1),\r\n "sla": timedelta(minutes=55),\r\n },\r\n is_paused_upon_creation=True,\r\n catchup=False,\r\n max_active_runs=1,\r\n dagrun_timeout=timedelta(minutes=60),\r\n) as dag:\r\n task_email = EmailOperator(\r\n task_id="send-email",\r\n conn_id="sendgrid_default",\r\n # You can specify more than one recipient with a list.\r\n to="[email protected]",\r\n subject="EmailOperator test for SendGrid",\r\n html_content="This is a test message sent through SendGrid."\r\n )'), ('language', ''), ('caption', <wagtail.rich_text.RichText object at 0x3e14c5f81400>)])]>

Option 3: Third-party tools such as Slack, Pagerduty

Since Airflow is open-source, there are other providers to choose from that can handle alerting and notifications for you, such as Slack or Pagerduty

Pros and cons

Pros:

  • Real-time notifications in a familiar communication channel

  • Customizable formatting and the ability to send messages to specific channels or users

  • Third-party options integrate with your team's existing communication workflow. Alerts can be discussed directly, keeping the context and resolution steps together. This promotes faster troubleshooting and knowledge sharing compared to isolated emails or logging entries.

Cons:

  • Requires a third-party workspace , webhook, and API token setup

  • Requires management of additional Airflow connections

  • Might lead to notification fatigue if not used judiciously

  • Potential security concerns if the webhook or API token is compromised

  • Potentially limited long-term log storage within third-party message history

  • Costs associated with third-party tools

Sample code

Slack:

code_block <ListValue: [StructValue([('code', 'from airflow import DAG\r\nfrom datetime import datetime, timedelta\r\nfrom airflow.operators.python import PythonOperator\r\nfrom airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator\r\n\r\n# Replace with your actual Slack webhook URL (get it from your Slack app settings)\r\nSLACK_CONN_ID = \'your_slack_connection\'\r\n\r\ndef task_fail_slack_alert(context):\r\n slack_msg = f"""\r\n :red_circle: Airflow Task Failed :red_circle:\r\n *Task*: {context.get(\'task_instance\').task_id} \r\n *DAG*: {context.get(\'task_instance\').dag_id}\r\n *Execution Time*: {context.get(\'execution_date\')}\r\n *Log URL*: {context.get(\'task_instance\').log_url}\r\n """\r\n return SlackWebhookOperator(\r\n task_id=\'slack_alert\',\r\n http_conn_id=SLACK_CONN_ID,\r\n message=slack_msg,\r\n ).execute(context=context) # Send the message immediately\r\n\r\nwith DAG(\r\n \'demo_slack_alerts\',\r\n start_date=datetime(2024, 1, 1),\r\n default_args={\r\n "owner": "Google",\r\n "depends_on_past": False,\r\n "retries": 1,\r\n "retry_delay": timedelta(minutes=1),\r\n "sla": timedelta(minutes=55),\r\n },\r\n is_paused_upon_creation=True,\r\n catchup=False,\r\n max_active_runs=1,\r\n dagrun_timeout=timedelta(minutes=60),\r\n) as dag:\r\n\r\n task_that_might_fail = PythonOperator(\r\n task_id=\'failing_task\',\r\n python_callable=lambda: 1 / 0, # This will raise a ZeroDivisionError\r\n on_failure_callback=task_fail_slack_alert,\r\n )'), ('language', ''), ('caption', <wagtail.rich_text.RichText object at 0x3e14c5f81220>)])]>

Pagerduty:

code_block <ListValue: [StructValue([('code', 'from datetime import datetime\r\nfrom airflow import DAG\r\nfrom airflow.operators.bash import BashOperator\r\nfrom airflow.providers.pagerduty.notifications.pagerduty import send_pagerduty_notification\r\n\r\nwith DAG(\r\n "demo_pagerduty_alerts",\r\n start_date=datetime(2024, 1, 1),\r\n default_args={\r\n "owner": "Google",\r\n "depends_on_past": False,\r\n "retries": 1,\r\n "retry_delay": timedelta(minutes=1),\r\n "sla": timedelta(minutes=55),\r\n },\r\n is_paused_upon_creation=True,\r\n catchup=False,\r\n max_active_runs=1,\r\n dagrun_timeout=timedelta(minutes=60),\r\n on_failure_callback=[\r\n send_pagerduty_notification(\r\n summary="The dag {{ dag.dag_id }} failed",\r\n severity="critical",\r\n source="airflow dag_id: {{dag.dag_id}}",\r\n dedup_key="{{dag.dag_id}}-{{ti.task_id}}",\r\n group="{{dag.dag_id}}",\r\n component="airflow",\r\n class_type="Prod Data Pipeline",\r\n )\r\n ],\r\n):\r\n BashOperator(\r\n task_id="mytask",\r\n bash_command="fail",\r\n on_failure_callback=[\r\n send_pagerduty_notification(\r\n summary="The task {{ ti.task_id }} failed",\r\n severity="critical",\r\n source="airflow dag_id: {{dag.dag_id}}",\r\n dedup_key="{{dag.dag_id}}-{{ti.task_id}}",\r\n group="{{dag.dag_id}}",\r\n component="airflow",\r\n class_type="Prod Data Pipeline",\r\n )\r\n ],\r\n )'), ('language', ''), ('caption', <wagtail.rich_text.RichText object at 0x3e14c5f81550>)])]>

Opinionated guidance and next steps

Considering the pros and cons, we recommend log-based alerting policies (Option 1) for Airflow alerting in production environments. This approach offers scalable log collection, simple threshold-based alerting, diverse notification channels, metric exploration, and integration with other Google Cloud services. Logging is intuitive and integrated with Cloud Composer, eliminating the need for extra provider packages.

By incorporating logging and alerting into your Airflow DAGs, you proactively monitor your data pipelines and leverage the full potential of Google Cloud.

To learn more about Cloud Composer, Apache Airflow, and the alerting mechanisms discussed in this guide, consider exploring the following resources:

Also check out some of our other Cloud Composer-related Google Cloud blogs:

Read Entire Article