Skip to content

Xây dựng pipeline

Photo by Sneha Mehrin on Medium

Giới thiệu

Ở bài học trước, chúng ta đã làm quen với feature store, Feast và dùng command feast apply để tạo ra feature definition ở đường dẫn data_pipeline/feature_repo/registry/local_registry.db. Trong bài học này chúng ta sẽ sử dụng folder này để cấu hình client store giao tiếp với feature store như sau:

from feast import FeatureStore
store = FeatureStore(repo_path="../feature_repo")  # (1)
  1. Khởi tạo client store để giao tiếp với feature store

Client này sẽ được sử dụng ở nhiều bước khác nhau bao gồm 1, 2, 3, 4, 5 như hình dưới đây:

Môi trường phát triển

Ngoài Feast, bài học này sẽ sử dụng thêm Airflow, bạn vào repo mlops-crash-course-platform/ và start service này như sau:

bash run.sh airflow up

Các tương tác chính với Feast

Chúng ta có 6 tương tác chính với Feast như sau:

1. Materialize feature từ offline sang online store để đảm bảo online store lưu trữ feature mới nhất

data_pipeline/scripts/feast_helper.sh
cd feature_repo
feast materialize-incremental $(date +%Y-%m-%d)
Tip

Để hiểu rõ hơn về materialize, giả sử rằng ở offline store chúng ta có 3 record của ID 1001 như sau:

datetime driver_id conv_rate acc_rate avg_daily_trips created
2021-07-13 11:00:00+00:00 1001 0.852406 0.059147 340 2021-07-28 11:08:04.802
2021-08-10 12:00:00+00:00 1001 0.571599 0.244896 752 2021-07-28 11:08:04.802
2021-07-13 13:00:00+00:00 1001 0.929023 0.479821 716 2021-07-28 11:08:04.802

Khi chúng ta chạy command feast materialize 2021-08-07T00:00:00, thì dữ liệu có datetime mới nhất mà trước thời điểm 2021-08-07T00:00:00 sẽ được cập nhật vào online store. Đó chính là record thứ 3 ở bảng trên.

datetime driver_id conv_rate acc_rate avg_daily_trips created
2021-07-13 13:00:00+00:00 1001 0.929023 0.479821 716 2021-07-28 11:08:04.802

2. Data scientist, training pipeline hoặc offline batch serving pipeline kéo features về để train model

data_pipeline/examples/get_historical_features.py
entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003],
        "datetime": [
            datetime(2022, 5, 11, 11, 59, 59),
            datetime(2022, 6, 12, 1, 15, 10),
            datetime.now(),
        ],
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_stats:acc_rate", "driver_stats:conv_rate"],
).to_df()
print(training_df.head())

3. Kéo features mới nhất tương ứng với các IDs trong request API để cho qua model dự đoán

data_pipeline/examples/get_online_features.py
features = store.get_online_features(
    features=[
        "driver_stats:acc_rate",
        "driver_stats:conv_rate"
    ],
    entity_rows=[
        {
            "driver_id": 1001,
        }
    ],
).to_dict(include_event_timestamps=True)

def print_online_features(features):
    for key, value in sorted(features.items()):
        print(key, " : ", value)

print_online_features(features)

4. Đẩy stream feature vào offline store

data_pipeline/src/stream_to_stores/processor.py
def preprocess_fn(rows: pd.DataFrame):
    print(f"df columns: {rows.columns}")
    print(f"df size: {rows.size}")
    print(f"df preview:\n{rows.head()}")
    return rows

ingestion_config = SparkProcessorConfig(mode="spark", source="kafka", spark_session=spark, processing_time="30 seconds", query_timeout=15)
sfv = store.get_stream_feature_view("driver_stats_stream")

processor = get_stream_processor_object(
    config=ingestion_config,
    fs=store,
    sfv=sfv,
    preprocess_fn=preprocess_fn,
)

processor.ingest_stream_feature_view(PushMode.OFFLINE)

5. Đẩy stream feature vào online store

processor.ingest_stream_feature_view()

7. ETL (Extract, Transform, Load) pipeline cập nhật dữ liệu của offline store

Tip

Ở tương tác 2., thông thường các Data Scientist sẽ kéo dữ liệu từ feature store để:

  • thực hiện POC
  • thử nghiệm với các feature khác nhằm mục đích cải thiện model

Ở công đoạn xây dựng data pipeline, chúng ta sẽ xây dựng pipeline cho các tương tác 1., 4., 5., 7.

Xây dựng các pipelines

ETL pipeline

Để tạo ra một Airflow pipeline, thông thường chúng ta sẽ làm theo trình tự sau:

  1. Định nghĩa DAG cho pipeline (line 1-8)
  2. Viết các task cho pipeline, ví dụ: ingest_task, clean_taskexplore_and_validate_task (line 9-25)
  3. Viết thứ tự chạy các task (line 27)
  4. Chạy lệnh sau để build Docker image cho các pipeline component, và copy file code DAG sang folder airflow/run_env/dags/data_pipeline của repo clone từ MLOps Crash course platform

    cd data_pipeline
    make build_image
    # Đảm bảo Airflow server đã chạy
    make deploy_dags # (1)
    
    1. Copy data_pipeline/dags/* vào folder dags của Airflow

    DAG dưới đây thể hiện ETL pipeline.

    data_pipeline/dags/db_to_offline_store.py
    with DAG(
        dag_id="db_to_offline_store", # (1)
        default_args=DefaultConfig.DEFAULT_DAG_ARGS, # (2)
        schedule_interval="@once",  # (3)
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"), # (4)
        catchup=False,  # (5)
        tags=["data_pipeline"],
    ) as dag:
        ingest_task = DockerOperator(
            task_id="ingest_task",
            **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
            command="/bin/bash -c 'cd src/db_to_offline_store && python ingest.py'",  # (6)
        )
    
        clean_task = DockerOperator(
            task_id="clean_task",
            **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
            command="/bin/bash -c 'cd src/db_to_offline_store && python clean.py'",
        )
    
        explore_and_validate_task = DockerOperator(
            task_id="explore_and_validate_task",
            **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
            command="/bin/bash -c 'cd src/db_to_offline_store && python explore_and_validate.py'",
        )
    
        ingest_task >> clean_task >> explore_and_validate_task  # (7)
    
    1. Định nghĩa tên pipeline hiển thị ở trên Airflow dashboard
    2. Định nghĩa pipeline owner, số lần retry pipeline, và khoảng thời gian giữa các lần retry
    3. Lịch chạy pipeline, ở đây @once là một lần chạy, bạn có thể thay bằng cron expression ví dụ như 0 0 1 * *
    4. Ngày bắt đầu chạy pipeline theo múi giờ UTC
    5. Nếu start_date là ngày 01/01/2022, ngày deploy/turn on pipeline là ngày 02/02/2022, và schedule_interval là @daily thì sẽ không chạy các ngày trước 02/02/2022 nữa
    6. Command chạy trong docker container cho bước này
    7. Định nghĩa thứ tự chạy các bước của pipeline: đầu tiên là ingest sau đó tới clean và cuối cùng là explore_and_validate
    Info

    Do chúng ta dùng DockerOperator để tạo task nên cần phải build image chứa code và môi trường trước, sau đó sẽ truyền tên image vào DEFAULT_DOCKER_OPERATOR_ARGS trong từng pipeline component (ví dụ như line 11). Dockerfile để build image bạn có thể tham khảo tại data_pipeline/deployment/Dockerfile

    Biến DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS chứa các config như sau:

    data_pipeline/dags/utils.py
    DEFAULT_DOCKER_OPERATOR_ARGS = {
        "image": f"{AppConst.DOCKER_USER}/mlops_crash_course/data_pipeline:latest", # (1)
        "api_version": "auto",  # (2)
        "auto_remove": True, # (3)
        "mounts": [
            Mount(
                source=AppPath.FEATURE_REPO.absolute().as_posix(), # (4)
                target="/data_pipeline/feature_repo", # (5)
                type="bind", # (6)
            ),
        ],
    }
    
    1. Docker image dùng cho task
    2. Tự động xác định Docker engine API version
    3. Tự động dọn dẹp container sau khi exit
    4. Folder ở máy local, bắt buộc là đường dẫn tuyệt đối
    5. Folder nằm trong docker container
    6. Kiểu bind, đọc thêm ở đây
  5. Đăng nhập vào Airflow tại http://localhost:8088, account airflow, password airflow, các bạn sẽ thấy một DAG với tên db_to_offline_store, 2 DAG bên dưới chính là những pipeline còn lại trong data pipelines (đề cập ở bên dưới).

  6. Đặt Airflow Variable MLOPS_CRASH_COURSE_CODE_DIR bằng đường dẫn tuyệt đối tới folder mlops-crash-course-code/. Tham khảo hướng dẫn này về cách đặt Airflow Variable.

    Info

    Airflow Variable MLOPS_CRASH_COURSE_CODE_DIR được dùng trong file data_pipeline/dags/utils.py. Variable này chứa đường dẫn tuyệt đối tới folder mlops-crash-course-code/, vì DockerOperator yêu cầu Mount Source phải là đường dẫn tuyệt đối.

  7. Kích hoạt data pipeline và đợi kết quả

  8. Xem thứ tự các task của pipeline này như sau:

Tương tự như ETL pipeline, chúng ta sẽ code tiếp Feast materialize pipelineStream to stores pipeline như bên dưới.

Feast materialize pipeline

Materialize dữ liệu từ offline qua online giúp làm mới dữ liệu ở online store

data_pipeline/dags/materialize_offline_to_online.py
with DAG(
    dag_id="materlize_offline_to_online",
    default_args=DefaultConfig.DEFAULT_DAG_ARGS,
    schedule_interval="@once",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    tags=["data_pipeline"],
) as dag:
    materialize_task = DockerOperator(
        task_id="materialize_task",
        **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
        command="/bin/bash ./scripts/feast_helper.sh materialize",
    )

Stream pipeline

Trong phần này, chúng ta sẽ làm mới features bằng cách ghi dữ liệu trực tiếp từ stream source vào offlineonline store.

Ở bài trước, chúng ta đã chạy Kafka stream. Đoạn code dưới đây sẽ thực hiện tác vụ đọc và xử lý data từ Kafka stream và lưu vào offline store và online store. Khi bạn sử dụng thư viện Feast trong đoạn code training hay inference để đọc features, bạn sẽ thấy features được cập nhật mới liên tục. Các đoạn code training và inference này sẽ được hướng dẫn trong các bài tiếp theo.

data_pipeline/dags/stream_to_stores.py
with DAG(
    dag_id="stream_to_stores",
    default_args=DefaultConfig.DEFAULT_DAG_ARGS,
    schedule_interval="@once",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    tags=["data_pipeline"],
) as dag:
    stream_to_online_task = DockerOperator(
        task_id="stream_to_online_task",
        command="/bin/bash -c 'cd src/stream_to_stores && python ingest.py --store online'",
        **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
    )

    stream_to_offline_task = DockerOperator(
        task_id="stream_to_offline_task",
        **DefaultConfig.DEFAULT_DOCKER_OPERATOR_ARGS,
        command="/bin/bash -c 'cd src/stream_to_stores && python ingest.py --store offline'",
    )

Tổng kết

Ở bài học này, chúng ta đã sử dụng Feast SDK để lưu trữ và lấy feature từ feature store. Để đảm bảo feature luôn ở trạng thái mới nhất có thể, chúng ta cũng đã xây dựng các Airflow pipeline để cập nhật dữ liệu định kỳ cho các store.

Chúng ta cũng hoàn thành chuỗi bài về data pipeline, hy vọng bạn có thể vận dụng các kiến thức đã học để vận hành hiệu quả các luồng dữ liệu và luồng feature của mình.

Tài liệu tham khảo