hive 테이블에 insert overwrite 하기 일반적인 insert 쿼리는 아래와 같이 시작한다 INSERT INTO TABLE_NAME PARTITION(part_date = '20200101') overwrite는 아래와 같이 한다. INSERT OVERWRITE TABLE_NAME PARTITION(part_date = '20200101') overwrite를 할 때는 파티션 키에 있는 데이터를 모두 삭제하고 적재한다. 빅데이터(BigData)/Hive 2021.02.23
Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries 에러 발생시 스파크 잡을 실행하면 아래와 같은 에러 메지시가 나타날 때가 있다. Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries. 원인은 동시에 어러개의 잡이 실행될 때 SparkUI를 생성하기 위한 포트가 이미 사용 중이면 발생한다. 포트를 찾아보는 기본 값은 16이다. 가장 간단한 해결 방법은 더 많은 포트를 찾아보고 비어있는 포트에서 Spark.. 빅데이터(BigData)/Spark 2021.02.19
airflow로 데이터 입수 여부 확인하기 데이터를 생성하는 배치를 수행하고 나면 데이터가 정상적으로 입수 됐는지 확인이 필요할 때가 있다. 이 때는 airflow에서 check_operator를 사용하면 된다. check_operator는 sql을 실행 결과에서 반환한 값이 0인이 아닌지에 따라 성공여부를 판단한다. 0이면 실패이고, 1이면 성공이다. druid_broker_conn_id 값은 airflow admin의 connections에 생성한 connection id 값을 입력하면 된다. 아래 예시는 druid이지만 sql은 SQLCheckOperator를 이용하면 된다. 예시 코드는 아래와 같다. from airflow.operators.druid_check_operator import DruidCheckOperator check_in.. 빅데이터(BigData)/Airflow 2021.02.05
airflow에서 sql_sensor 사용하기 sql_sensor는 airflow에서 sql 조회 결과에 따라서 다음 task로 넘어갈지 여부를 체크하는 작업입니다. wait_sql = SqlSensor( task_id = 'wait_sql', conn_id='oracle_db', sql=""" select 1 from BATCH_TABLE where batch_nm = '{batch_nm}' and batch_date = '{batch_date}'""".format(batch_nm = 'ORDER', batch_date = current_part_date), poke_interval=wait_poke_intervql, timeout=wait_timeout, dag=dag ) poke_interval은 얼마 주기로 쿼리를 실행할지 정하는 것입니다.시.. 빅데이터(BigData)/Airflow 2021.01.20
airflow의 connections에서 oracle 연결 정보 설정하기 airflow 에서 oracle에 연결하기 위해서는 아래와 같이 세팅하면 된다. 모든 컬럼 값을 입력하고, 추가적으로 Extra에 SID와 dns를 입력하면 된다. dns는 Host와 같은 IP를 입력하면 된다. 정상적으로 입력됐는지 확인은 아래와 같이 Data Profiling에서 Ad Hoc Query를 선택 후 해당 connection으로 쿼리를 날려보면 된다. 빅데이터(BigData)/Airflow 2021.01.20
spark에서 oracle로 데이터 입력 시 ORA-01861 오류가 발생할 때 pyspark에서 oracle로 데이터를 넣을 때 아래과 같은 에러가 발생할 때가 있다. ORA-01861 : 리터럴이 형식 문자열과 일치하지 않음 직접적인 원인은 spark의 데이터 타입과 오라클의 데이터 타입이 일치하지 않는 경우에 발생한다. 필자와 같은 경우에는 날짜타입의 컬럼일 때 위와 같은 에러가 발생했다. 이 문제를 해결하기 위해서는 spark에서 data frame의 컬럼의 데이터 타입을 변경해야 한다. 수정 전에 DF의 컬럼 타입을 보면 아래와 같다. DataFrame[visit_dt : string] spark에서 날짜타입이 string일 때 timestamp로 변경하면 oracle의 컬럼이 date 타입일 때 문제가 해결된다. 데이터 타입을 수정하는 방법은 아래와 같다. df_rslt .. 빅데이터(BigData)/Spark 2021.01.18
spark에서 pandas 대신 databricks의 koalas 이용하기 pandas는 spark에서 분산 병렬 처리가 되지 않기 때문에 대용량 데이터를 다루기에는 한계가 있다. 그렇다고 spark의 dataframe을 이용하면 pandas에 비해서 기능이 부족하거나 불편한 경우가 있다. 이런 경우에는 databricks에서 만든 koalas를 이용하면 된다. koalas는 분산 병렬처리가 가능하고, 문법도 pandas와 유사해서 어려움 없이 이용할 수 있다. 아래는 koalas를 이용해 df의 describe() 함수를 이용하는 방법이다. import databricks.koalas as ks sdf = spark.sql("select cnt from table") # koalas df로 변환 kdf = sdf.to_koalas() kdf.describe() ##결과 cou.. 빅데이터(BigData)/Spark 2020.12.03
pyspark dataframe join 후 원하는 column 선택하기 spark에서 df를 조인한 후에 원하는 컬럼을 선택해야 한다. 이때 sql처럼 편하게 "*" 기호를 쓰거나, 원하는 컬럼을 선택하기를 원한다. 이 때는 아래와 같이 하면된다. "*"를 쓰기 위해서는 alias로 df의 별칭을 지정해줘야 한다. df = (df_a.alias("a").join(df_b.alias("b"), df_a.prd_no == df_b.prd_no) .selectExpr("a.*", "b.mem_no")) 빅데이터(BigData)/Spark 2020.12.01
spark-submit 중 spark config 값 변경하기 spark를 실행 중에 config 값을 변경해야하는 경우가 있다. 필자 같은 경우에는 처음 spark submit을 할 때는 spark.master를 yarn을 사용하다가 중간에 local mode로 변경해야 하는 경우가 있다. 조금 더 자세히 이야기를 하자면 yarn cluster에서는 필자가 접근하려는 db에 방화벽이 있기 때문에 접속을 못하고 driver node에서만 접근이 가능했기 때문에 사용했다. # spark session을 stand alone node로 변경 conf = spark.sparkContext._conf.setAll([('spark.master', 'local[10]'),('spark.driver.memory','8g')]) spark.sparkContext.stop() sp.. 빅데이터(BigData)/Spark 2020.11.20
spark에서 string으로 날짜 데이터 만들기 아래와 같은 패턴을 이용하면 문자열로 날짜 값을 만들 수 있다. select from_unixtime(unix_timestamp('20161023235959', 'yyyyMMddHHmmss'), 'yyyy-MM-dd HH:mm:ss') as new_format 빅데이터(BigData)/Spark 2020.11.19