Spark & Hadoop/Spark

[Spark] Spark Action의 구성요소(Job, Stages, Tasks)와 Spark의 연산 (Transformations /Actions / Lazy Execution)

YSY^ 2024. 5. 5. 19:50

Job, Stages, Tasks

  • Action -> Job -> 1+ Stages -> 1+ Tasks (큰 순서대로)
  • Action
    • Job을 하나 만들어내고 코드가 실제로 실행됨
    • Lazy Execution
  • Job
    • 하나 혹은 그 이상의 Stage로 구성됨
    • Stage는 Shuffling이 발생하는 경우 새로 생김
  • Stage
    • Stage는 기본적으로 Shuffling없이 실행될 수 있는 Narrow Dependencies Task들의 집합
    • DAG의 형태로 구성된 Task들 존재
    • 여기 Task들은 파티션 수 만큼 병렬 실행이 가능
  • Task
    • 가장 작은 실행 유닛으로 Executor에 의해 실행됨

Stage의 예

Lazy Execution /Transformations /Actions

Spark의 연산은 Transformation과 Action으로 구분

Transformation

spark.read.option("header", True).csv(“test.csv”). \
 where("gender <> 'F'"). \
 select("name", "gender"). \
 groupby("gender"). \
 count(). \
 show()
  • 원본 데이터를 수정하지 않고 새로운 데이터를 만드는 연산
  • 위 코드에서 WHERE, SELECT, GROUPBY, COUNT에 해당됨
  • Narrow Transformation
    • 독립적인 Partition level 작업 (데이터 교환 없이 partition을 생성하는 함수)
    • select, filter, map, where, count 등등
  • Wide Transformation
    • 여러개 Partition을 읽어와 합쳐 하나의 Partition을 생성하는 작업으로, Shuffling이 필요한 작업
    • groupby, orderby, reduceby, partitionby, repartition 등등

Narrow Transformation VS  Wide Transformation

Action

  • 결과값을 계산하여 Driver로
    • Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)

Lazy Execution

  • 코드를 바로 실행하는것이 아닌 특정 action을 만나기 전까지 실행 보류
  • 실행 계획 최적화와 디스크 I/O 및 메모리 사용량이 감소하여 외부 시스템과 효율적으로 연동 가능
  • 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨
  • 실행 계획 최적화 예시 : join후 Fliter -> Filter 후 join
  • 집계후 join, 정렬 후 join의 경우는 별도로 최적화하지 않음

 

Web UI를 통한 시각화

  • 상단 코드의 실행 시각화
  • 어디서 최적화를 할 수 있는지, 어디서 병목구간이 생기는지 확인가능

  • JOB 0 
    • 처음 데이터를 읽을 때 바로 실행,
    • CSV File을 읽을 때 header = True인데 일단 데이터를 읽어야 어떤 스키마를 가지고 있는지 알 수 있기 때문
    • 따라서 가장 위에 있는 레코드를 읽는다
  • JOB 1
    • Show에 의해 trigger가 됨
    • Where, select, groupby등이 적용
    • 여기서 groupby로 인해 stage가 2개가 생기게 됨 (하나는 Narrow Dependencies Task들의 Stage)
    • stage간에는 exchange가 있음
    • 이는 shuffling을 의미. groubby gender에 맞춰서 같은 gender의 레코드는 같은 gender로 속하게 파티션이 새로 만들어져야함.
  • .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨
    • 실제로 input데이터를 읽어보면서 각 컬럼의 타입이 무엇인지 정확히 파악하여야 하기 때문

 

WordCount 코드 예제

spark = SparkSession \
 .builder \
 .master("local[3]") \
 .appName("SparkSchemaDemo") \
 .config("spark.sql.adaptive.enabled", False) \
 .config("spark.sql.shuffle.partitions", 3) \
 .getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
  • Stage Visualization

  • Query Visualizaion

  • job은 하나
  • groupby로 인해 stage가 2개로 나뉘어짐

Shuffle Join 예제

df_large는 크기가 크고, df_small은 크기가 작음

spark = SparkSession \
 .builder \
 .master("local[3]") \
 .appName("SparkSchemaDemo") \
 .config("spark.sql.adaptive.enabled", False) \
 .config("spark.sql.shuffle.partitions", 3) \
 .getOrCreate()
 
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
  • Stage Visualization

  • Query Visualizaion

  • 3개의 job가 만들어짐
    • job 0 : df_large  데이터를 읽어올때 만들어짐
    • job 1 : df_small  데이터를 읽어올때 만들어짐
    • job 2 : show가 실행될때 job이 만들어짐
      • 두 데이터 각각을 join key가 되는 id기준으로 각각 shuffling이 되고 
      • input exchange에서 sorting 하고 같은 id를 갖는 양쪽의 dataframe레코드들을 하나로 묶어줌
      • 이 경우 shuffle hashing join이 사용되됨
      • 하지만 한쪽 데이터가 작은 경우 overhead 현상을 발생시키기에 broadcasting join 이 유리

BroadCast Join

  1. broadcast 함수 호출
  2. 옵션 세팅을 spark.sql.adaptive.autoBroadcastJoinThreshold = True로 세팅
    • 바이트 크기보다 작은 데이터가 있을때 join 시 shufflle join 이 아닌 broadcast join으로 진행
from pyspark.sql.functions import broadcast
spark = SparkSession \
 .builder \
 .master("local[3]") \
 .appName("SparkSchemaDemo") \
 .config("spark.sql.adaptive.enabled", False) \
 .config("spark.sql.shuffle.partitions", 3) \
 .getOrCreate()
 
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()

참고자료

728x90
반응형