728x90

spark 19

spark structured streaming + kafka를 이용한 개발 후기

최근에 진행했던 프로젝트에서 structured streaming + kafka를 사용한 경험을 공유하려고 한다. 현황 현재 운영하고 있는 추천 시스템에서는 고객의 최근 행동(상품 조회, 구매)에 따라서 추천 상품 랭킹이 개인화된다. 예를들어 오늘 키보드를 구매했다면, 추천 상품중 키보드 카테고리의 상품의 스코어를 낮춰서 추천 페이지의 아래쪽에서 보이도록 하고, 오늘 키보드를 조회하고 아직 구매하지 않았다면 현재 키보드에 관심이 있다고 판단해서 추천 상품 중 키보드 카테고리의 상품을 추천 페이지의 상단에 노출 시킨다. 이 때 최근에 방문했던 상품에 대한 로그 데이터는 현재 시간 기준으로 2~3시간 전의 것을 사용하고 있다. 그래서 추천 상품에 대한 최신성(recency)를 높여주고 싶어했다. 요구사항 현..

스파크에서 로그 레벨 정의하기

spark submit을 하면 spark와 관련된 로그들을 확인할 수 있다. 이 때 너무 많은 로그가 나온다면 필요한 로그를 확인할 수 없다. 보통 정상적으로 수행하고 있는 코드라면 경고, 에러인 데이터만 확인하면 될 것이다. 이럴 때는 아래와 같은 코드를 pyspark 코드 상단에 추가하면 원하는 수준의 로그를 확인할 수 있다. from pyspark.sql import SparkSession spark = SparkSession.builder.\ master('xxx').\ appName('xxx').\ getOrCreate() spark.sparkContext.setLogLevel('WARN') 위에서 보여줄 로그의 수준은 아래 유형중에 선택하면 된다. ALL, DEBUG, ERROR, FATAL,..

Koalas에서 Cannot combine the series or dataframe because it comes from a different dataframe 에러 발생 시

pyspark에서 koalas를 이용해서 DataFrame을 사용하는 작업에서 아래와 같은 에러를 만날 수 있다. Cannot combine the series or dataframe because it comes from a different dataframe. In order to allow this operation, enable 'compute.ops_on_diff_frames' option. 이 경우 에러 메시지에도 나와있는 옵션을 아래와 같이 추가하면 된다. from databricks.koalas.config import set_option, reset_option set_option("compute.ops_on_diff_frames", True) kdf['C'] = kser # Reset ..

Container killed by YARN for exceeding physical memory limits 에러 발생 시

spark submit 시 아래와 같은 에러가 발생할 수 있다. 원인은 executor에 할당된 메모리가 부족하다는 의미이다. 이 경우 executor의 할당된 메모리를 늘려주면 된다. cluster.YarnScheduler: Lost executor 20 on xxx: Container killed by YARN for exceeding physical memory limits. 6 GB of 6 GB physical memory used. Consider boosting spark.executor.memoryOverhead. 아래와 같이 spark-submit 시 아래 옵션의 값을 변경해준다. spark-submit --master yarn \ ..... --executor-memory 12g

Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries 에러 발생시

스파크 잡을 실행하면 아래와 같은 에러 메지시가 나타날 때가 있다. Service 'SparkUI' failed after 16 retries (starting from 4040)! Consider explicitly setting the appropriate port for the service 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries. 원인은 동시에 어러개의 잡이 실행될 때 SparkUI를 생성하기 위한 포트가 이미 사용 중이면 발생한다. 포트를 찾아보는 기본 값은 16이다. 가장 간단한 해결 방법은 더 많은 포트를 찾아보고 비어있는 포트에서 Spark..

spark에서 oracle로 데이터 입력 시 ORA-01861 오류가 발생할 때

pyspark에서 oracle로 데이터를 넣을 때 아래과 같은 에러가 발생할 때가 있다. ORA-01861 : 리터럴이 형식 문자열과 일치하지 않음 직접적인 원인은 spark의 데이터 타입과 오라클의 데이터 타입이 일치하지 않는 경우에 발생한다. 필자와 같은 경우에는 날짜타입의 컬럼일 때 위와 같은 에러가 발생했다. 이 문제를 해결하기 위해서는 spark에서 data frame의 컬럼의 데이터 타입을 변경해야 한다. 수정 전에 DF의 컬럼 타입을 보면 아래와 같다. DataFrame[visit_dt : string] spark에서 날짜타입이 string일 때 timestamp로 변경하면 oracle의 컬럼이 date 타입일 때 문제가 해결된다. 데이터 타입을 수정하는 방법은 아래와 같다. df_rslt ..

spark에서 pandas 대신 databricks의 koalas 이용하기

pandas는 spark에서 분산 병렬 처리가 되지 않기 때문에 대용량 데이터를 다루기에는 한계가 있다. 그렇다고 spark의 dataframe을 이용하면 pandas에 비해서 기능이 부족하거나 불편한 경우가 있다. 이런 경우에는 databricks에서 만든 koalas를 이용하면 된다. koalas는 분산 병렬처리가 가능하고, 문법도 pandas와 유사해서 어려움 없이 이용할 수 있다. 아래는 koalas를 이용해 df의 describe() 함수를 이용하는 방법이다. import databricks.koalas as ks sdf = spark.sql("select cnt from table") # koalas df로 변환 kdf = sdf.to_koalas() kdf.describe() ##결과 cou..

728x90