728x90

빅데이터(BigData)/Spark 27

spark에서 master node로 데이터를 모으기 위해서 collect를 사용하는 방법

spark에서 dataframe을 이용할 때 데이터는 worker node에 분산되어 저장된다. 이 때 어떠한 필요에 의해서 데이터를 master node로 보내야 하는 경우가 있다면 collect()를 이용하면 된다. 필자의 경우 df의 데이터를 oracle 데이터로 보내기 위해서 이 작업이 필요했다. master node는 오라클 db에 접근이 가능하지만 worker node는 보안 정책으로 인해 접근할 수 없었다. 그래서 데이터를 master node로 보내고 master node의 데이터를 다른 db로 보내려고 했다. 결론적으로는 실패 해서 다른 방법을 이용했지만 어쨌든 데이터를 master node에 모으고 싶다면 이 방법을 이용하면 된다. df.collect() 데이터가 많은 경우에는 데이터가..

dataframe(데이터프레임)으로 hive table(테이블) 생성하거나 데이터를 입력하기

데이터프레임으로 테이블을 만들어야 하는 경우에는 아래 같이 코드를 작성하면 된다. 테이블을 생성하고 데이터를 넣기 df.write.mode("overwrite").saveAsTable("스키마.테이블명") # 데이터 조회하기 spark.sql("select * from 스키마.테이블명").show() 기존에 존재하는 테이블에 데이터만 넣기 df.write.mode("append").saveAsTable("스키마.테이블명") spark.sql("select * from 스키마.테이블명").show() 참고 kontext.tech/column/spark/294/spark-save-dataframe-to-hive-table

java.time.format.DateTimeParseException: Text '2020-09-16 16:24:08.0' could not be parsed, unparsed text found at index 19 와 같은 에러가 발생하는 경우에 조치 방법

spark 2.3에서 잘 수행되던 코드가 spark 3.0에서 아래와 같은 오류가 발행했다. select unix_timestamp(update_dt) 오류 메세지 Caused by: java.time.format.DateTimeParseException: Text '2020-09-16 16:24:08.0' could not be parsed, unparsed text found at index 19 at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1952) at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) at org.apache.spa..

spark-submit으로 spark 코드 실행시 Non-ASCII character 에러 대처 방법

spark shell에서는 pyspark 코드가 실행되는데 .py 파일로 만든 후 spark-submit으로 실행하면 오류가 발생하는 경우가 있습니다. 제 경우에는 원인이 한글로 된 주석이 있어서였습니다. 해결 방법은 두 가지가 있습니다. 1. 한글로된 주석을 제거하기 2. py 파일을 utf8로 변경하는 방법 당연히 2번 방법을 원할 것입니다. 해결 방법은 아래와 같이 두 가지 중 하나를 선택하면 됩니다. 첫 번째, spark-submit을 실행하기 전 아래와 같이 pythonioencoding을 utf8로 변경합니다. export PYTHONIOENCODING=utf8 두 번째, 파이썬 파일애서 아래 코드르 추가합니다. import sys sys.stdout = open(sys.stdout.filen..

Spark에서 Dataframe을 이용하여 Hive 테이블 생성하기

df라는 데이터프레임이 있을 경우 아래와 같이 실행하면 tb_df라는 테이블이 생성된다. df.write.saveAsTable("tb_df") df.write.saveAsTable("tb_df") 만약에 기존에 동일한 이름의 테이블이 있다면 아래와 같은 에러가 발생할 것이다. Table `tb_df` already exists. 이런 에러가 발생한다면 두 가지방법으로 해결할 수 있다. 첫 번째 방법은 spark.sql을 이용하여 테이블이 있는 경우 삭제하는 것이다. spark.sql("DROP TABLE IF EXISTS tb_df") 두 번째 방법은 데이터프레임으로 테이블을 만들 때 기존에 테이블이 있다면 overwrite하는 방법이다 df.write.mode(SaveMode.Overwrite).sav..

728x90