Data Pipelines With Apache Airflow

Munish Goyal
The Startup
17 min read · Jul 23, 2020

Apache Airflow — The most widely used tool for workflow orchestration


· WHAT IS A DATA PIPELINE?
· WHAT IS APACHE AIRFLOW?
· HOW AIRFLOW WORKS
· GETTING STARTED WITH AIRFLOW
· RUN AIRFLOW SERVER AND SCHEDULER
· AIRFLOW PIPELINE — DAG DEFINITION FILE
· AIRFLOW OPERATORS
· SETTING UP DEPENDENCIES
· TEMPLATING WITH JINJA
· ADDING DAG AND TASKS DOCUMENTATION
· AIRFLOW SCHEDULER
· BACKFILL AND CATCHUP
· EXTERNAL TRIGGERS
· RUNNING A DAG TASK
· AIRFLOW VARIABLES
· AIRFLOW CONNECTIONS
· AIRFLOW HOOKS
· CHECKING METADATA THROUGH COMMAND LINE
· TESTING A DAG

For a managed cloud solution for Airflow, refer to Google Cloud Composer; on AWS, a cloud alternative is AWS Glue.

WHAT IS A DATA PIPELINE?

A Data Pipeline describes and encodes a series of sequential data processing steps. Depending on the data requirements for each step, some steps may occur in parallel. Schedules are the most common mechanism for triggering execution of a data pipeline, but external triggers and events can also be used. ETL and ELT are common patterns found in data pipelines, but they are not strictly required; some data pipelines perform only a subset of ETL or ELT.

Data Validation is the process of ensuring that data is present, correct & meaningful. Ensuring the quality of your data through automated validation checks is a critical step in building data pipelines at any organization. Validation can and should become part of your pipeline definitions.

Data pipelines are well-expressed as Directed acyclic graphs (DAGs).

WHAT IS APACHE AIRFLOW?

Apache Airflow is a workflow orchestration tool — platform to programmatically author, schedule, and monitor workflows.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Airbnb open-sourced Airflow in 2015 with the goal of creating a DAG-based, schedulable, data-pipeline tool that could run in mission-critical environments.

Airflow’s source code is available at github.com/apache/airflow, and it can be installed with pip as the apache-airflow package. You can set up Airflow by following the Airflow Quick Start Guide.

On AWS, a cloud-specific serverless alternative is AWS Glue. Even if you are using AWS, it still makes sense to use Airflow to handle the parts of the data pipeline that live outside of AWS (e.g., pulling in records from an API and storing them in S3).

Refer to the Airflow Documentation for details, but first you might like to go through Airflow Concepts such as DAGs, Operators, Operator Relationships, Tasks, Task Instances, Schedules, Hooks, Connections, etc.

For configuring Airflow for a production environment, check the How-to Guides.

HOW AIRFLOW WORKS

Airflow is a partner to data frameworks, not a replacement:

Airflow itself is not a data processing framework; in Airflow, you don’t pass data in memory between the steps of your DAG. Instead, you use Airflow to coordinate the movement of data between other data storage and data processing tools.

So, we are not going to pass data between steps and tasks, and we will not typically run heavy processing workloads on Airflow. The reason is that an individual Airflow worker often has limited memory and processing power, whereas data frameworks can aggregate the computing power of many machines. Tools like Spark are able to expose the computing power of many machines all at once, while in Airflow you are always limited to the processing power of a single machine (the machine on which an individual worker is running). This is why Airflow developers prefer to use Airflow to trigger heavy processing steps in analytics warehouses like Redshift or data frameworks like Spark, instead of running them within Airflow itself. Airflow can be thought of as a partner to those data frameworks, not as a replacement.

For example, rather than transforming a large dataset inside a PythonOperator, you would have Airflow submit the job to a Spark cluster and simply wait for it to finish.
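
A minimal sketch of that pattern, assuming a BashOperator that shells out to spark-submit (the master setting and job-script path are hypothetical placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('trigger_spark_job', start_date=datetime(2020, 1, 1), schedule_interval='@daily')

# Airflow only coordinates the work; the heavy lifting happens on the Spark cluster
submit_spark_job = BashOperator(
    task_id='submit_spark_job',
    bash_command='spark-submit --master yarn /opt/jobs/transform.py',  # hypothetical job script
    dag=dag)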

Airflow is designed to codify the definition and execution of data pipelines.

Airflow Components:

Here are the main components of Airflow:

  • Scheduler orchestrates the execution of jobs on a trigger or schedule. The Scheduler chooses how to prioritize the running and execution of tasks within the system.
  • Worker Queue is used by the scheduler in most Airflow installations to deliver tasks that need to be run to the Workers.
  • Worker processes execute the operations defined in each DAG. In most Airflow installations, a worker pulls from the work queue when it is ready to process a task. When the worker completes the execution of the task, it will attempt to process more work from the work queue until no further work remains. When new work arrives in the queue, the worker will begin to process it. In a multi-node Airflow architecture, daemon processes are distributed across all worker nodes: the web server and scheduler are installed on the master node, and workers are installed on each worker node. For this mode of architecture, Airflow has to be configured with the CeleryExecutor.
  • Database saves credentials, connections, history, and configuration. The database, often referred to as the metadata database, also stores the state of all tasks in the system. Airflow components interact with the database through the Python ORM, SQLAlchemy.
  • Web Interface provides a control dashboard for users and maintainers. The web interface is built using the Flask web-development micro-framework.

Data Partitioning:

Pipeline data partitioning is the process of isolating data to be analyzed by one or more attributes, such as time, logical type, or data size.

Data partitioning often leads to faster and more reliable pipelines.

Pipelines designed to work with partitioned data fail more gracefully. Smaller datasets, smaller time periods, and related concepts are easier to debug than big datasets, large time periods, and unrelated concepts. Partitioning makes debugging and re-running failed tasks much simpler. It also enables easier redos of work, reducing cost and time.

Another great thing about Airflow is that if your data is partitioned appropriately, your tasks will naturally have fewer dependencies on each other. Because of this, Airflow will be able to parallelize the execution of your DAGs to produce your results even faster.
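
As a rough sketch of what time-based partitioning looks like in a DAG (the S3 bucket and path layout are hypothetical), each task instance restricts itself to the partition for its own execution date, so every DAG Run touches only one day of data:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def process_daily_partition(**kwargs):
    # only read the partition belonging to this run's logical date
    partition_path = f"s3://my-bucket/events/{kwargs['ds']}/"  # hypothetical layout
    print(f"Processing partition at {partition_path}")

dag = DAG('partitioned_pipeline', start_date=datetime(2020, 1, 1), schedule_interval='@daily')

process = PythonOperator(
    task_id='process_daily_partition',
    python_callable=process_daily_partition,
    provide_context=True,
    dag=dag)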

Data Lineage:

The data lineage of a dataset describes the discrete steps involved in the creation, movement, and calculation of that dataset.

Being able to describe the data lineage of a given dataset will build confidence in the consumers of that data that our data pipeline is creating meaningful results using the correct data sets. Describing and surfacing data lineage is one of the key ways we can ensure that everyone in the organization has access to and understands where data originates and how it is calculated.

The Airflow UI parses our DAGs and surfaces a visualization of the graph. Airflow keeps track of all runs of a particular DAG as task instances.

Airflow also shows us the rendered code for each task. One thing to keep in mind: Airflow keeps a record of historical DAGs and task executions, but it does not store the data from those historical runs. Whatever the latest code is in your DAG definition is what Airflow will render for you in the browser. So, be careful in making assumptions about what was run historically.

GETTING STARTED WITH AIRFLOW

Check the stocks project for an example of a basic setup.

Airflow keeps its configuration files in AIRFLOW_HOME, which by default is set to ~/airflow.

Airflow requires a database to be initialized before you can run tasks. If you’re just experimenting and learning Airflow, you can stick with the default SQLite option (but note that SQLite only works with the SequentialExecutor, so tasks run sequentially).

RUN AIRFLOW SERVER AND SCHEDULER

Setup Airflow Container:

# create docker container
# note: remove the KEYCHAIN_ENABLED line if you are running the container on a server where you SSH in using SSH agent forwarding
# note: remove the SSH_AUTH_SOCK line if you are running the container on a machine which has its own SSH key
# container created with `deimagerun` will automatically be part of `devenv` network
AIRFLOW_HOME="$HOME/airflow"
SSH_HOME="$HOME/.ssh"
deimagerun \
-e KEYCHAIN_ENABLED=True \
-v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent \
-v ${AIRFLOW_HOME}:${AIRFLOW_HOME} -p 8080:8080 -e AIRFLOW_HOME=${AIRFLOW_HOME} \
-v ~/stocks:/root/stocks -p 5000:5000
# get into container
deshell

Here, commands deimagerun and deshell are borrowed from Let’s make Docker CLI easier.

Initialize the DB, and start the Airflow webserver and scheduler as follows:

# pip install --upgrade 'apache-airflow[ssh]'
# initialize the db
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler

If required, you can burn down and rebuild the metadata database as:

airflow resetdb

In order to use MySQL instead of SQLite:

  • Run MySQL server using Docker Image
docker pull mysql
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 mysql --default-authentication-plugin=mysql_native_password
denetcon mysql
  • Run dkshell mysql /bin/bash and then echo 'explicit_defaults_for_timestamp=1' >> /etc/mysql/my.cnf (as recommended by Airflow)
  • Change the sql_alchemy_conn setting in airflow.cfg to sql_alchemy_conn = mysql+pymysql://root:root@mysql:3306/airflow (here mysql_devenv is the network-scoped alias for the mysql container in the ${DEVENV_CONTAINER} network), and change the executor setting to executor = LocalExecutor (to parallelize task instances locally).
  • Run pip install pymysql in devenv container, and:
mysql -h mysql_devenv -uroot -proot
mysql> CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
mysql> create user 'root'@'localhost' identified by 'root';
mysql> grant all privileges on root.* TO 'root'@'localhost';
mysql> flush privileges;
mysql> \q
  • This is how you can alter permissions later and check them:
mysql> ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'root';
mysql> SELECT * from mysql.user where User="root";
mysql> \q
  • Try MySQL connection both ways as:
mysql -h localhost -uroot -proot
mysql -h 127.0.0.1 -uroot -proot

AIRFLOW PIPELINE — DAG DEFINITION FILE

In Airflow, a DAG — Directed Acyclic Graph — is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

A Task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python. Each task is an implementation of an Operator.

The Airflow Python script (the DAG definition file) is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined in it will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used for cross-communication between tasks. Note that for this purpose we have a more advanced feature called XCom.

The script’s purpose is to define a DAG (along with modules to import, default args, DAG definition, task definitions, and dependency relationship between tasks). It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

DAG files are saved in dags_folder referenced in your airflow.cfg. The default location is ~/airflow/dags. Each DAG should represent a single logical workflow.

When searching for DAGs, Airflow only considers Python files that contain the strings “airflow” and “DAG” by default. To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.

The DAG object is instantiated by passing it the default argument dictionary, default_args, containing arguments common to all operators. Each individual task is instantiated with operator-specific arguments or with an override of some default arguments (as passed to the DAG object).

The precedence rules for a task are as follows:

  • Explicitly passed arguments
  • Values that exist in the default_args dictionary
  • The operator’s default value, if one exists

A task must include or inherit the arguments task_id and owner; otherwise, Airflow will raise an exception.
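
Here is a minimal sketch of a DAG definition file that ties these pieces together; the DAG id, task ids, and retry values are purely illustrative:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# arguments shared by all tasks unless explicitly overridden
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily')

# inherits owner and retries from default_args
extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)

# an explicitly passed argument takes precedence over default_args
load = BashOperator(task_id='load', bash_command='echo load', retries=3, dag=dag)

extract >> load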

Airflow will load any DAG object it can import from a DAG file. Critically, that means the DAG must appear in globals().

DAGs can be used as context managers to automatically assign new operators to that DAG.

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')
    op.dag is dag # True

AIRFLOW OPERATORS

While a DAG describes how to run a workflow, Operators determine what actually gets done by a task.

Once an operator is instantiated, it is referred to as a task. The instantiation defines specific values when calling the abstract operator, and the parameterized task becomes a node in DAG.

A task instance represents a specific run of a task and is characterized as the combination of a DAG, a task, and a point in time.

An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operator. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.

Operators are only loaded by Airflow if they are assigned to a DAG.

All operators derive from BaseOperator or BaseSensorOperator, and inherit many attributes and methods that way.

There are 3 main types of operators:

  • Operators that perform an action, or tell another system to perform an action
  • Transfer operators move data from one system to another
  • Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True (see the sketch just after this list).
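
As a sketch of how the poke mechanism works, here is a hypothetical custom sensor that keeps checking for a local file; in practice you would likely reach for a built-in sensor such as S3KeySensor or HivePartitionSensor:

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class FileExistsSensor(BaseSensorOperator):
    """Hypothetical sensor that waits until a local file appears."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(FileExistsSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # called every poke_interval seconds until it returns True
        return os.path.exists(self.filepath)

# usage: FileExistsSensor(task_id='wait_for_file', filepath='/tmp/ready.csv', poke_interval=60, dag=dag)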

All operators are in the airflow.operators and airflow.contrib.operators packages (sensors live in airflow.sensors and airflow.contrib.sensors).

In general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.
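
As a brief sketch of XCom (the task ids and filename are made up), one task pushes a small value that a downstream task pulls back out of the metadata database:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_example', start_date=datetime(2020, 1, 1), schedule_interval=None)

def push_filename(**kwargs):
    # the return value of the callable is automatically pushed as an XCom
    return '/tmp/output.csv'

def pull_filename(**kwargs):
    # pull the value pushed by the upstream task
    filename = kwargs['ti'].xcom_pull(task_ids='push_filename')
    print(f"Downstream task received: {filename}")

push = PythonOperator(task_id='push_filename', python_callable=push_filename, dag=dag)
pull = PythonOperator(task_id='pull_filename', python_callable=pull_filename,
                      provide_context=True, dag=dag)

push >> pull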

Airflow provides operators for many common tasks, including BashOperator, PythonOperator, EmailOperator, SimpleHttpOperator, and database operators such as MySqlOperator, PostgresOperator, and SqliteOperator.

In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator, etc.

Operators do not have to be assigned to DAGs immediately. However, once an operator is assigned to a DAG, it cannot be transferred or unassigned. DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators. For example,

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

In python_operator, if provide_context is set to True, Airflow will pass a set of keyword arguments that can be used in your function (the runtime context). This set of kwargs corresponds exactly to what you can use in your Jinja templates. For this to work, you need to define **kwargs in your function header. For example, you can get the execution_date as kwargs["execution_date"]. A list of available context variables is available in the Macros reference.

For example,

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def hello_date(*args, **kwargs):
    print(f"Hello {kwargs['execution_date']}")

divvy_dag = DAG(...)

task = PythonOperator(
    task_id="hello_date",
    python_callable=hello_date,
    provide_context=True,
    dag=divvy_dag)

SETTING UP DEPENDENCIES

If B and C are downstream nodes of node A, then B and C depend upon A. That is A >> [B, C].

Writing t1.set_downstream(t2) means that we are setting t2 as a downstream node of t1; that is, t2 depends on t1 and can run only after t1 has run successfully. This can also be represented using the bitshift operator as t1 >> t2.

# task t2 depends on task t1
# task t1 is the upstream node of task t2
# task t2 is the downstream node of task t1
# set t2 as a downstream node of t1
t1 >> t2
t1.set_downstream(t2)

Similarly t2.set_upstream(t1) can be represented as t2 << t1.

It is recommended to set operator relationships with bitshift operators rather than set_upstream() and set_downstream().

Chaining multiple dependencies becomes concise with the bitshift operator. For example, t1 >> t2 >> t3.

A list of tasks can also be set as dependencies. For example,

# these operations all have the same effect
t1 >> [t2, t3]
[t2, t3] << t1
t1.set_downstream([t2, t3])

Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.

When setting a relationship between two lists, if we want all operators in one list to be upstream of all operators in the other, we cannot use a single bitshift composition. Instead, we have to split one of the lists:

[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6

The cross_downstream helper handles this list-to-list relationship more concisely:

cross_downstream([op1, op2, op3], [op4, op5, op6])

When setting single-direction relationships across many operators, we can concatenate them with bitshift composition:

op1 >> op2 >> op3 >> op4 >> op5

This can be accomplished using chain:

chain(op1, op2, op3, op4, op5)

even without naming the operators explicitly:

chain(*[DummyOperator(task_id='op' + str(i), dag=dag) for i in range(1, 6)])

The chain can handle a list of operators:

chain(op1, [op2, op3], op4)
# equivalent to
op1 >> [op2, op3] >> op4

TEMPLATING WITH JINJA

Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline authors to define their own parameters, macros, and templates. For example, you can pass a templated command string (a templated_command) as the bash_command of a BashOperator.
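
A small sketch of a templated bash_command, closely following the pattern from the Airflow tutorial; {{ ds }} (the execution date) and {{ params.my_param }} are resolved by Jinja at runtime:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('templated_example', start_date=datetime(2020, 1, 1), schedule_interval='@daily')

templated_command = """
echo "execution date: {{ ds }}"
echo "param value: {{ params.my_param }}"
"""

templated_task = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)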

ADDING DAG AND TASKS DOCUMENTATION

We can add documentation for the DAG as a whole or for every single task. DAG documentation only supports Markdown (doc_md) so far, while task documentation supports Markdown, plain text, reStructuredText, JSON, and YAML.
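
A quick sketch of attaching documentation (the wording itself is illustrative): set doc_md on the DAG and on a task, and the Airflow UI renders it on the Graph/Tree views and the Task Instance Details page respectively.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('documented_dag', start_date=datetime(2020, 1, 1), schedule_interval='@daily')

# DAG-level documentation (Markdown only)
dag.doc_md = """
#### Documented DAG
This description appears at the top of the DAG's Graph and Tree views.
"""

task = DummyOperator(task_id='noop', dag=dag)

# task-level documentation (Markdown shown here; plain text, reST, JSON, and YAML also work)
task.doc_md = """
#### Documented task
This description appears in the Task Instance Details view.
"""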

AIRFLOW SCHEDULER

The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it spins up a subprocess, which monitors and stays in sync with the DAG folder, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended. That is, the scheduler runs your job on schedule_interval AFTER the start date, at the END of the period.

Each DAG may or may not have a schedule, which informs how DAG Runs are created. The schedule_interval is defined as a DAG argument and preferably receives a cron expression as a str, or a datetime.timedelta object. Use schedule_interval=None when you don’t want to schedule your DAG.
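
A short sketch of the common ways to express a schedule; the DAG ids and the cron expression are just examples:

from datetime import datetime, timedelta

from airflow import DAG

# cron expression: run at 07:00 every day
cron_dag = DAG('cron_scheduled', start_date=datetime(2020, 1, 1), schedule_interval='0 7 * * *')

# timedelta: run every 6 hours
delta_dag = DAG('delta_scheduled', start_date=datetime(2020, 1, 1), schedule_interval=timedelta(hours=6))

# no schedule: only runs when triggered externally or manually
manual_dag = DAG('manual_only', start_date=datetime(2020, 1, 1), schedule_interval=None)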

BACKFILL AND CATCHUP

The airflow backfill command respects your dependencies, emits logs into files, and talks to the database to record status.

Note that if you use depends_on_past=True, individual task instances will depend on the success of their previous task instances (that is, previous according to execution_date). Task instances with execution_date==start_date will disregard this dependency because there would be no past task instances created for them.

You may also want to consider wait_for_downstream=True when using depends_on_past=True. While depends_on_past=True causes a task instance to depend on the success of its previous task instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream of the previous task instance to succeed.
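
A brief sketch of where these flags usually live (the DAG id and dates are illustrative); they are typically set through default_args so they apply to every task in the DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),
    # each task instance waits for its own previous (execution_date-wise) run to succeed
    'depends_on_past': True,
    # ...and also for the tasks immediately downstream of that previous instance
    'wait_for_downstream': True,
}

dag = DAG('history_aware_dag', default_args=default_args, schedule_interval='@daily')

load = BashOperator(task_id='load', bash_command='echo loading', dag=dag)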

The date range in this context is a start_date and optionally an end_date, which are used to populate the run schedule with task instances from this dag.

An Airflow DAG with a start_date, possibly an end_date, and a schedule_interval (which is by default "@daily" from the start_date) defines a series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (that is, it is not limited to the interval but instead processes data up to "now"), then you will want to turn catchup off, either on the DAG itself with catchup=False, or by default at the configuration-file level with catchup_by_default=False. This instructs the scheduler to create a DAG Run only for the most current instance of the DAG interval series.
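
A minimal sketch of turning catchup off for a single DAG (names and dates are illustrative); even though start_date is far in the past, only the most recent interval gets a DAG Run:

from datetime import datetime

from airflow import DAG

dag = DAG(
    'no_catchup_dag',
    start_date=datetime(2019, 1, 1),   # far in the past
    schedule_interval='@daily',
    catchup=False)                     # do not create DAG Runs for the missed intervals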

EXTERNAL TRIGGERS

DAG Runs can also be created manually through the CLI by running the airflow trigger_dag command, where you can define a specific run_id. The DAG Runs created externally to the scheduler get associated with the trigger’s timestamp and will be displayed in the UI alongside scheduled DAG Runs.

Note:

  • The first DAG Run is created based on the minimum start_date for the tasks in your DAG.
  • Subsequent DAG Runs are created by the scheduler process, based on your DAG’s schedule_interval, sequentially.
  • When clearing tasks’ states in the hope of getting them to re-run, it is important to keep in mind the DAG Run’s state too, as it defines whether the scheduler should look into triggering tasks for that run.

RUNNING A DAG TASK

A DAG run is a physical instance of a DAG, containing task instances that run for a specific execution_date.

A task instance represents a specific run of a task and is characterized as the combination of a DAG, a task, and a point in time (execution_date). A task goes through various stages from start to completion. In the Airflow UI (graph and tree views), these stages are displayed by a color representing each stage.

The complete lifecycle of a task runs through states such as no status, scheduled, queued, running, and finally success or failed (with up_for_retry feeding back into scheduled when retries are configured).

There is also a visual difference between scheduled and manually triggered DAGs/tasks:

The DAGs/tasks with a black border are scheduled runs, whereas the non-bordered DAGs/tasks are manually triggered, for example, via the airflow trigger_dag command.

A DAG run is usually created by Airflow scheduler, but can also be created by an external trigger. Multiple DAG runs may be running at once for a particular DAG, each of them having a different execution_date.

The execution_date is a logical date and time which the DAG Run, and its task instances, are running for. This allows task instances to process data for the desired logical date & time. While a task instance or DAG run might have a physical start date of now, their logical date might be 3 months ago because we were busy reloading something.

A DAG run and all task instances created within it are instanced with the same execution_date, so that logically you can think of a DAG run as simulating the DAG running all of its tasks at some previous date & time specified by the execution_date.

The general form of running a DAG’s task is:

airflow run <dag_id> <task_id> <execution_date>

Example of running a DAG task:

# visit http://localhost:8080/admin/
# enable example_bash_operator
# visit http://localhost:8080/admin/taskinstance/
# then, run
airflow run example_bash_operator runme_0 2015-01-01

AIRFLOW VARIABLES

Variables are a generic way to store and retrieve configuration settings (such as an AWS bucket name) as a simple key-value store within Airflow. Variables can be listed, created, updated, and deleted from the UI, code, or CLI.
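
A quick sketch of reading and writing Variables from code (the key and values are hypothetical):

from airflow.models import Variable

# store a configuration value (this can also be done from the UI or CLI)
Variable.set('s3_bucket', 'my-data-bucket')

# read it back inside a DAG file or callable; default_var avoids a failure if the key is missing
bucket = Variable.get('s3_bucket', default_var='fallback-bucket')
print(bucket)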

AIRFLOW CONNECTIONS

The connection information to external systems is stored in the Airflow metadata database and managed in the UI. A conn_id is defined there, and hostname/login/password/schema information is attached to it. Airflow pipelines can simply refer to the centrally managed conn_id without having to hard-code any of this information anywhere.
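
As a sketch (the conn_id my_postgres is hypothetical and would first be created under Admin > Connections), an operator simply references the conn_id instead of embedding credentials:

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

dag = DAG('connection_example', start_date=datetime(2020, 1, 1), schedule_interval=None)

# host, credentials, and schema live in the metadata database under conn_id 'my_postgres'
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql='CREATE TABLE IF NOT EXISTS events (id INT, ts TIMESTAMP);',
    dag=dag)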

AIRFLOW HOOKS

Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators.

Check the airflow.hooks and airflow.contrib.hooks packages.

See Managing Connections for how to create and manage connections.
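
A small sketch of using a hook directly inside a PythonOperator callable; the conn_id and query are again hypothetical:

from airflow.hooks.postgres_hook import PostgresHook

def fetch_event_count(**kwargs):
    # the hook resolves host, credentials, and schema from the 'my_postgres' connection
    hook = PostgresHook(postgres_conn_id='my_postgres')
    records = hook.get_records('SELECT COUNT(*) FROM events;')
    print(f"events row count: {records[0][0]}")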

CHECKING METADATA THROUGH COMMAND LINE

Here are the commands to check the metadata:

# print the list of available DAGs
airflow list_dags
# print list of tasks defined by `example_bash_operator` DAG
airflow list_tasks example_bash_operator
# print the hierarchy of tasks in `example_bash_operator` DAG
airflow list_tasks example_bash_operator --tree

TESTING A DAG

The airflow test command runs task instances locally, outputs their logs to STDOUT (on-screen), doesn’t bother with dependencies, and doesn’t communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.

The date specified in this context is called execution_date.

The general form of testing, by running the actual task instances for a specific date, is:

airflow test <dag_id> <task_id> <execution_date> [-tp TASK_PARAMS]

Example of testing a DAG:

airflow test example_bash_operator runme_0 2015-01-01

This should result in displaying a verbose log of events and ultimately running your bash command and printing the result.
