Spark 데이터 시스템 아키텍처
- 큰데이터를 ETL하거나 일시적으로 데이터를 select할때는 hive나 presto도 상관 없음
- 하지만 spark가 각광받는 이유는 하나의 시스템으로 다양한 것들을 할 수 있기 때문
- 외부데이터의 예 : RDS, Nocurl 등
- 외부데이터를 주기적으로 ETL을 통해 HDFS로 가져와주어야 함.
- 이를 위해
- 1. 주기적으로 ETL작업 진행 (보통 Airflow를 활용)
- 2. 필요할 때 바로 spark에서 로딩
- 내부든 외부든 spark에 올라가는 순간 똑같은 데이터가 됨
데이터 병렬처리와 파티션(Partition)
- 데이터가 먼저 분산되어야함
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
- hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
- Spark에서는 이를 파티션 (Partition)이라 부름. 파티션의 기본크기도 128MB
- 기본크기는 조정할 수 있음
- spark.sql.files.maxPartitionBytes: HDFS등에 있는 파일을 읽어올 때만 적용됨
- 다음으로 나눠진 데이터를 각각 따로 동시 처리
- MapReduce에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
- Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
- 따라서 이론적으로는 한 Excutor에서 여러개의 파티션을 처리할 수 있음
Partitioning과 병렬처리
- 최적의 파티션 개수에 대해서는 의견이 다름
- 보통 적절한 파티션의 수 = Executor의 수 x Executor당 CPU의 수
- 1개 파티션의 크기는 약 100MB에서 300MB사이
- 최적의 Partition 개수를 설정하지 못한다고 하더라도 Executor가 보유한 메모리 보다는 Partition의 크기가 작아야 Spill이 발생하는 상황을 피할 수 있음
- 적절하게 Executor수와 Partition수가 맞아 떨어져야 함.
- 아래에는 4개의 파티션이 있지만 병렬처리 가능한 것은 2개뿐임.
Partition 종류
1. InputPartition
- 처음 파일을 읽을 때 생성되는 Partition
- 해당 Partition을 기본으로 transformation 작업 진행
- Spark config중에서 spark.sql.files.maxpartitionBytes 값을 기준으로 읽어오는 파일의 크기가 크다면 설정된 값 만큼 파일을 쪼개어 partition을 만듬
- 컬럼 기반 저장방식 사용하기에, 필요한 컬럼만 읽어 처리하여 보통 128MB보다 작기에 잘 수정하지 않는 옵션임
2. OutputPartition
- 파일을 저장할 때 생성하는 Partitio
- 해당 Partition의 숫자가 파일을 저장할때 (보통 HDFS에 저장) 마지막 경로의 최종 파일의 개수를 결정함
- 특정 연산을 (groupby나 where 조건 등)할 경우 파일이 너무 잘게 쪼개져, 해당 파티션의 크기를 조정해야 하는 경우가 생김
- Repartition이나 Coaleasce 함수를 사용하여 조절
- 이 두개의 차이는 Shuffle을 하냐 안하냐의 차이
- Repartition : 파티션 개수를 늘리거나 줄일 때 사용, 셔플을 함
- Coaleasce : 파티션 개수를 줄일 때 사용, Shuffle을 하지 않음
- 따라서 파티션 개수를 줄일 때는 Coaleasce를 사용하는 것을 추천
3. ShufflePartition
- Spark 성능에 가장 큰 영향을 끼치는 Partition
- Join이나 Group by같은 Wide연산이 일어나는 경우 해당
- 아래 부분에서 자세히 설명
Shuffling (셔플링)
Spark 데이터 처리 흐름과 문제점
- 데이터프레임은 작은 파티션들로 구성됨
- Spark의 데이터 프레임은 굉장히 큰 데이터 일 수 있기에 파티션이라는 단위로 나뉘어 구성됨
- 즉, 판다스 데이터프레임과의 차이는 크기라고 보면 됨.
- 데이터프레임은 한번 만들어지면 수정 불가 (Immutable)
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환 후 저장
- sort, group by, filter, map, join, …
- 그런데 파티션간에 데이터 이동없이 계속 변환이 가능할지에 대한 의문이 있음.
- map, filter는 물리적으로 다른 파티션으로 이동할 일은 없음
- 하지만 group by나 sort는 새 파티션이 필요하기에, 데이터 이동이 필요함
- 새로운 파티션은 데이터 크기가 균등하지 않을 수 있음
Shuffling
- 파티션간에 데이터 이동이 필요한 경우 발생
- Spark에서 Executor간 데이터를 주고 받을 때 무조건 파일로 주고, 받을 파일은 Wirte를 해놓음 (도중에 에러가 발생하여 복구 과정에서 Shuffle file을 다시 만들지 않기 위해서임)
- Shuffle을 위해 파일을 쓰고 읽은 값이 Shuffle Read, Shuffle Write이며 Spark History Server에서 확인 가능
- 셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우 (예: 파티션 수를 줄이기)
- 시스템에 의해 이뤄지는 셔플링
- 예를 들면 그룹핑 등의 aggregation이나 sorting
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
- 몇 개의 파티션이 추가될지는 spark.sql.shuffle.partitions이 결정
- 일반적으로 1개 Partition 크기가 100~200MB 가 되도록 하며 Executor의 메모리를 넘지 않도록 설정
- 적게 설정할 경우 Spill이나 OOM(Out of Memory)가 발생하여 Executor가 중지될 수 있음
- default값은 200이며 이는 최대 파티션 수
- Spark에서 가장 시간이 많이 소요되는 부분이며, 해당 Task 중에 오류가 발생하여 문제가 발생한 경우 Shuffle Fetch Fail 에러를 확인할 수 있음
- 해당 에러는 원인이 다양하며 가장 디버깅하기 어려움.
- 오퍼레이션에 따라 파티션 수가 결정됨
- random, hashing partition, range partition 등등
- Range partition
- 순서가 있는, 정렬된 파티셔닝 (키의 순서에 따라 or 키의 집합의 순서에 따라)
- 특정 범위내에 있는 키들을 동일한 노드로 모으고, 파티션의 크기를 지정해주면 그 크기내에서 데이터를 최대한 분배한다.
- sorting의 경우 range partition을 사용함 (분포에 따라 새로 파티션을 만듬)
- Hashing partition
- Aggregation operation데이터를 여러 파티션에 균일하게 분배하는 방식
- key로 주어진 필드의 값을 hashing function으로 넘기고, 그 값을 만들어지는 파티션의 수로 나눠서 나머지를 어느 파티션으로 보낼지 결정
- 아래의 경우 두개의 파티션으로 충분하다고 판단하여, 나머지연산에서 분모의 값이 2로 들어가게됨
- 이에 따라, 나머지 값이 0이면 1번파티션, 1이면 2번 파티션으로 들어가게됨.
- Data Skewness
- Data partitioning은 데이터 처리에 병렬성을 주지만 단점도 존재
- 이는 데이터가 균등하게 분포하지 않는 경우
- 주로 데이터 셔플링 후에 발생
- 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요.
Spill
- Executor 메모리 영역 중에 Execution Memory(wide transformation 수행시 임시 파일 메모리)나 Storage Memory(cache값)이 터지게 되면 Spill이 발생
- Spill이 발생한 경우
- Disk I/O로 인하여 속도가 느려짐
- Disk I/O 속도가 느려지고 Interrupt발생
- Memory 객체를 binary로 serializer, deserilizer하는 과정에서 resource 소모
- OOM(Out Of Memory)로 인한 Executor precess 에러
- Spark Executor는 on-heap 메모리 영역에 객체를 생성 및 관리하며, 별도로 OOM이 발생하지 않기 위해 메모리를 여유롭게 설정해놓음
- Spill은 off-heap영역을 사용하기 때문에 OS가 OOM으로 Executor를 kill할 수 있음
- Disk I/O로 인하여 속도가 느려짐
참고자료
- [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의
- https://tech.kakao.com/2021/10/08/spark-shuffle-partition/
728x90
반응형
'Spark & Hadoop > Spark' 카테고리의 다른 글
[Spark] Schema Evolution (1) | 2023.10.03 |
---|---|
[Spark] Spark 파일 Type 종류 및 Pyspark로 데이터 Write하는 방법 (Parquet / AVRO) (1) | 2023.10.03 |
[Spark] Windows 10에 Spark 설치하기 (Java설치, 환경변수 세팅) (1) | 2023.10.02 |
[Spark] SparkSession 생성과 환경변수 세팅 (0) | 2023.09.30 |
[Spark] Spark 데이터 구조 (RDD, DataFrame, Dataset) (0) | 2023.09.29 |