Spark & Hadoop/Spark

[Spark] Spark 파일 Type 종류 및 Pyspark로 데이터 Write하는 방법 (Parquet / AVRO)

YSY^ 2023. 10. 3. 19:37

데이터 포멧 종류

  • 데이터는 디스크에 파일로 저장
  • Unstructured와 Semi-structured만 Human Readable함
  • 하려는 일에 맞게 최적화 필요
Unstructured Semi-structured (Type 정보가 없음) Structured (Type 정보가 있음)
Text JSON PARQUET
  XML AVRO
  CSV ORC
    SequenceFile
  • Avro : Apache에서 개발한 파일 포맷
  • Parquet : 트위터와 클라우데라에서 공동 개발한 Hbase에 최적화된 파일 포맷이다
  • ORC:  Hive에서 개발한 Hive에 최적화된 파일 포맷

Spark의 주요 파일 타입

특징 CSV JSON PARQUET AVRO
컬럼 스토리지 X X O X
압축 가능  O O O O
Splittable  O O O O
Human Readable O O X X
Nested structure support X O O O
Schema evolution X X O O
  • 컬럼 스토리지 : 컬럼 별로 저장하는지
  • Splittable : 데이터블럭이 하나의 partition으로 올라갈 수 있는지
    • 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
    • gzip으로 압축된 CSV, JSON 파일은 Splittable하지 않기 때문에 하나의 executor가 일단 처리하게 되며 메모리 에러가 날 확률이 높음
  • Nested structure support : 자료구조 안에 자료구조가 있는가
  • Schema evolution : 기존데이터에 새로운 컬럼이 나타났을때 문제 없이 사용가능한가

Spark의 기본 파일 포맷 : Parquet

  • Parquet가 사용하는 방식: Hybrid Storage (Row Group)
    • 하나의 데이터 블록은 하나의 Row Group으로 구성됨

Pyspark로 데이터 Write 하는 방법

1. SparkSession 생성

  • spark.jars.packages : AVRO는 기본적으로 spark가 로딩되지 않아 직접 지정해주어야함.
from pyspark.sql import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark Writing Demo") \
        .master("local[3]") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
        .getOrCreate()

2. 데이터 호출

  • 애플 주식 가격 데이터

appl_stock.csv
0.14MB

df = spark.read \
    .format("csv") \
    .load("appl_stock.csv")

3. 파티션 개수 및 레코드 개수 확인

  • spark_partition_id : 데이터 프레임을 구성하는 파티션 별 id return
print("Num Partitions before: " + str(df.rdd.getNumPartitions()))
df.groupBy(spark_partition_id()).count().show()

 

4. Repartiiton

  • 파티션을 나눠줄 수 있다.
  • 4개의 partition으로 데이터를 나눈다.
df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()

5. 파티션 줄이기 (coalesce)

  • coalesce : 셔플링을 최소화 하는 방향으로 파티션의 수를 줄인다.
  • 2개의 파티션으로 줄인다.
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()

6. 데이터 저장

6-1) AVRO 형식으로 저장

df.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "dataOutput/avro/") \
    .save()

6-2) Parquet 형식으로 저장

df2.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "dataOutput/parquet/") \
    .save()
!ls -tl dataOutput/parquet/

  • snappy 압축 방식 : Splittable한 압축방식

6-3) JSON 형식으로 저장

df3.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataOutput/json/") \
    .save()

 

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

 

728x90
반응형