Skip to content

Airflow cơ bản

Photo from airflow-tutorial

Giới thiệu

Ở bài trước, chúng ta đi vào tìm hiểu tại sao phải xây dựng pipeline, và ở bài học này sẽ giúp bạn hình dung rõ hơn về cách xây dựng pipeline thông qua Airflow.

Các khái niệm cơ bản

Airflow là một nền tảng cung cấp SDK và UI để hỗ trợ xây dựng, đặt lịch thực thi và theo dõi các pipeline. Một số khái niệm cơ bản trong Airflow:

  • task: một thành phần (hoặc một bước) trong pipeline
  • DAG (Directed Acyclic Graph): định nghĩa thứ tự thực thi, lịch chạy và số lượng lần retry.v.v. cho các task

Task

Để tạo ra task, chúng ta sẽ dùng các operators cung cấp bởi Airflow SDK. Một số loại operator phổ biến bao gồm:

  • BashOperator: thực thi các bash command
  • PythonOperator: thực thi các Python script
  • EmailOperator: gửi email
  • DockerOperator: thực hiện các command bên trong docker container
  • MySQLOperator: thực thi các MySQL query, ngoài ra còn rất nhiều operator khác được phát triển bởi cộng đồng, bạn xem thêm tại đây

Ở series này chúng ta sẽ chủ yếu sử dụng 2 loại operators là DockerOperatorBashOperator.

Tip

Việc sử dụng DockerOperator thay cho PythonOperator đảm bảo môi trường chạy code được đóng gói và có thể chạy code này ở trên bất kỳ máy nào mà không bị các vấn đề về cài đặt hoặc xung đột thư viện.

DAG

Sau khi đã code xong các task, chúng ta sẽ đưa các task này vào trong DAG như ví dụ sau:

1
2
3
4
5
6
7
8
with DAG(
    "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # (1)
    schedule="@daily", catchup=False
) as dag:
    ingest_task = PythonOperator(...) # (2)
    clean_task = PythonOperator(...)
    validate_task = PythonOperator(...)
    ingest_task >> clean_task >> [explore_task, validate_task] # (3)
  1. Định nghĩa thời gian bắt đầu chạy pipeline từ 1/1/2021, lịch chạy là daily và không catchup, tức là không chạy pipeline trước start_date
  2. Định nghĩa task bằng PythonOperator. Bạn tự truyền các config vào (...)
  3. Thứ tự chạy: ingest_task tới clean_task, cuối cùng là 2 task song song: explore_task và _validate_task

Tổng kết

Ở bài học hôm nay, chúng ta đã tìm hiểu về một số khái niệm cơ bản trong Airflow và làm quen với Airflow Python SDK để xây dựng taskDAG. Bài học tiếp theo, chúng ta sẽ cụ thể hóa việc xây dựng pipeline bằng cách ứng dụng vào một bài toán cụ thể.

Tài liệu tham khảo