728x90

빅데이터(BigData) 54

Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries 에러 발생시

스파크 잡을 실행하면 아래와 같은 에러 메지시가 나타날 때가 있다. Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries. 원인은 동시에 어러개의 잡이 실행될 때 SparkUI를 생성하기 위한 포트가 이미 사용 중이면 발생한다. 포트를 찾아보는 기본 값은 16이다. 가장 간단한 해결 방법은 더 많은 포트를 찾아보고 비어있는 포트에서 Spark..

airflow로 데이터 입수 여부 확인하기

데이터를 생성하는 배치를 수행하고 나면 데이터가 정상적으로 입수 됐는지 확인이 필요할 때가 있다. 이 때는 airflow에서 check_operator를 사용하면 된다. check_operator는 sql을 실행 결과에서 반환한 값이 0인이 아닌지에 따라 성공여부를 판단한다. 0이면 실패이고, 1이면 성공이다. druid_broker_conn_id 값은 airflow admin의 connections에 생성한 connection id 값을 입력하면 된다. 아래 예시는 druid이지만 sql은 SQLCheckOperator를 이용하면 된다. 예시 코드는 아래와 같다. from airflow.operators.druid_check_operator import DruidCheckOperator check_in..

airflow에서 sql_sensor 사용하기

sql_sensor는 airflow에서 sql 조회 결과에 따라서 다음 task로 넘어갈지 여부를 체크하는 작업입니다. wait_sql = SqlSensor( task_id = 'wait_sql', conn_id='oracle_db', sql=""" select 1 from BATCH_TABLE where batch_nm = '{batch_nm}' and batch_date = '{batch_date}'""".format(batch_nm = 'ORDER', batch_date = current_part_date), poke_interval=wait_poke_intervql, timeout=wait_timeout, dag=dag ) poke_interval은 얼마 주기로 쿼리를 실행할지 정하는 것입니다.시..

spark에서 oracle로 데이터 입력 시 ORA-01861 오류가 발생할 때

pyspark에서 oracle로 데이터를 넣을 때 아래과 같은 에러가 발생할 때가 있다. ORA-01861 : 리터럴이 형식 문자열과 일치하지 않음 직접적인 원인은 spark의 데이터 타입과 오라클의 데이터 타입이 일치하지 않는 경우에 발생한다. 필자와 같은 경우에는 날짜타입의 컬럼일 때 위와 같은 에러가 발생했다. 이 문제를 해결하기 위해서는 spark에서 data frame의 컬럼의 데이터 타입을 변경해야 한다. 수정 전에 DF의 컬럼 타입을 보면 아래와 같다. DataFrame[visit_dt : string] spark에서 날짜타입이 string일 때 timestamp로 변경하면 oracle의 컬럼이 date 타입일 때 문제가 해결된다. 데이터 타입을 수정하는 방법은 아래와 같다. df_rslt ..

spark에서 pandas 대신 databricks의 koalas 이용하기

pandas는 spark에서 분산 병렬 처리가 되지 않기 때문에 대용량 데이터를 다루기에는 한계가 있다. 그렇다고 spark의 dataframe을 이용하면 pandas에 비해서 기능이 부족하거나 불편한 경우가 있다. 이런 경우에는 databricks에서 만든 koalas를 이용하면 된다. koalas는 분산 병렬처리가 가능하고, 문법도 pandas와 유사해서 어려움 없이 이용할 수 있다. 아래는 koalas를 이용해 df의 describe() 함수를 이용하는 방법이다. import databricks.koalas as ks sdf = spark.sql("select cnt from table") # koalas df로 변환 kdf = sdf.to_koalas() kdf.describe() ##결과 cou..

spark-submit 중 spark config 값 변경하기

spark를 실행 중에 config 값을 변경해야하는 경우가 있다. 필자 같은 경우에는 처음 spark submit을 할 때는 spark.master를 yarn을 사용하다가 중간에 local mode로 변경해야 하는 경우가 있다. 조금 더 자세히 이야기를 하자면 yarn cluster에서는 필자가 접근하려는 db에 방화벽이 있기 때문에 접속을 못하고 driver node에서만 접근이 가능했기 때문에 사용했다. # spark session을 stand alone node로 변경 conf = spark.sparkContext._conf.setAll([('spark.master', 'local[10]'),('spark.driver.memory','8g')]) spark.sparkContext.stop() sp..

728x90