728x90

빅데이터(BigData)/Airflow 14

airflow로 데이터 입수 여부 확인하기

데이터를 생성하는 배치를 수행하고 나면 데이터가 정상적으로 입수 됐는지 확인이 필요할 때가 있다. 이 때는 airflow에서 check_operator를 사용하면 된다. check_operator는 sql을 실행 결과에서 반환한 값이 0인이 아닌지에 따라 성공여부를 판단한다. 0이면 실패이고, 1이면 성공이다. druid_broker_conn_id 값은 airflow admin의 connections에 생성한 connection id 값을 입력하면 된다. 아래 예시는 druid이지만 sql은 SQLCheckOperator를 이용하면 된다. 예시 코드는 아래와 같다. from airflow.operators.druid_check_operator import DruidCheckOperator check_in..

airflow에서 sql_sensor 사용하기

sql_sensor는 airflow에서 sql 조회 결과에 따라서 다음 task로 넘어갈지 여부를 체크하는 작업입니다. wait_sql = SqlSensor( task_id = 'wait_sql', conn_id='oracle_db', sql=""" select 1 from BATCH_TABLE where batch_nm = '{batch_nm}' and batch_date = '{batch_date}'""".format(batch_nm = 'ORDER', batch_date = current_part_date), poke_interval=wait_poke_intervql, timeout=wait_timeout, dag=dag ) poke_interval은 얼마 주기로 쿼리를 실행할지 정하는 것입니다.시..

dag에 있는 특정 task만 실행하기

airflow를 이용해서 data pipeline을 관리하고 있다면 신규로 추가 되는 작업에 대해서 테스트가 필요하다. 이 때 dag를 기준으로 테스트를 하면 이전에 처리되는 task들을 모두 실행해야하기 때문에 시간과 리소스에 낭비가 발생한다. 이럴 때는 특정 task만 실행하는 방법을 사용하면된다. #airflow dag명 task명 execution_date airflow dag task 2020-01-11 참고 airflow.apache.org/docs/stable/tutorial.html

728x90