JOIN
- SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 Merge
- 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
- 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
- JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성
- 조인의 방식에 따라 어떤 레코드들이 선택되는지, 어떤 필드들이 채워지가 달라짐
Join의 종류
SparkSession 생성 및 사용할 테이블 생성
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "sparksql join")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
vital = [
{ 'UserID': 100, 'VitalID': 1, 'Date': '2020-01-01', 'Weight': 75 },
{ 'UserID': 100, 'VitalID': 2, 'Date': '2020-01-02', 'Weight': 78 },
{ 'UserID': 101, 'VitalID': 3, 'Date': '2020-01-01', 'Weight': 90 },
{ 'UserID': 101, 'VitalID': 4, 'Date': '2020-01-02', 'Weight': 95 },
]
alert = [
{ 'AlertID': 1, 'VitalID': 4, 'AlertType': 'WeightIncrease', 'Date': '2020-01-01', 'UserID': 101},
{ 'AlertID': 2, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-04', 'UserID': 100},
{ 'AlertID': 3, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-05', 'UserID': 101}
]
df_vital.createOrReplaceTempView("Vital")
df_alert.createOrReplaceTempView("Alert")
Inner JOIN
- 양쪽 테이블에서 매치가 되는 레코드들만 리턴함
- 양쪽 테이블의 필드가 모두 채워진 상태로 리턴됨
# INNER JOIN
df_inner_join = spark.sql(
"""
SELECT *
FROM Vital v
INNER JOIN Alert a
ON v.vitalID = a.vitalID;
"""
)
df_inner_join.show()
LEFT JOIN
- 왼쪽 테이블(Base)의 모든 레코드들을 리턴함
- 오른쪽 테이블의 필드는 왼쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
# LEFT JOIN
df_left_join = spark.sql(
"""
SELECT *
FROM Vital v
LEFT JOIN Alert a
ON v.vitalID = a.vitalID;
"""
)
df_left_join.show()
RIGHT JOIN
- 오른쪽 테이블(Base)의 모든 레코드들을 리턴함
- 왼쪽 테이블의 필드는 오른쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
# RIGHT JOIN
df_right_join = spark.sql(
"""
SELECT *
FROM Vital v
RIGHT JOIN Alert a
ON v.vitalID = a.vitalID;
"""
)
df_right_join.show()
FULL(OUTER) JOIN
- 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들을 리턴함
- 매칭되는 경우에만 양쪽 테이블들의 모든 필드들이 채워진 상태로 리턴됨
# OUTER JOIN
df_outer_join = spark.sql(
"""
SELECT *
FROM Vital v
FULL JOIN Alert a
ON v.vitalID = a.vitalID;
"""
)
df_outer_join.show()
CROSS JOIN
- 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴함
- JOIN할 key조건을 주지 않는다.
# CROSS JOIN
df_cross_join = spark.sql(
"""
SELECT *
FROM Vital v
CROSS JOIN Alert a
"""
)
df_cross_join.show()
SELF JOIN
- 동일한 테이블을 alias를 달리해서 자기 자신과 조인함
# SELF JOIN
df_self_join = spark.sql(
"""
SELECT *
FROM Vital v1
JOIN Vital v2
ON v1.vitalID = v2.vitalID
"""
)
df_self_join.show()
최적화 관점에서 본 조인
Shuffle JOIN
- 일반 조인 방식
- Bucket JOIN: 조인 키를 바탕으로 Left, Right 데이터프레임의 파티션을 새로 만들고 조인을 하는 방식
- JOIN을 수행하면 조인키가 같은 레코드들을 셔플링을 통해 같은 Partition으로 복사해줌
- 즉 hashing을 통한 shuffling
- 문제는 수많은 partition 이동이 발생하고 파티션들의 크기가 다를 수 있음(Data skew)
- 병렬처리의 이점이 없어짐
Broadcast JOIN
- 큰 데이터와 작은 데이터 간의 조인
- 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것 (broadcasting)
- spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
- RIGHT 데이터프레임 전체를 LEFT 데이터프레임의 파티션들이 있는 서버로 복사해서 Shuffling의 양을 최소화
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[Pyspark/SparkSQL] Hive-메타스토어 사용 (테이블 생성방법) (Managed Table, External Table) (1) | 2023.10.03 |
---|---|
[Pyspark/SparkSQL] UDF(User Defined Function) (사용자 정의 함수) (1) | 2023.10.03 |
[SparkSQL] SparkSQL이란 (0) | 2023.10.03 |
[Pyspark] DataFrame Join 및 Unique User Count (0) | 2023.10.02 |
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |