728x90

빅데이터(BigData) 54

airflow에서 log 디렉토리 변경하기

airflow의 로그 파일이 적재되는 기본 경로는 다음와 같다. {AIRFLOW_HOME}/logs/ 만약 로그 파일을 다른 곳에 적재하고 싶다면 아래와 같은 작업을 해야 한다. 1. vi airflow.cfg 2. base_log_folder, child_process_log_directory 두개의 경로를 변경하기 3. airflow 재실행하기 - 필자는 재실행했지만 재실행 안해도 적용이 되는지는 확인해보지 못했다. https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#base-log-folder https://airflow.apache.org/docs/apache-airflow/stable/configuration..

spark structured streaming + kafka를 이용한 개발 후기

최근에 진행했던 프로젝트에서 structured streaming + kafka를 사용한 경험을 공유하려고 한다. 현황 현재 운영하고 있는 추천 시스템에서는 고객의 최근 행동(상품 조회, 구매)에 따라서 추천 상품 랭킹이 개인화된다. 예를들어 오늘 키보드를 구매했다면, 추천 상품중 키보드 카테고리의 상품의 스코어를 낮춰서 추천 페이지의 아래쪽에서 보이도록 하고, 오늘 키보드를 조회하고 아직 구매하지 않았다면 현재 키보드에 관심이 있다고 판단해서 추천 상품 중 키보드 카테고리의 상품을 추천 페이지의 상단에 노출 시킨다. 이 때 최근에 방문했던 상품에 대한 로그 데이터는 현재 시간 기준으로 2~3시간 전의 것을 사용하고 있다. 그래서 추천 상품에 대한 최신성(recency)를 높여주고 싶어했다. 요구사항 현..

ERROR cluster.YarnScheduler: Lost executor 1 on xxx-Xxxx: Slave lost 에러 발생 시

spark-submit으로 Spark streaming을 실행하면 2시간 마다 배치가 중단되는 현상이 발생하고 아래와 같은 에러가 나타난다. local모드로 실행하면 문제가 없지만 client 모드 또는 cluster 모드로 실행하면 에러가 발생한다. [2021-08-09 02:19:05,746] {bash_operator.py:128} INFO - 21/08/09 02:19:05 ERROR cluster.YarnScheduler: Lost executor 1 on xxx-Xxxx: Slave lost [2021-08-09 02:19:05,911] {bash_operator.py:128} INFO - 21/08/09 02:19:05 ERROR client.TransportClient: Failed to ..

특정 이름의 파일을 airflow dag에서 제외하기

airflow dag 파일을 jupyter notebook으로 조회를 하면 checkpoint 파일이 생성되고 airflow admin의 dag 리스트에서 조회되는 것을 확인할 수 있다. 이러한 파일들은 .ipynb_checkpoints 디렉토리 안에 존재한다. 이 파일들을 dag 목록에서 보여주지 않으려면 아래와 같이 실행하면 된다. 1. dags 디렉토리에 .airflowignore 파일을 생성한다. 2. .airflowignore 파일에 .ipynb_checkpoints를 입력한다. 이 것은 .ipynb_checkpoints 디렉토리 안에 있는 파일들은 모두 dag에서 보여주지 않는다는 의미다. 참고 https://stackoverflow.com/questions/53660558/airflow-da..

yarn log 확인하기

console에서 로그를 확인하거나 spark ui에서 로그를 확인하기 어려운 상황에서는 아래 yarn 명령어를 이용해서 로그 확인이 가능하다. 1. 특정 application 로그 확인 yarn logs -applicationId 2. 에러로그만 확인 yarn logs -applicationId -log_files stderr 3. container별 로그 확인 # application에서 사용하는 container 전체출력 yarn logs -applicationId -show_application_log_info # 위에 container정보에서 특정 container에 대한 로그 확인 yarn logs -applicationId -containerId 출처: https://semode.tistor..

ModuleNotFoundError: No module named 'pyspark' 에러 발행할 때 findspark로 해결하기

pyspark 코드를 실행하기 위해서 import pyspark를 해도 아래와 같이 에러가 발생할 수 있다. ModuleNotFoundError: No module named 'pyspark' 그 이유는 pyspark가 정규 library로 인식되지 않기 때문에 pyspark의 위치를 찾을 수 없기 때문이다. 이 때는 아래와 같이 findspark 라이브러리를 설치 후 실행하면 된다. pip install findspark import findspark findspark.init() import pyspark sc = pyspark.SparkContext(appName="myAppName") 추가적으로 궁금한 내용은 아래 링크에서 찾아보면 된다. https://github.com/minrk/findspark

pyspark에서 비어있는 DataFrame 만들기

loop를 돌면서 DF된 새로운 데이터를 만들고 모든 데이터를 합치려고 할 때 loop 안에서 변수를 정의하면, 기존에 변수에 저장된 데이터가 없어진다. 이런 경우 전역변수를 만들어서 해결해야 한다. 그렇게 하기 위해서는 비어있는 Dataframe을 만들어야하는데 번거로운 작업일 수 있다. 그래서 필자는 아래와 같이 진행했다. 비어있는 문자열 변수 A를 정의한다. for문에서 Dataframe 타입의 변수 B에 새로운 데이터를 삽입한다. A 변수가 문자열 타입이면 첫 번째 반복이므로 변수 B를 삽입한다. A 변수가 DF 타입이면, 두 번째 이후의 반복이므로 unionAll을 이용해서 데이터를 추가한다. from pyspark.sql import DataFrame df_all = '' for _ in ra..

728x90