Spark & Hadoop/Spark

[Spark] Spark 데이터 처리 방식 (Partitioning, Shuffling, Spill) (InputPartition, OutputPartition, ShufflePartition)(Range partition, Hashing partition, Data Skewness)

YSY^ 2023. 9. 29. 22:21

Spark 데이터 시스템 아키텍처

  • 큰데이터를 ETL하거나 일시적으로 데이터를 select할때는 hive나 presto도 상관 없음
  • 하지만 spark가 각광받는 이유는 하나의 시스템으로 다양한 것들을 할 수 있기 때문
  • 외부데이터의 예 : RDS, Nocurl 등
  • 외부데이터를 주기적으로 ETL을 통해 HDFS로 가져와주어야 함.
  • 이를 위해
    • 1. 주기적으로 ETL작업 진행 (보통 Airflow를 활용)
    • 2. 필요할 때 바로 spark에서 로딩
  • 내부든 외부든 spark에 올라가는 순간 똑같은 데이터가 됨

 

데이터 병렬처리와 파티션(Partition)

  • 데이터가 먼저 분산되어야함
  • 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)
    • hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
  • Spark에서는 이를 파티션 (Partition)이라 부름. 파티션의 기본크기도 128MB
    • 기본크기는 조정할 수 있음
    • spark.sql.files.maxPartitionBytes: HDFS등에 있는 파일을 읽어올 때만 적용됨
  • 다음으로 나눠진 데이터를 각각 따로 동시 처리
    • MapReduce에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
    • Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됨
    • 따라서 이론적으로는 한 Excutor에서 여러개의 파티션을 처리할 수 있음

Partitioning과 병렬처리

  • 최적의 파티션 개수에 대해서는 의견이 다름
    • 보통 적절한 파티션의 수 = Executor의 수 x Executor당 CPU의 수
    • 1개 파티션의 크기는 약 100MB에서 300MB사이
    • 최적의 Partition 개수를 설정하지 못한다고 하더라도 Executor가 보유한 메모리 보다는 Partition의 크기가 작아야 Spill이 발생하는 상황을 피할 수 있음
  • 적절하게 Executor수와 Partition수가 맞아 떨어져야 함.
    • 아래에는  4개의 파티션이 있지만 병렬처리 가능한 것은 2개뿐임.

Partition 종류

1. InputPartition

  • 처음 파일을 읽을 때 생성되는 Partition
  • 해당 Partition을 기본으로 transformation 작업 진행
  • Spark config중에서 spark.sql.files.maxpartitionBytes 값을 기준으로 읽어오는 파일의 크기가 크다면 설정된 값 만큼 파일을 쪼개어 partition을 만듬
  • 컬럼 기반 저장방식 사용하기에, 필요한 컬럼만 읽어 처리하여 보통 128MB보다 작기에 잘 수정하지 않는 옵션임

2. OutputPartition

  • 파일을 저장할 때 생성하는 Partitio
  • 해당 Partition의 숫자가 파일을 저장할때 (보통 HDFS에 저장) 마지막 경로의 최종 파일의 개수를 결정함
  • 특정 연산을 (groupby나 where 조건 등)할 경우 파일이 너무 잘게 쪼개져, 해당 파티션의 크기를 조정해야 하는 경우가 생김
  • Repartition이나 Coaleasce 함수를 사용하여 조절
    • 이 두개의 차이는 Shuffle을 하냐 안하냐의 차이 
    • Repartition : 파티션 개수를 늘리거나 줄일 때 사용, 셔플을 함
    • Coaleasce : 파티션 개수를 줄일 때 사용, Shuffle을 하지 않음
      • 따라서 파티션 개수를 줄일 때는 Coaleasce를 사용하는 것을 추천

3. ShufflePartition

  • Spark 성능에 가장 큰 영향을 끼치는 Partition
  • Join이나 Group by같은 Wide연산이 일어나는 경우 해당
  • 아래 부분에서 자세히 설명

Shuffling (셔플링)

Spark 데이터 처리 흐름과 문제점

  • 데이터프레임은 작은 파티션들로 구성됨
    • Spark의 데이터 프레임은 굉장히 큰 데이터 일 수 있기에 파티션이라는 단위로 나뉘어 구성됨
    • 즉, 판다스 데이터프레임과의 차이는 크기라고 보면 됨.
  • 데이터프레임은 한번 만들어지면 수정 불가 (Immutable)
  • 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환 후 저장
    • sort, group by, filter, map, join, …
  • 그런데 파티션간에 데이터 이동없이 계속 변환이 가능할지에 대한 의문이 있음.
    • map, filter는 물리적으로 다른 파티션으로 이동할 일은 없음
    • 하지만 group by나 sort는 새 파티션이 필요하기에, 데이터 이동이 필요함
    • 새로운 파티션은 데이터 크기가 균등하지 않을 수 있음

Shuffling

  • 파티션간에 데이터 이동이 필요한 경우 발생 
    • Spark에서 Executor간 데이터를 주고 받을 때 무조건 파일로 주고, 받을 파일은 Wirte를 해놓음 (도중에 에러가 발생하여 복구 과정에서 Shuffle file을 다시 만들지 않기 위해서임)
    • Shuffle을 위해 파일을 쓰고 읽은 값이 Shuffle Read, Shuffle Write이며 Spark History Server에서 확인 가능
  • 셔플링이 발생하는 경우
    • 명시적 파티션을 새롭게 하는 경우 (예: 파티션 수를 줄이기)
    • 시스템에 의해 이뤄지는 셔플링
      • 예를 들면 그룹핑 등의 aggregation이나 sorting
  • 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
    • 몇 개의 파티션이 추가될지는 spark.sql.shuffle.partitions이 결정
    • 일반적으로 1개 Partition 크기가 100~200MB 가 되도록 하며 Executor의 메모리를 넘지 않도록 설정
    • 적게 설정할 경우 Spill이나 OOM(Out of Memory)가 발생하여 Executor가 중지될 수 있음
    • default값은 200이며 이는 최대 파티션 수
    • Spark에서 가장 시간이 많이 소요되는 부분이며, 해당 Task 중에 오류가 발생하여 문제가 발생한 경우 Shuffle Fetch Fail 에러를 확인할 수 있음
      • 해당 에러는 원인이 다양하며 가장 디버깅하기 어려움.
    • 오퍼레이션에 따라 파티션 수가 결정됨
      • random, hashing partition, range partition 등등
  • Range partition
    • 순서가 있는, 정렬된 파티셔닝 (키의 순서에 따라 or 키의 집합의 순서에 따라)
    • 특정 범위내에 있는 키들을 동일한 노드로 모으고, 파티션의 크기를 지정해주면 그 크기내에서 데이터를 최대한 분배한다.
    • sorting의 경우 range partition을 사용함 (분포에 따라 새로 파티션을 만듬)
      Range partition 예시
  • Hashing partition
    • Aggregation operation데이터를 여러 파티션에 균일하게 분배하는 방식
    • key로 주어진 필드의 값을 hashing function으로 넘기고, 그 값을 만들어지는 파티션의 수로 나눠서 나머지를 어느 파티션으로 보낼지 결정
    • 아래의 경우 두개의 파티션으로 충분하다고 판단하여, 나머지연산에서 분모의 값이 2로 들어가게됨
      • 이에 따라, 나머지 값이 0이면 1번파티션, 1이면 2번 파티션으로 들어가게됨.

  • Data Skewness
    • Data partitioning은 데이터 처리에 병렬성을 주지만 단점도 존재
    • 이는 데이터가 균등하게 분포하지 않는 경우
      • 주로 데이터 셔플링 후에 발생
    • 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요.

 

Spill

Spill이 발생하는 경우
  • Executor 메모리 영역 중에 Execution Memory(wide transformation 수행시 임시 파일 메모리)나 Storage Memory(cache값)이 터지게 되면 Spill이 발생
  • Spill이 발생한 경우
    1. Disk I/O로 인하여 속도가 느려짐
      • Disk I/O 속도가 느려지고 Interrupt발생
      • Memory 객체를 binary로 serializer, deserilizer하는 과정에서 resource 소모
    2. OOM(Out Of Memory)로 인한 Executor precess 에러
      • Spark Executor는 on-heap 메모리 영역에 객체를 생성 및 관리하며, 별도로 OOM이 발생하지 않기 위해 메모리를 여유롭게 설정해놓음
      • Spill은 off-heap영역을 사용하기 때문에 OS가 OOM으로 Executor를 kill할 수 있음

 

참고자료

 
728x90
반응형