Creating Detailed Logs in Airflow (Airflow States Collector)

Collecting logs from an Airflow deployment and acting on what they show is a crucial part of operating workflows. This is typically done by writing custom code against the Airflow libraries, which is both administratively challenging and labor-intensive. For this task, you can use the Airflow States Collector in your projects. It covers most common needs through its parameters and is easy to manage thanks to its ready-made dashboards. Because the dashboards are built with Looker Studio, they add no extra cost, and they make the collected information far easier to read at a glance. You can implement this setup in your own projects quickly and greatly simplify Airflow monitoring.

Source: the airflow-states-collector tool in the GoogleCloudPlatform/professional-services GitHub repository (https://github.com/GoogleCloudPlatform/professional-services, under tools/airflow-states-collector)

Clicking the link opens the GitHub project page. Here, you will find step-by-step instructions on how to implement it and example screenshots.

First, the necessary tables and views are created in BigQuery. Then, using the ‘airflow_states_collector.py’ file, a DAG is set up in Airflow to run every 5 minutes, collecting logs and loading them into BigQuery. Finally, dashboards are built on top of this data; they are refreshed as new data arrives and visualize detailed information about the Airflow logs.
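As a rough sketch of what this enables once data starts landing in BigQuery, you could query the collected states with the bq command-line tool. The project, dataset, table, and column names below are placeholders rather than the tool's actual schema, so check the objects created in your own dataset:

$ bq query --use_legacy_sql=false '
    SELECT dag_id, state, COUNT(*) AS run_count   -- illustrative columns
    FROM `<project-id>.<dataset>.<states-table>`  -- replace with the table the tool creates
    GROUP BY dag_id, state
    ORDER BY dag_id'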

Implementation

If they are not already installed, install the prerequisites: git, Python 3, virtualenv, and the Google Cloud SDK (gcloud).
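A quick way to check which of these are already available:

$ git --version
$ python3 --version
$ virtualenv --version   # install with 'pip install virtualenv' if missing
$ gcloud --version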

First, clone the GitHub repository using git clone:

$ git clone https://github.com/GoogleCloudPlatform/professional-services.git 

Navigate to the relevant directory with the cd command:

$ cd professional-services/tools/airflow-states-collector

Create a virtual environment:

$ virtualenv --python python3 env
$ source env/bin/activate
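If virtualenv is not available, Python's built-in venv module can be used the same way:

$ python3 -m venv env
$ source env/bin/activate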

Install the required packages (BigQuery, Jinja2, etc.):

$ pip install -r requirements.txt

Authenticate to Google Cloud and select the project:

$ gcloud init
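Depending on your environment, you may also need application-default credentials (used by the BigQuery client) and an explicitly set default project; the project ID below is a placeholder:

$ gcloud auth application-default login
$ gcloud config set project <project-id>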

Make any necessary changes to the parameters and run the script as shown below. The history window is set to 5 days; if the DAG later fails with a query size error, you may need to reduce this value further. It is also recommended that the BigQuery dataset location match the Composer environment's location.
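If you are unsure of the Composer environment's location, you can look it up with gcloud beforehand (the region below is only an example):

$ gcloud composer environments list --locations=europe-west3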

$ python airflow_states_collector.py \
   --bq-storage-project-id=project-id \
   --airflow-version=2 \
   --dags-gcs-folder=gs://europe-west3-airflow-state-bucket/dags \
   --ndays-history=5 \
   --airflow-dagid=airflow-states-collector \
   --skip-dagids='airflow_monitoring,airflow-states-collector' \
   --bq-dataset-location='europe-west3' \
   --bq-storage-dataset='airflow_states_collector'

When the execution completes, the necessary dataset and objects (tables and views) will have been created in BigQuery.
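To verify, you can list the contents of the dataset with the bq command-line tool; the project ID is a placeholder, and the dataset name matches the --bq-storage-dataset value used above:

$ bq ls --project_id=<project-id> airflow_states_collector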

The new DAG appears in the Airflow UI.
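If the DAG does not appear, one thing to check is whether the generated DAG file reached the DAGs folder specified earlier:

$ gsutil ls gs://europe-west3-airflow-state-bucket/dags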

Runs that complete successfully, as well as skipped runs, are displayed with their states.

The dashboards are viewed in Looker Studio, where you can filter the data and drill down to the desired level of detail within a chosen date range.

You can inspect the breakdown per DAG, including the minimum, median, and maximum run durations for each unique DAG.

It also provides a detailed examination on a task-by-task basis.

Author: Gökay Solak, Senior Data Engineer, Oredata

Oredata is a premier Google Cloud Partner specializing in

  • Cloud Migration Services
  • Data & Analytics Services
  • Infrastructure Services
  • Google Workspace

If you are interested in joining us, feel free to apply to our job openings: https://www.linkedin.com/company/oredata/jobs/

Contact us