Using Airflow to Schedule Spark Jobs (2022)


Apache Airflow is used for defining and managing a Directed Acyclic Graph (DAG) of tasks. Data engineers use it to programmatically orchestrate and schedule data pipelines, and to set retries and alerts for when a task fails. A single task can use a wide range of operators (bash script, PostgreSQL function, Python function, SSH, Email, etc.) or even a Sensor, which waits (polls) for a certain time, file, database row, S3 key, and so on.

As you may already be aware, failures in Apache Spark applications are inevitable for various reasons, one of the most common being OOM (out of memory at the driver or executor level). With Airflow we can manage (schedule, retry, alert, etc.) Spark applications along with other types of tasks, without any headache and without any code outside the Airflow ecosystem.

In this post, I will focus on building a DAG of three Spark app tasks (i.e. SparkSubmitOperator) in Airflow and won’t go into the details of each Spark app.

1- Business Logic

Task 1: Data Ingestion | We have a Spark Structured Streaming app that consumes users’ flight search data from Kafka and appends it to a Delta table. This is not a real-time streaming app, since we do not need to process the search data as soon as it is generated, so we set the writeStream trigger to Trigger.Once in the Spark app to run it like a batch job while keeping the benefits of Spark Structured Streaming, such as Kafka offset management. According to business requirements, this task should run once an hour. The Spark app appends data to a Delta table with the following schema, partitioned by year_month:

flight_search
root
|-- searched_at: timestamp (nullable = true)
|-- responsed_at: timestamp (nullable = false)
|-- channel: string (nullable = true)
|-- origin: string (nullable = true)
|-- destination: string (nullable = true)
|-- departure_date: date (nullable = true)
|-- year_month: string (nullable = true)
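
A minimal sketch of this Trigger.Once ingestion is shown below; the Kafka broker, topic name, and table paths are assumptions for illustration, not the repo’s actual values:

# Sketch of the Trigger.Once ingestion job (broker, topic, and paths are assumed)
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("flight_search_ingestion").getOrCreate()

search_schema = T.StructType([
    T.StructField("searched_at", T.TimestampType()),
    T.StructField("responsed_at", T.TimestampType()),
    T.StructField("channel", T.StringType()),
    T.StructField("origin", T.StringType()),
    T.StructField("destination", T.StringType()),
    T.StructField("departure_date", T.DateType()),
])

events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")    # assumed broker address
    .option("subscribe", "flight_search")                # assumed topic name
    .load()
    .select(F.from_json(F.col("value").cast("string"), search_schema).alias("e"))
    .select("e.*")
    .withColumn("year_month", F.date_format("searched_at", "yyyy-MM")))

(events.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)                                  # run like a batch job, keep Kafka offsets
    .option("checkpointLocation", "/delta/flight_search/_checkpoints")  # assumed path
    .partitionBy("year_month")
    .start("/delta/flight_search"))                      # assumed table path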

Task 2: search-waiting-time | This is a simple Spark batch job that reads the flight_search Delta table (the one task 1 appends to) and calculates the waiting time between searched_at and responsed_at.

Task 3: nb-search | This is a simple Spark batch job that reads the flight_search Delta table (the one task 1 appends to) and calculates the number of searches by channel and route.
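
In essence, the two batch jobs boil down to something like the following sketch (the table path and the exact outputs are assumptions):

# Sketch of the two downstream batch jobs (table path and outputs are assumed)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("flight_search_batch").getOrCreate()
searches = spark.read.format("delta").load("/delta/flight_search")  # assumed path

# Task 2: waiting time between search and response, in seconds
waiting_time = searches.withColumn(
    "waiting_time_sec",
    F.col("responsed_at").cast("long") - F.col("searched_at").cast("long"))

# Task 3: number of searches per channel and route
nb_search = searches.groupBy("channel", "origin", "destination").count()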

As you might have guessed, task 1 must be executed successfully first; tasks 2 and 3 then run in parallel, since both depend on the first task but not on each other.

You can take a closer look at the Spark code in my GitHub repo.

2- Airflow

2–1. Concepts — This section is for those who have not yet tried Airflow. First, you need to know the basic Airflow concepts, which include:


  • DAG: A DAG (Directed Acyclic Graph) is a collection of tasks with relationships and dependencies.
  • Task: The unit of work within a DAG i.e. the nodes in a DAG.
  • Operator: An operator defines and describes a single task in a DAG. BashOperator, SSHOperator and PostgresOperator are just some examples of an Operator, each of which has its own attributes.
Figure 1. Graph view of the flight_search_dag DAG.

Figure 1 shows the graph view of a DAG named flight_search_dag, which consists of three tasks, all of type SparkSubmitOperator. The tasks flight_search_waiting_time and flight_nb_search run in parallel once the first task, flight_search_ingestion, has completed successfully.

2–2. Installation — The installation is straightforward, as described on the official website:

Set the AIRFLOW_HOME environment variable, which Airflow uses for its config file, log dir, etc.:

export AIRFLOW_HOME=~/airflow

You can simply do the following to install and run Airflow:

# install Airflow using pip
pip install apache-airflow
# initialize the Airflow metadata database
airflow initdb
# start the webserver to interact with Airflow
airflow webserver -p 8080
# Airflow runs DAGs using an Executor class; start the scheduler:
airflow scheduler

You can access the Airflow web UI at http://localhost:8080.

Now you can see some files in the $AIRFLOW_HOME dir. There are some important configuration options in $AIRFLOW_HOME/airflow.cfg that you should know about, and whose default values you may want to change, so open airflow.cfg in your preferred editor, such as vim. (1) dags_folder takes a directory that Airflow watches periodically to build the DAGs. As said earlier, DAGs are built programmatically; this path hosts all of your Python files containing Airflow-related code. (2) executor is a very important variable in airflow.cfg that determines the level of parallelism when running tasks or DAGs. This option accepts the following values:

  • SequentialExecutor is an executor class that works locally and runs only one task at a time.
  • LocalExecutor is an executor class that executes tasks locally in parallel, using the Python multiprocessing library and a queuing technique.
  • CeleryExecutor, DaskExecutor and KubernetesExecutor are classes that run tasks in a distributed fashion to improve availability.

LocalExecutor is chosen here, since we need to run task instances in parallel but do not need Airflow to be highly available at this stage.

(3) sql_alchemy_conn is another important variable in airflow.cfg; it determines the database Airflow uses to store its metadata. PostgreSQL is chosen here, so the variable looks as follows:

sql_alchemy_conn = postgresql+psycopg2://aiflow_user:pass@192.168.10.10:5432/airflow_db

(4) parallelism determines the maximum number of task instances that can run in parallel across all DAGs, (5) dag_concurrency determines how many task instances the scheduler is allowed to run per DAG, and (6) max_threads sets the number of threads the scheduler uses to schedule DAGs.
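
Putting these options together, the relevant parts of airflow.cfg might look like the following; the numeric values and the dags path are illustrative choices, not settings taken from the original setup:

[core]
# assumed path; point it at your own DAGs directory
dags_folder = /home/airflow/dags
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://aiflow_user:pass@192.168.10.10:5432/airflow_db
# illustrative values; tune them for your machine
parallelism = 32
dag_concurrency = 16

[scheduler]
max_threads = 2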

You need to restart the webserver and the scheduler after changing these configuration options.


3- Building the DAG

This section describes all the steps to build the DAG shown in figure 1.

As you know, the spark-submit script is used for submitting a Spark app to a Spark cluster manager. For example, running the PySpark app search_event_ingestor.py looks as follows:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,io.delta:delta-core_2.12:0.7.0 --master local[*] --driver-memory 12g --executor-memory 12g spark/search_event_ingestor.py

So to build a SparkSubmitOperator in Airflow, you need to do the following:

3–1. SPARK_HOME environment variable — We need to add the Spark binary dir to the OS environment variables as follows (on Ubuntu):
export SPARK_HOME=/path_to_the_spark_home_dir
export PATH=$PATH:$SPARK_HOME/bin

3–2. Spark Connection — Create a Spark connection in the Airflow web UI (localhost:8080) > Admin menu > Connections > Add+. Choose Spark as the connection type, give it a connection id, and set the Spark master URL (i.e. local[*], or the cluster manager master’s URL), plus the port of your Spark master or cluster manager if you have a Spark cluster.

[Screenshot: adding the Spark connection in the Airflow web UI]
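
If you prefer the command line, the same connection can be created with the Airflow 1.10-style CLI; treat this as a sketch, since the flags differ between Airflow versions:

airflow connections --add \
    --conn_id spark_local \
    --conn_type spark \
    --conn_host "local[*]"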

3–3. Set Spark app home variable — It is very useful to define a global variable in Airflow that can be used in any DAG. I define the PySpark app home dir as an Airflow Variable, which will be used later. In the Admin menu, click Variables and define the variable as shown in the figure below:

[Screenshot: defining the PYSPARK_APP_HOME variable in the Airflow web UI]
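
The same variable can also be set from the Airflow 1.10-style CLI; the path below is a placeholder for your own PySpark app directory:

airflow variables --set PYSPARK_APP_HOME /path/to/pyspark/app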

3–4. Building the DAG — Now, it’s time to build the Airflow DAG. As I said earlier, an Airflow DAG is a typical Python script that needs to be in the dags_folder (a configuration option in airflow.cfg).

Let’s get into our DAG file, named flight_search_dag.py:

3–4–1 Imports — Like any other Python app, we need to import some modules and classes as follows:


from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable

3–4–2 default_args — This is a dictionary for setting up the default configuration of our DAG (it is almost the same for all DAGs):

local_tz = pendulum.timezone("Asia/Tehran")
default_args = {
    'owner': 'mahdyne',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 10, tzinfo=local_tz),
    'email': ['nematpour.ma@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

3–4–3 dag — This is an object instantiated from the DAG class:

dag = DAG(dag_id='flight_search_dag',
          default_args=default_args,
          catchup=False,
          schedule_interval="0 * * * *")

catchup=False means we do not need Airflow to backfill the missed past runs since the start_date. schedule_interval="0 * * * *": you guessed right, we can pass a crontab-style scheduling pattern to this attribute.

3–4–4 pyspark_app_home — This variable holds the PySpark app dir, as defined in the Airflow Variable in the UI earlier:

pyspark_app_home = Variable.get("PYSPARK_APP_HOME")

3–4–5 SparkSubmitOperator — Using this kind of operator, adding a Spark app to a DAG (i.e. a task that executes the Spark app within the DAG) is very simple and straightforward.

flight_search_ingestion = SparkSubmitOperator(
    task_id='flight_search_ingestion',
    conn_id='spark_local',
    application=f'{pyspark_app_home}/spark/search_event_ingestor.py',
    total_executor_cores=4,
    packages="io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
    executor_cores=2,
    executor_memory='5g',
    driver_memory='5g',
    name='flight_search_ingestion',
    execution_timeout=timedelta(minutes=10),
    dag=dag
)

The SparkSubmitOperator class includes useful attributes that eliminate the need for a separate bash script and for calling it with a BashOperator.

The conn_id attribute takes the name of the Spark connection built in section 3–2. We pass the path of the PySpark app’s Python file to the application attribute, and the dependencies to packages in comma-separated style. The other attributes are self-explanatory to Spark developers :).

This is just as easy in the Spark Scala/Java case: you pass the fat jar file to the application attribute and the main class to the java_class attribute.
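
As a hypothetical sketch (the jar path and class name are made-up placeholders), such a task could look like this:

# Hypothetical SparkSubmitOperator for a Scala/Java Spark app
flight_search_scala = SparkSubmitOperator(
    task_id='flight_search_scala',
    conn_id='spark_local',
    application=f'{pyspark_app_home}/spark/flight-search-assembly.jar',  # assumed fat jar
    java_class='com.example.FlightSearchJob',                            # assumed main class
    name='flight_search_scala',
    dag=dag
)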

There are two other SparkSubmitOperator tasks similar to flight_search_ingestion, named flight_search_waiting_time and flight_nb_search.

3–4–6 Dependencies — After instantiating the other tasks, it is time to define the dependencies.

flight_search_ingestion >> [flight_search_waiting_time, flight_nb_search]

Airflow overloads the binary right-shift operator >> to define dependencies, meaning that flight_search_ingestion must be executed successfully first, and then the two tasks flight_search_waiting_time and flight_nb_search run in parallel, since they both depend on the first task but not on each other, and we have enough resources in the cluster to run two Spark jobs at the same time.


Finished! Now we have a DAG of three Spark jobs that runs once an hour, and we receive an email if something goes wrong (i.e. any failure, or a run that takes longer than expected). Don’t forget to turn the DAG on using the toggle in the UI :).

The project can be found in my GitHub repository: https://github.com/mahdyne/pyspark-tut
