728x90

pyspark 11

spark structured streaming + kafka를 이용한 개발 후기

최근에 진행했던 프로젝트에서 structured streaming + kafka를 사용한 경험을 공유하려고 한다. 현황 현재 운영하고 있는 추천 시스템에서는 고객의 최근 행동(상품 조회, 구매)에 따라서 추천 상품 랭킹이 개인화된다. 예를들어 오늘 키보드를 구매했다면, 추천 상품중 키보드 카테고리의 상품의 스코어를 낮춰서 추천 페이지의 아래쪽에서 보이도록 하고, 오늘 키보드를 조회하고 아직 구매하지 않았다면 현재 키보드에 관심이 있다고 판단해서 추천 상품 중 키보드 카테고리의 상품을 추천 페이지의 상단에 노출 시킨다. 이 때 최근에 방문했던 상품에 대한 로그 데이터는 현재 시간 기준으로 2~3시간 전의 것을 사용하고 있다. 그래서 추천 상품에 대한 최신성(recency)를 높여주고 싶어했다. 요구사항 현..

ModuleNotFoundError: No module named 'pyspark' 에러 발행할 때 findspark로 해결하기

pyspark 코드를 실행하기 위해서 import pyspark를 해도 아래와 같이 에러가 발생할 수 있다. ModuleNotFoundError: No module named 'pyspark' 그 이유는 pyspark가 정규 library로 인식되지 않기 때문에 pyspark의 위치를 찾을 수 없기 때문이다. 이 때는 아래와 같이 findspark 라이브러리를 설치 후 실행하면 된다. pip install findspark import findspark findspark.init() import pyspark sc = pyspark.SparkContext(appName="myAppName") 추가적으로 궁금한 내용은 아래 링크에서 찾아보면 된다. https://github.com/minrk/findspark

pyspark에서 비어있는 DataFrame 만들기

loop를 돌면서 DF된 새로운 데이터를 만들고 모든 데이터를 합치려고 할 때 loop 안에서 변수를 정의하면, 기존에 변수에 저장된 데이터가 없어진다. 이런 경우 전역변수를 만들어서 해결해야 한다. 그렇게 하기 위해서는 비어있는 Dataframe을 만들어야하는데 번거로운 작업일 수 있다. 그래서 필자는 아래와 같이 진행했다. 비어있는 문자열 변수 A를 정의한다. for문에서 Dataframe 타입의 변수 B에 새로운 데이터를 삽입한다. A 변수가 문자열 타입이면 첫 번째 반복이므로 변수 B를 삽입한다. A 변수가 DF 타입이면, 두 번째 이후의 반복이므로 unionAll을 이용해서 데이터를 추가한다. from pyspark.sql import DataFrame df_all = '' for _ in ra..

Container killed by YARN for exceeding physical memory limits 에러 발생 시

spark submit 시 아래와 같은 에러가 발생할 수 있다. 원인은 executor에 할당된 메모리가 부족하다는 의미이다. 이 경우 executor의 할당된 메모리를 늘려주면 된다. cluster.YarnScheduler: Lost executor 20 on xxx: Container killed by YARN for exceeding physical memory limits. 6 GB of 6 GB physical memory used. Consider boosting spark.executor.memoryOverhead. 아래와 같이 spark-submit 시 아래 옵션의 값을 변경해준다. spark-submit --master yarn \ ..... --executor-memory 12g

spark에서 pandas 대신 databricks의 koalas 이용하기

pandas는 spark에서 분산 병렬 처리가 되지 않기 때문에 대용량 데이터를 다루기에는 한계가 있다. 그렇다고 spark의 dataframe을 이용하면 pandas에 비해서 기능이 부족하거나 불편한 경우가 있다. 이런 경우에는 databricks에서 만든 koalas를 이용하면 된다. koalas는 분산 병렬처리가 가능하고, 문법도 pandas와 유사해서 어려움 없이 이용할 수 있다. 아래는 koalas를 이용해 df의 describe() 함수를 이용하는 방법이다. import databricks.koalas as ks sdf = spark.sql("select cnt from table") # koalas df로 변환 kdf = sdf.to_koalas() kdf.describe() ##결과 cou..

pyspark dataframe에서 join하고 컬럼을 select 하거나 drop 하기

spark dataframe에서 조인을 하면 동일한 컬럼이 2개 생길 수 있다. 안 생길 때도 있다. 어쨌든 동일한 컬럼이 중복으로 생기면 제거해줘야 한다. 이 때는 2가지 방법이 있다. .select()를 이용해 사용할 컬럼만 선택하거나 .drop을 이용해서 필요없는 컬럼을 제거해야 한다. 여러 개의 컬럼을 선택하거나 삭제할 때는 아래와 같은 방법을 이용해야 한다. df = (df.join(df_b, (df.mem == df_b.mem) & (df.prd == df_b.prd), "left_anti") .drop("df_b.mem_no,df_b.prd_no")) spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join pyspark..

728x90