Job, Stages, Tasks
- Action -> Job -> 1+ Stages -> 1+ Tasks (큰 순서대로)
- Action
- Job을 하나 만들어내고 코드가 실제로 실행됨
- Lazy Execution
- Job
- 하나 혹은 그 이상의 Stage로 구성됨
- Stage는 Shuffling이 발생하는 경우 새로 생김
- Stage
- Stage는 기본적으로 Shuffling없이 실행될 수 있는 Narrow Dependencies Task들의 집합
- DAG의 형태로 구성된 Task들 존재
- 여기 Task들은 파티션 수 만큼 병렬 실행이 가능
- Task
- 가장 작은 실행 유닛으로 Executor에 의해 실행됨
Lazy Execution /Transformations /Actions
Spark의 연산은 Transformation과 Action으로 구분
Transformation
spark.read.option("header", True).csv(“test.csv”). \
where("gender <> 'F'"). \
select("name", "gender"). \
groupby("gender"). \
count(). \
show()
- 원본 데이터를 수정하지 않고 새로운 데이터를 만드는 연산
- 위 코드에서 WHERE, SELECT, GROUPBY, COUNT에 해당됨
- Narrow Transformation
- 독립적인 Partition level 작업 (데이터 교환 없이 partition을 생성하는 함수)
- select, filter, map, where, count 등등
- Wide Transformation
- 여러개 Partition을 읽어와 합쳐 하나의 Partition을 생성하는 작업으로, Shuffling이 필요한 작업
- groupby, orderby, reduceby, partitionby, repartition 등등
Action
- 결과값을 계산하여 Driver로
- Read, Write, Show, Collect -> Job을 실행시킴 (실제 코드가 실행됨)
Lazy Execution
- 코드를 바로 실행하는것이 아닌 특정 action을 만나기 전까지 실행 보류
- 실행 계획 최적화와 디스크 I/O 및 메모리 사용량이 감소하여 외부 시스템과 효율적으로 연동 가능
- 더 많은 오퍼레이션을 볼 수 있기에 최적화를 더 잘할 수 있음. 그래서 SQL이 더 선호됨
- 실행 계획 최적화 예시 : join후 Fliter -> Filter 후 join
- 집계후 join, 정렬 후 join의 경우는 별도로 최적화하지 않음
Web UI를 통한 시각화
- 상단 코드의 실행 시각화
- 어디서 최적화를 할 수 있는지, 어디서 병목구간이 생기는지 확인가능
- JOB 0
- 처음 데이터를 읽을 때 바로 실행,
- CSV File을 읽을 때 header = True인데 일단 데이터를 읽어야 어떤 스키마를 가지고 있는지 알 수 있기 때문
- 따라서 가장 위에 있는 레코드를 읽는다
- JOB 1
- Show에 의해 trigger가 됨
- Where, select, groupby등이 적용
- 여기서 groupby로 인해 stage가 2개가 생기게 됨 (하나는 Narrow Dependencies Task들의 Stage)
- stage간에는 exchange가 있음
- 이는 shuffling을 의미. groubby gender에 맞춰서 같은 gender의 레코드는 같은 gender로 속하게 파티션이 새로 만들어져야함.
- .option("inferSchema", True)가 추가되면 JOB이 하나 더 추가됨
- 실제로 input데이터를 읽어보면서 각 컬럼의 타입이 무엇인지 정확히 파악하여야 하기 때문
WordCount 코드 예제
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
- Stage Visualization
- Query Visualizaion
- job은 하나
- groupby로 인해 stage가 2개로 나뉘어짐
Shuffle Join 예제
df_large는 크기가 크고, df_small은 크기가 작음
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
- Stage Visualization
- Query Visualizaion
- 3개의 job가 만들어짐
- job 0 : df_large 데이터를 읽어올때 만들어짐
- job 1 : df_small 데이터를 읽어올때 만들어짐
- job 2 : show가 실행될때 job이 만들어짐
- 두 데이터 각각을 join key가 되는 id기준으로 각각 shuffling이 되고
- input exchange에서 sorting 하고 같은 id를 갖는 양쪽의 dataframe레코드들을 하나로 묶어줌
- 이 경우 shuffle hashing join이 사용되됨
- 하지만 한쪽 데이터가 작은 경우 overhead 현상을 발생시키기에 broadcasting join 이 유리
BroadCast Join
- broadcast 함수 호출
- 옵션 세팅을 spark.sql.adaptive.autoBroadcastJoinThreshold = True로 세팅
- 바이트 크기보다 작은 데이터가 있을때 join 시 shufflle join 이 아닌 broadcast join으로 진행
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()
참고자료
- [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의
- https://livebook.manning.com/book/spark-in-action/chapter-2/
728x90
반응형
'Spark & Hadoop > Spark' 카테고리의 다른 글
[Spark] Spark 소개 및 Spark관련 정보 모음 (Spark 구성요소, 작동방식 등) (1) | 2024.05.05 |
---|---|
[Spark] Spark 프로그램 구조 (Driver, Executor), Spark Cluster Manager (0) | 2024.05.05 |
[Spark] HDFS Bucketing & Partitioning (Partitioning pyspark 코드 예시) (0) | 2023.10.08 |
[Spark] Schema Evolution (1) | 2023.10.03 |
[Spark] Spark 파일 Type 종류 및 Pyspark로 데이터 Write하는 방법 (Parquet / AVRO) (1) | 2023.10.03 |