Spark & Hadoop/Hadoop

[Hadoop] 맵리듀스(MapReduce) Programming

YSY^ 2023. 8. 9. 20:56

맵리듀스 프로그래밍 특징

  • 큰 데이터를 처리할 수 있는데에 목표
  • 데이터 셋의 포맷도 하나로 단순화하였고, 변경 불가
    • 데이터 셋의 포멧은 Key, Value의 집합이며 변경 불가(immutable)
  • 데이터 조작은 map과 reduce 두 개의 오퍼레이션으로만 가능
    • map는 입력으로 들어온 key, value를 다른 key, value나 key, value의 집합으로 만들어줌
      • map의 output가 없을 수도 있음
    • reduce : map의 출력 중(key, value)에 같은 key의 출력들을 모아서 처리해서 새로운 key, value를 만들어 주는 것
    • 이 두 오퍼레이션은 항상 하나의 쌍으로 연속으로 실행됨
    • 이 두 오퍼레이션의 코드를 개발자가 채워야함
      • 입력이되는 HDFS와 출력 HDFS 위치도 지정해주어야 함
  • 맵리듀스 시스템이 Map의 결과를 Reduce단으로 모아줌
    • 이 단계를 보통 셔플링이라 부르며 네트웍단을 통한 데이터 이동이 생김
    • 데이터 전송량에 따라 전체 오퍼레이션이 걸리는 시간이 길어질 수 있음
  • 하둡 1.0에서는 Map코드와 Reduct코드가 Task Manager에 의해 동작
  • 하둡 2.0에서는 Yarn의 노드 매니저를 통해 컨테이너가 할당 되고, 컨테이너 안에서 Task의 형태로 실행
  • Map의 Input로 들어가는 데이터의 크기와 파일 수에 따라 Map의 Task 수가 결정
  • Reduce의 Task의 수는 개발자가 지정 (Hadoop의 맹점 중 하나)

맵 & 리듀스

맵리듀스 프로그래밍의 핵심

Map

  • (k, v) -> [(k', v')*]
  • 입력은 시스템에 의해 주어지며 입력으로 지정된 HDFS 파일에서 넘어옴
  • key, value 페어를 새로운 key, value 페어 리스트로 변환 (transformation)
  • 출력: 입력과 동일한 key, value 페어를 그대로 출력해도 되고 출력이 없어도 됨

Reduce

  • (k’, [v1’, v2’, v3’, v4’, …]) -> (k’’, v'')
  • 입력은 시스템에 의해 주어짐
    • 맵의 출력 중 같은 키를 갖는 key, value 페어를 시스템이 묶어서 입력으로 넣어줌
  • key, value 리스트를 새로운 key, value 페어로 변환
  • SQL의 GROUP BY와 흡사
  • 출력이 HDFS에 저장됨

맵리듀스 프로그램 동작 예시

Map의 입력이 Key Value 형식이어야함.

  • 위 케이스는 데이터가 하나씩 밖에 없기에 Key, Value를 지정할 수 없음
  • 이 경우는 key를 아무 랜덤값을 주고 Value에 파싱할 텍스트를 지정 (반대로 해도 상관 없음)
  • 아래와 같이 프로그래밍을 해야하기에 복잡하여 하둡 2.0으로 넘어가도록 하는 니즈가 발생

Mapper

  • Map: (k, v) -> [(k', v')*]
  • Transformation
    • key, value 리스트를 새로운 key, value 리스트로 변환

EX)
Input: (100, “the brave yellow lion”)
Output:[(“the”, 1),(“brave”, 1),(“yellow”, 1),(“lion”, 1)]

 public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

 

Reducer

  • (k’, [v1’, v2’, v3’, v4’, …]) -> (k’’, v'')
  • key, value 리스트를 새로운 key, value 페어로 변환

EX)
Input: ("lion": [1, 1, 1])
Output: ("lion": 3)

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

Shuffling and Sorting

  • Shuffling
    • Mapper의 출력을 Reducer로 보내주는 프로세스를 말함
    • 전송되는 데이터의 크기가 크면 네트웍 병목을 초래하고 시간이 오래 걸림
  • Sorting
    • Reduce 코드 실행전 여러개의 Map테스크에서 출력으로 나온 레코드들을 하나로 묶어줌
    • 상단에 나온 사진 같은 경우 2개의 Reduce로 묶은 것. (아래 사진은 1개의 Reduce로 묶음)

Data Skew

  • 특정 Reduce로 데이터가 많이 몰리는 데이터 skewed가(비대칭) 발생할 수 있음
  • 각 태스크가 처리하는 데이터 크기에 불균형이 존재하는 경우
    • 병렬처리의 큰 의미가 없음. 가장 느린 태스크가 전체 처리 속도를 결정
  • 특히 Reducer로 오는 데이터 크기는 큰 차이가 있을 수 있음
    • Group By나 Join등에 이에 해당함
    • 처리 방식에 따라 Reducer의 수에 따라 메모리 에러등이 날 수 있음
  • 데이터 엔지니어가 고생하는 이유 중의 하나
    • 빅데이터 시스템에는 이 문제가 모두 존재

맵리듀스 프로그래밍 문제점

  1. 낮은 생산성
    • 프로그래밍 모델이 가진 융통성 부족
    • 2가지 오퍼레이션만 지원하고 데이터 형식도 고정되어 있기 때문
    • 튜닝/최적화가 쉽지 않음
      • EX) 데이터 분포가 균등하지 않은 경우
  2. 배치작업 중심
    • 기본적으로 Low Latency(대기시간 최소화)가 아니라 Throughput(처리량)에 초점이 맞춰짐

MapReduce의 대안

  • 더 범용적인 대용량 데이터 처리 프레임웍들의 등장
    • YARN, Spark
  • SQL의 컴백: Hive, Presto등이 등장
  • Hive
    • MapReduce위에서 구현됨. Throughput에 초점. 대용량 ETL에 적합
  • Presto
    • Low latency에서 초점. 메모리를 주로 사용. Adhoc 쿼리에 적합
    • AWS Athena가 Presto 기반
  • 요즘은 Hive와 Presto가 비슷해짐

 

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다.

728x90
반응형