Automating Tasks with Apache Airflow
Published on 07 December 2024 🕒 15:23:00
By Luis Esteban - Just a random guy
Introduction
Apache Airflow is a powerful open-source platform used to programmatically author, schedule, and monitor workflows. With its robust architecture and flexibility, you can automate tasks ranging from simple data processing to complex machine learning pipelines.
In this tutorial, we’ll cover:
- Setting up Apache Airflow
- Automating tasks using Directed Acyclic Graphs (DAGs)
- Examples of automation, including:
- Running a daily ETL pipeline
- Sending email notifications
- Automating website scraping
Prerequisites
Before diving in, ensure you have:
- Python (3.7 or later) installed.
- Basic knowledge of Python programming.
- A working Docker installation (optional but recommended for Airflow setup).
Setting Up Apache Airflow
1. Install Apache Airflow Locally
To install Apache Airflow, use pip:
pip install apache-airflow
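Airflow keeps its configuration, metadata database, and DAG files under a home directory, which defaults to ~/airflow. If you want it somewhere else, set AIRFLOW_HOME before initializing the database. A quick sketch (the path here is just an example):

# Optional: choose where Airflow stores its config, metadata DB, and DAGs
export AIRFLOW_HOME=~/airflow

# Create the folder that will hold your DAG files
mkdir -p $AIRFLOW_HOME/dags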
Set up the Airflow database:
airflow db init
Create a user account:
airflow users create \
--username admin \
--password admin \
--firstname First \
--lastname Last \
--role Admin \
--email admin@example.com
Start the Airflow webserver and scheduler:
airflow webserver -p 8080 &
airflow scheduler &
Visit http://127.0.0.1:8080 and log in with the account you just created to access the Airflow web UI.
Automating Tasks with DAGs
Apache Airflow uses DAGs (Directed Acyclic Graphs) to define workflows. DAGs consist of tasks and dependencies between them.
Creating a DAG
Below is an example of a simple DAG that prints a message.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

def print_hello():
    print("Hello, Apache Airflow!")

# Define the DAG
with DAG(
    'hello_world_dag',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule_interval='@daily',
) as dag:
    hello_task = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
    )
Save this file as hello_world_dag.py in the dags/ folder of your Airflow home directory.
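Once the file is in place, you can confirm Airflow sees the DAG and run the task by hand from the command line (the date below is just an example logical date):

# List registered DAGs and confirm hello_world_dag shows up
airflow dags list

# Execute a single task for one logical date, without the scheduler
airflow tasks test hello_world_dag print_hello 2024-01-01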
Examples of Automated Tasks
- Daily ETL Pipeline
The following example demonstrates automating a daily ETL pipeline.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading data...")

with DAG(
    'etl_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
    },
    description='A simple ETL pipeline',
    schedule_interval='@daily',
) as dag:
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract,
    )
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform,
    )
    load_task = PythonOperator(
        task_id='load',
        python_callable=load,
    )

    # Define dependencies
    extract_task >> transform_task >> load_task
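The tasks above only print messages; in a real pipeline you usually need to hand data from one step to the next. One way to do that is Airflow's XCom mechanism: a PythonOperator's return value is pushed to XCom automatically, and a downstream callable can pull it through the task instance. A minimal sketch (the dictionary payload is made up for illustration):

def extract():
    # The return value is automatically pushed to XCom
    return {'rows': [1, 2, 3]}

def transform(ti):
    # Pull the upstream task's return value from XCom via the task instance
    data = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {len(data['rows'])} rows...")

XCom is intended for small pieces of metadata, not large datasets; for anything sizeable, write the data to storage in one task and pass only a reference (such as a file path) to the next.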
- Sending Email Notifications
Use the EmailOperator to automate email sending.
from airflow.operators.email import EmailOperator

# Assumes an existing `dag` object, e.g. from the ETL example above
email_task = EmailOperator(
    task_id='send_email',
    to='recipient@example.com',
    subject='Daily Report',
    html_content='The daily pipeline completed successfully.',
    dag=dag,
)
Add the email_task to a DAG and schedule it after a task completes.
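Note that the EmailOperator sends mail through the SMTP settings in your Airflow configuration, so the [smtp] section of airflow.cfg needs to point at a real mail server. A minimal sketch (host, credentials, and addresses below are placeholders):

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
smtp_user = airflow@example.com
smtp_password = your_smtp_password
smtp_mail_from = airflow@example.com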
- Automating Web Scraping
Below is a complete DAG that ties everything together: the ETL tasks, an email notification, and a web-scraping task built on Python's requests library.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
import requests

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['your_email@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
with DAG(
    dag_id='automation_example_dag',
    default_args=default_args,
    description='An example DAG with ETL, email notifications, and web scraping',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Extract
    def extract():
        print("Extracting data...")

    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract,
    )

    # Task 2: Transform
    def transform():
        print("Transforming data...")

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform,
    )

    # Task 3: Load
    def load():
        print("Loading data...")

    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load,
    )

    # Task 4: Send Email Notification
    email_task = EmailOperator(
        task_id='send_email_notification',
        to='recipient@example.com',
        subject='Daily Pipeline Notification',
        html_content='The daily ETL pipeline completed successfully.',
    )

    # Task 5: Scrape Website
    def scrape_website():
        url = 'https://example.com'
        # Use a timeout so a hung request does not block the worker indefinitely
        response = requests.get(url, timeout=30)
        with open('/tmp/scraped_data.html', 'w') as file:
            file.write(response.text)
        print("Scraped data saved to /tmp/scraped_data.html")

    scrape_task = PythonOperator(
        task_id='scrape_website',
        python_callable=scrape_website,
    )

    # Define Task Dependencies
    extract_task >> transform_task >> load_task
    load_task >> email_task
    load_task >> scrape_task
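With the file saved in your dags/ folder, you can run the whole pipeline from the command line before leaving it to the scheduler (again, the date is just an example logical date):

# Run the entire DAG once for a single logical date, outside the scheduler
airflow dags test automation_example_dag 2024-01-01

# Or queue a real run through the scheduler
airflow dags trigger automation_example_dag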
Conclusion
Apache Airflow simplifies the automation of complex workflows. Whether you’re building ETL pipelines, scheduling scripts, or monitoring task dependencies, Airflow’s flexibility and features make it a must-have tool for modern automation tasks.
Start exploring the possibilities and integrate Airflow into your workflow to save time and reduce errors!