XCom
- Apache Airflow에서 사용되는 개념으로, Task 간에 데이터를 전달하는 방법 제공
- Airflow에서 Task는 서로 독립적으로 실행되므로, 각 Task 간에 데이터를 공유하거나 전달하기 위해 XCom 활용
주요 기능
- 데이터 공유 : Task 간에 데이터를 전달, 공유 가능하게 하는 기능 제공
- Push & Pop
- Push : Task에서 데이터를 Xcom으로 저장
-> 명시적으로 xcom_push하는 것이 아닌 return으로 자동 push 지원! - Pop : 다른 Task에서 Push된 데이터를 꺼내서 사용
- Push : Task에서 데이터를 Xcom으로 저장
장점
- 간편한 데이터 공유 : 데이터를 전역 변수나 파일 시스템에 저장할 필요 없이, Airflow의 메타 데이터베이스에 저장하여 빠르고 안전한 데이터 공유 가능
- 비동기 작업 처리 : Task가 병렬로 실행되더라도, XCom을 통해 각 Task 간의 데이터를 동기화하여 Task 실행 가능
- 디버깅 및 로깅 : XCom을 사용하여 중간 데이터를 기록할 수 있어, 문제 발생 시 디버깅에 유리
- 유연한 데이터 형식 처리 : XCom은 데이털르 문자열 형식으로 저장하지만, 복잡한 객체도 pickle을 통해 저장 가능
예시
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_data_to_xcom(**kwargs):
kwargs['ti'].xcom_push(key='my_key', value='some_value')
# 명시적으로 xcom_push 하는 것이 아닌 return으로 자동 push 가능
# return some_value
def pull_data_from_xcom(**kwargs):
value = kwargs['ti'].xcom_pull(task_ids='push_data_task', key='my_key')
# value = kwargs['ti'].xcom_pull(task_ids='push_data_task', key='return_value')
print(f"Pulled value: {value}")
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 11, 10),
}
dag = DAG('xcom_example', default_args=default_args, schedule_interval=None)
push_data_task = PythonOperator(
task_id='push_data_task',
python_callable=push_data_to_xcom,
provide_context=True,
dag=dag,
)
pull_data_task = PythonOperator(
task_id='pull_data_task',
python_callable=pull_data_from_xcom,
provide_context=True,
dag=dag,
)
push_data_task >> pull_data_task
'데이터 > Airflow' 카테고리의 다른 글
[Airflow] 6. Slack Alert (0) | 2024.11.18 |
---|---|
[Airflow] 5. Backfill? (1) | 2024.11.12 |
[Airflow] 4. Airflow.cfg? (0) | 2024.11.11 |
[Airflow] 3. Python Operator / Task Decorator (0) | 2024.11.10 |
[Airflow] 1. Airflow? (1) | 2024.11.07 |