빅데이터(BigData)/Airflow

airflow로 데이터 입수 여부 확인하기

leebaro 2021. 2. 5.
728x90

데이터를 생성하는 배치를 수행하고 나면 데이터가 정상적으로 입수 됐는지 확인이 필요할 때가 있다.

 

이 때는 airflow에서 check_operator를 사용하면 된다.

 

check_operator는 sql을 실행 결과에서 반환한 값이 0인이 아닌지에 따라 성공여부를 판단한다.

 

0이면 실패이고, 1이면 성공이다.

 

druid_broker_conn_id 값은 airflow admin의 connections에 생성한 connection id 값을 입력하면 된다.

 

아래 예시는 druid이지만 sql은 SQLCheckOperator를 이용하면 된다.

 

예시 코드는 아래와 같다.

from airflow.operators.druid_check_operator import DruidCheckOperator

check_input_data = DruidCheckOperator(
    task_id = "check_input_data",
    druid_broker_conn_id = "druid_broker_default",
    sql = """
        SELECT
        SUM(amt) cnt
        FROM
        table_name
        WHERE 
            "__time" >= '{start_date}'
        AND "__time" <  '{end_date}'
        HAVING SUM(amt) > 0
    """.format(start_date=start_date, end_date= end_date),
    dag=dag
)    

 

<그림 : connection 설정 예시>


참고

airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/check_operator/index.html

 

airflow.operators.check_operator — Airflow Documentation

 

airflow.apache.org

airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/sql/index.html

 

airflow.operators.sql — Airflow Documentation

 

airflow.apache.org

airflow.apache.org/docs/apache-airflow/1.10.13/_modules/airflow/operators/druid_check_operator.html

 

airflow.operators.druid_check_operator — Airflow Documentation

 

airflow.apache.org

 

728x90