728x90

빅데이터(BigData)/Spark 27

spark structured streaming + kafka를 이용한 개발 후기

최근에 진행했던 프로젝트에서 structured streaming + kafka를 사용한 경험을 공유하려고 한다. 현황 현재 운영하고 있는 추천 시스템에서는 고객의 최근 행동(상품 조회, 구매)에 따라서 추천 상품 랭킹이 개인화된다. 예를들어 오늘 키보드를 구매했다면, 추천 상품중 키보드 카테고리의 상품의 스코어를 낮춰서 추천 페이지의 아래쪽에서 보이도록 하고, 오늘 키보드를 조회하고 아직 구매하지 않았다면 현재 키보드에 관심이 있다고 판단해서 추천 상품 중 키보드 카테고리의 상품을 추천 페이지의 상단에 노출 시킨다. 이 때 최근에 방문했던 상품에 대한 로그 데이터는 현재 시간 기준으로 2~3시간 전의 것을 사용하고 있다. 그래서 추천 상품에 대한 최신성(recency)를 높여주고 싶어했다. 요구사항 현..

ERROR cluster.YarnScheduler: Lost executor 1 on xxx-Xxxx: Slave lost 에러 발생 시

spark-submit으로 Spark streaming을 실행하면 2시간 마다 배치가 중단되는 현상이 발생하고 아래와 같은 에러가 나타난다. local모드로 실행하면 문제가 없지만 client 모드 또는 cluster 모드로 실행하면 에러가 발생한다. [2021-08-09 02:19:05,746] {bash_operator.py:128} INFO - 21/08/09 02:19:05 ERROR cluster.YarnScheduler: Lost executor 1 on xxx-Xxxx: Slave lost [2021-08-09 02:19:05,911] {bash_operator.py:128} INFO - 21/08/09 02:19:05 ERROR client.TransportClient: Failed to ..

ModuleNotFoundError: No module named 'pyspark' 에러 발행할 때 findspark로 해결하기

pyspark 코드를 실행하기 위해서 import pyspark를 해도 아래와 같이 에러가 발생할 수 있다. ModuleNotFoundError: No module named 'pyspark' 그 이유는 pyspark가 정규 library로 인식되지 않기 때문에 pyspark의 위치를 찾을 수 없기 때문이다. 이 때는 아래와 같이 findspark 라이브러리를 설치 후 실행하면 된다. pip install findspark import findspark findspark.init() import pyspark sc = pyspark.SparkContext(appName="myAppName") 추가적으로 궁금한 내용은 아래 링크에서 찾아보면 된다. https://github.com/minrk/findspark

pyspark에서 비어있는 DataFrame 만들기

loop를 돌면서 DF된 새로운 데이터를 만들고 모든 데이터를 합치려고 할 때 loop 안에서 변수를 정의하면, 기존에 변수에 저장된 데이터가 없어진다. 이런 경우 전역변수를 만들어서 해결해야 한다. 그렇게 하기 위해서는 비어있는 Dataframe을 만들어야하는데 번거로운 작업일 수 있다. 그래서 필자는 아래와 같이 진행했다. 비어있는 문자열 변수 A를 정의한다. for문에서 Dataframe 타입의 변수 B에 새로운 데이터를 삽입한다. A 변수가 문자열 타입이면 첫 번째 반복이므로 변수 B를 삽입한다. A 변수가 DF 타입이면, 두 번째 이후의 반복이므로 unionAll을 이용해서 데이터를 추가한다. from pyspark.sql import DataFrame df_all = '' for _ in ra..

스파크에서 로그 레벨 정의하기

spark submit을 하면 spark와 관련된 로그들을 확인할 수 있다. 이 때 너무 많은 로그가 나온다면 필요한 로그를 확인할 수 없다. 보통 정상적으로 수행하고 있는 코드라면 경고, 에러인 데이터만 확인하면 될 것이다. 이럴 때는 아래와 같은 코드를 pyspark 코드 상단에 추가하면 원하는 수준의 로그를 확인할 수 있다. from pyspark.sql import SparkSession spark = SparkSession.builder.\ master('xxx').\ appName('xxx').\ getOrCreate() spark.sparkContext.setLogLevel('WARN') 위에서 보여줄 로그의 수준은 아래 유형중에 선택하면 된다. ALL, DEBUG, ERROR, FATAL,..

toPandas() 후 조회 시 index 2 is out of bounds for axis 0 with size 에러가 발생할 때

spark dataframe 또는 koalas를 이용해서 DF를 만들고 toPandas()를 이용해서 pandas DF로 변환해야하는 경우가 있다. 필자 같은 경우 DF로 heatmap을 만드는데 koalas DF에서 만들면 에러가 발생해서 pandas df로 변환했다. 문제는 변환 후 조회하면 "index 2 is out of bounds for axis 0 with size" 와 같은 에러가 발생했다. 구체적으로 DF에 NaN 값이 있었고, df.fillna(0)으로 NaN을 0값으로 변환한 경우에 에러가 발생했다. 이경우 toPandas() 코드 윗 부분에 아래와 같은 코드를 추가하면 된다. 파라미터를 -1로 하면 동일한 에러가 발생하는 것을 확인할 수 있다. pd.set_option('displa..

728x90