빅데이터(BigData)/Airflow

airflow에서 sql_sensor 사용하기

leebaro 2021. 1. 20.
728x90

sql_sensor는 airflow에서 sql 조회 결과에 따라서 다음 task로 넘어갈지 여부를 체크하는 작업입니다.

wait_sql = SqlSensor(
    task_id = 'wait_sql',
    conn_id='oracle_db',
    sql="""
        select 
            1 
        from 
            BATCH_TABLE 
        where 
            batch_nm = '{batch_nm}' 
        and batch_date = '{batch_date}'""".format(batch_nm = 'ORDER', batch_date = current_part_date),
    poke_interval=wait_poke_intervql,
    timeout=wait_timeout,
    dag=dag
)

    poke_interval은 얼마 주기로 쿼리를 실행할지 정하는 것입니다.시간 단위는 초입니다.
    timeout을 배치가 실행된지 얼마 후에 에러로 처리할지 판단하는 것입니다. timeout으로 지정한 시간까지 sql 결과가 나오지 않는다면 에러로 처리합니다. 시간 단위는 초입니다.

 

위의 sql에 데이터 건수가 0건이 아니면 다음 task로 넘어갑니다.

conn_id는 airflow admin에서 설정한 connections에 등록한 id입니다.

 

dag 파일에서 마지막에 아래와 같이 설정하면 wait_sql task가 정상 종료된 후 task1, task2가 병렬로 실행됩니다.

 

wait_sql >> task1 >> task2
wait_sql >> task3 >> task4

 

 

 

728x90