일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- apollo-server-v3
- 마법의연금굴리기
- Zone2
- 가람집옹심이
- 이코노미스트한국구독센터
- neovim
- 마연굴
- deepseek
- lner
- 여니브레드
- 강릉여행
- apollo-sandbox
- 플라스틱은 어떻게 브랜드의 무기가 되는가
- 달리기
- 여행
- 한국걱정
- 송고버섯피자
- 오운완
- 트럼프2.0시대
- 일권하는사회
- 티지아이포럼
- schema-registry
- 프로젝트헤일메리
- 런데이애플워치
- 루스틱
- 저동하녹
- 중사랑
- kafka-connect
- 런데이
- 잘쉬어야지
- Today
- Total
해뜨기전에자자
airflow 개요 및 사용하면서 신경써야할 부분 본문
한줄 요약
airflow는 workflow engine이다. 매일 돌아야 하는 offline배치를 파이프라인 구성하는 것이 목적인 툴이다. hdfs, spark, aws s3 등 다양한 operator를 지원해서 사용자 풀 및 레퍼런스도 많은 편이다. 요즘은 k8s를 많이 써서 argo같은 대체제도 있는 듯 하다.
아키텍처
https://airflow.apache.org/docs/stable/executor/celery.html
현재 db로는 postgre, queue로는 redis를 쓰고 있다.
기본 컨셉
https://airflow.apache.org/docs/stable/concepts.html
-
DAG는 태스크로 구성된다
-
각 태스크는 오퍼레이터 클래스를 인스턴스화하여 만든다. 구성한 오퍼레이터 인스턴스는 다음과 같이 태스크가 된다. my_task = MyOperator(...)
-
DAG가 시작되면 Airflow는 데이터베이스에 DAG 런 항목을 만든다
-
특정 DAG 런 맥락에서 태스크를 실행하면 태스크 인스턴스가 만들어진다.
-
오퍼레이터
- BashOperaton, PythonOperator, EmailOperator, SimpleHttpOperator
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator 같은 db 커맨드
- DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator etc
- airflow sensor 다른 시스템에서 장시간 실행 중인 태스크를 모니터링하는데 사용하는 특별한 유형의 오퍼레이터 an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
-
태스크끼리 Xcom를 이용해 통신할 수 있다.
- 각 태스크 인스턴스는 xcom_push를 이용해서 xcom 정보 저장, xcom_pull로 정보를 불러올 수 있다.
- execute 가 반환하는 값은 return_value 키 아래 Xcom 메세지로 저장된다. Xcom 정보는 db에 피클화(python serialization)시켜 저장하므로 크기가 큰 객체는 지양하는 것이 좋다.
- 보통 storage위치 등을 넘김.
- TTL 없음
- 가급적 사용하지 않는 것이 좋을 것 같다. https://medium.com/the-prefect-blog/why-not-airflow-4cfa423299c4
- execute 는 여러번 재시도 될 수 있도록 멱등이어야 한다.
execution_date: DAG 실행 시 입력 받는 유일한 변수. 헷갈리게 하는 부분
- execute_date는 실행 하려는 날짜가 아니라 run id, 즉 주문 번호 같은 것
- 굳이 따지자면 DAG의 시작시간이 아니라 시작 시간으로 제한되는 interval의 끝으로 해석 됨
- 처음 airflow 요구사항이 4월 7일에 대한 데이터 작업을 4월 8일에 동작하게 하려고 했던 것이기 때문.
- execution_date 없이 실행할 수 없으며 동일한 execution_date를 두 번 실행할 수 없다.
- 다시 실행 시키려면 실패 처리를 잘 해야겠다
- 비정기적 스케줄, 동시에 여러 개의 워크플로우 실행, 매뉴얼에 의해서만 유지되는 워크플로우 실행은 airflow 의 철학과 맞지 않음.
스케줄러
- backbone of airflow. 중앙집중식
- 수 초마다 DAG 폴더 파싱
- DagBag(dag 디렉토리)에 있는 파일 안에 "airflow"나 "DAG"문자열이 있어야 파싱 대상이 됨 https://airflow.apache.org/docs/stable/faq.html
- 명시적으로 .airflowignore 에 적어 줄 수도 있다. https://airflow.apache.org/docs/stable/concepts.html#airflowignore
- DAG 가 실행할 준비가 되었는지 결정하기 위해 스케줄을 확인
- 어떤 Task가 실행 준비가 되었는지 결정하기 위해 모든 Task 의존성 확인
- 최종 DAG 상태를 DB에 넣음
- 수 초마다 DAG 폴더 파싱
- 많은 작업이나 시간이 중요한 DAG의 경우, 병목이 일어나 딜레이가 일어날 수 있음
- sla, timeout 등 설정으로 알 수 있게 하자
Frequently used CLI command
airflow initdb
airflow test <dag> <task> <date>
airflow run <dag> <task> <date>
airflow backfill <dag> -s <start_date> -e <end_date>
ariflow backfill <dag> -s <start_date> -e <end_date> -m true # mark success
airflow clear <dag> -s <start_date> -e <end_date> -t <task_regex>
macro reference
https://airflow.apache.org/docs/stable/macros.html
best practice
https://airflow.apache.org/docs/stable/best-practices.html
참고 문서
- Operator, Sensor 직접 만들어보기. 중간에 IPython 으로 디버깅 하는거 시도해보면 좋을 거 같다 https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html
- airflow tips, tricks and pitfalls https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb#.2zt0krkn2
- dag를 retry 할 수 있도록 idempotent 하게 작성할 것
- airflow 1.6에서 가장 큰 변화는 DagRun의 도입. backfill 보다는 trigger_dag
- celearyExecutor는 종종 멈춘다