728x90
데이터를 생성하는 배치를 수행하고 나면 데이터가 정상적으로 입수 됐는지 확인이 필요할 때가 있다.
이 때는 airflow에서 check_operator를 사용하면 된다.
check_operator는 sql을 실행 결과에서 반환한 값이 0인이 아닌지에 따라 성공여부를 판단한다.
0이면 실패이고, 1이면 성공이다.
druid_broker_conn_id 값은 airflow admin의 connections에 생성한 connection id 값을 입력하면 된다.
아래 예시는 druid이지만 sql은 SQLCheckOperator를 이용하면 된다.
예시 코드는 아래와 같다.
from airflow.operators.druid_check_operator import DruidCheckOperator
check_input_data = DruidCheckOperator(
task_id = "check_input_data",
druid_broker_conn_id = "druid_broker_default",
sql = """
SELECT
SUM(amt) cnt
FROM
table_name
WHERE
"__time" >= '{start_date}'
AND "__time" < '{end_date}'
HAVING SUM(amt) > 0
""".format(start_date=start_date, end_date= end_date),
dag=dag
)
<그림 : connection 설정 예시>
참고
airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/check_operator/index.html
airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/sql/index.html
airflow.apache.org/docs/apache-airflow/1.10.13/_modules/airflow/operators/druid_check_operator.html
728x90
'빅데이터(BigData) > Airflow' 카테고리의 다른 글
특정 이름의 파일을 airflow dag에서 제외하기 (0) | 2021.07.26 |
---|---|
Tree View에서 task 박스에 볼드가 없는 경우 (0) | 2021.03.09 |
airflow에서 sql_sensor 사용하기 (0) | 2021.01.20 |
airflow의 connections에서 oracle 연결 정보 설정하기 (0) | 2021.01.20 |
dag에 있는 특정 task만 실행하기 (0) | 2020.10.07 |