데이터 포멧 종류
- 데이터는 디스크에 파일로 저장
- Unstructured와 Semi-structured만 Human Readable함
- 하려는 일에 맞게 최적화 필요
Unstructured | Semi-structured (Type 정보가 없음) | Structured (Type 정보가 있음) |
Text | JSON | PARQUET |
XML | AVRO | |
CSV | ORC | |
SequenceFile |
- Avro : Apache에서 개발한 파일 포맷
- Parquet : 트위터와 클라우데라에서 공동 개발한 Hbase에 최적화된 파일 포맷이다
- ORC: Hive에서 개발한 Hive에 최적화된 파일 포맷
Spark의 주요 파일 타입
특징 | CSV | JSON | PARQUET | AVRO |
컬럼 스토리지 | X | X | O | X |
압축 가능 | O | O | O | O |
Splittable | O | O | O | O |
Human Readable | O | O | X | X |
Nested structure support | X | O | O | O |
Schema evolution | X | X | O | O |
- 컬럼 스토리지 : 컬럼 별로 저장하는지
- Splittable : 데이터블럭이 하나의 partition으로 올라갈 수 있는지
- 압축되면 Splittable 하지 않음 (압축 방식에 따라 다름 - snappy 압축이라면 Splittable)
- gzip으로 압축된 CSV, JSON 파일은 Splittable하지 않기 때문에 하나의 executor가 일단 처리하게 되며 메모리 에러가 날 확률이 높음
- Nested structure support : 자료구조 안에 자료구조가 있는가
- Schema evolution : 기존데이터에 새로운 컬럼이 나타났을때 문제 없이 사용가능한가
Spark의 기본 파일 포맷 : Parquet
- Parquet가 사용하는 방식: Hybrid Storage (Row Group)
- 하나의 데이터 블록은 하나의 Row Group으로 구성됨
Pyspark로 데이터 Write 하는 방법
1. SparkSession 생성
- spark.jars.packages : AVRO는 기본적으로 spark가 로딩되지 않아 직접 지정해주어야함.
from pyspark.sql import *
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark Writing Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
.getOrCreate()
2. 데이터 호출
- 애플 주식 가격 데이터
df = spark.read \
.format("csv") \
.load("appl_stock.csv")
3. 파티션 개수 및 레코드 개수 확인
- spark_partition_id : 데이터 프레임을 구성하는 파티션 별 id return
print("Num Partitions before: " + str(df.rdd.getNumPartitions()))
df.groupBy(spark_partition_id()).count().show()
4. Repartiiton
- 파티션을 나눠줄 수 있다.
- 4개의 partition으로 데이터를 나눈다.
df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()
5. 파티션 줄이기 (coalesce)
- coalesce : 셔플링을 최소화 하는 방향으로 파티션의 수를 줄인다.
- 2개의 파티션으로 줄인다.
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
6. 데이터 저장
6-1) AVRO 형식으로 저장
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
6-2) Parquet 형식으로 저장
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
!ls -tl dataOutput/parquet/
- snappy 압축 방식 : Splittable한 압축방식
6-3) JSON 형식으로 저장
df3.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataOutput/json/") \
.save()
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Spark' 카테고리의 다른 글
[Spark] HDFS Bucketing & Partitioning (Partitioning pyspark 코드 예시) (0) | 2023.10.08 |
---|---|
[Spark] Schema Evolution (1) | 2023.10.03 |
[Spark] Windows 10에 Spark 설치하기 (Java설치, 환경변수 세팅) (1) | 2023.10.02 |
[Spark] SparkSession 생성과 환경변수 세팅 (0) | 2023.09.30 |
[Spark] Spark 데이터 구조 (RDD, DataFrame, Dataset) (0) | 2023.09.29 |