728x90
sql_sensor는 airflow에서 sql 조회 결과에 따라서 다음 task로 넘어갈지 여부를 체크하는 작업입니다.
wait_sql = SqlSensor(
task_id = 'wait_sql',
conn_id='oracle_db',
sql="""
select
1
from
BATCH_TABLE
where
batch_nm = '{batch_nm}'
and batch_date = '{batch_date}'""".format(batch_nm = 'ORDER', batch_date = current_part_date),
poke_interval=wait_poke_intervql,
timeout=wait_timeout,
dag=dag
)
poke_interval은 얼마 주기로 쿼리를 실행할지 정하는 것입니다.시간 단위는 초입니다.
timeout을 배치가 실행된지 얼마 후에 에러로 처리할지 판단하는 것입니다. timeout으로 지정한 시간까지 sql 결과가 나오지 않는다면 에러로 처리합니다. 시간 단위는 초입니다.
위의 sql에 데이터 건수가 0건이 아니면 다음 task로 넘어갑니다.
conn_id는 airflow admin에서 설정한 connections에 등록한 id입니다.
dag 파일에서 마지막에 아래와 같이 설정하면 wait_sql task가 정상 종료된 후 task1, task2가 병렬로 실행됩니다.
wait_sql >> task1 >> task2
wait_sql >> task3 >> task4
728x90
'빅데이터(BigData) > Airflow' 카테고리의 다른 글
특정 이름의 파일을 airflow dag에서 제외하기 (0) | 2021.07.26 |
---|---|
Tree View에서 task 박스에 볼드가 없는 경우 (0) | 2021.03.09 |
airflow로 데이터 입수 여부 확인하기 (0) | 2021.02.05 |
airflow의 connections에서 oracle 연결 정보 설정하기 (0) | 2021.01.20 |
dag에 있는 특정 task만 실행하기 (0) | 2020.10.07 |