Spark & Hadoop/Pyspark & SparkSQL

[SparkSQL] JOIN의 종류 (INNER, LEFT, RIGHT, FULL(OUTER), CROSS, SELF JOIN) (Shuffle JOIN, Broadcast JOIN)

YSY^ 2023. 10. 3. 16:30

JOIN

  • SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 Merge
  • 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
  • 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
    • JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성
    • 조인의 방식에 따라 어떤 레코드들이 선택되는지, 어떤 필드들이 채워지가 달라짐

Join의 종류

https://theartofpostgresql.com/blog/2019-09-sql-joins/

SparkSession 생성 및 사용할 테이블 생성

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "sparksql join")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()
        
vital = [
     { 'UserID': 100, 'VitalID': 1, 'Date': '2020-01-01', 'Weight': 75 },
     { 'UserID': 100, 'VitalID': 2, 'Date': '2020-01-02', 'Weight': 78 },
     { 'UserID': 101, 'VitalID': 3, 'Date': '2020-01-01', 'Weight': 90 },
     { 'UserID': 101, 'VitalID': 4, 'Date': '2020-01-02', 'Weight': 95 },
]

alert = [
    { 'AlertID': 1, 'VitalID': 4, 'AlertType': 'WeightIncrease', 'Date': '2020-01-01', 'UserID': 101},
    { 'AlertID': 2, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-04', 'UserID': 100},
    { 'AlertID': 3, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-05', 'UserID': 101}
]

df_vital.createOrReplaceTempView("Vital")
df_alert.createOrReplaceTempView("Alert")

Inner JOIN

  • 양쪽 테이블에서 매치가 되는 레코드들만 리턴함
  • 양쪽 테이블의 필드가 모두 채워진 상태로 리턴됨
# INNER JOIN
df_inner_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v
            INNER JOIN Alert a
            ON v.vitalID = a.vitalID;
    """
)
df_inner_join.show()

LEFT JOIN

  • 왼쪽 테이블(Base)의 모든 레코드들을 리턴함
  • 오른쪽 테이블의 필드는 왼쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
# LEFT JOIN
df_left_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v
            LEFT JOIN Alert a
            ON v.vitalID = a.vitalID;
    """
)
df_left_join.show()

RIGHT JOIN

  • 오른쪽 테이블(Base)의 모든 레코드들을 리턴함
  • 왼쪽 테이블의 필드는 오른쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
# RIGHT JOIN
df_right_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v
            RIGHT JOIN Alert a
            ON v.vitalID = a.vitalID;
    """
)
df_right_join.show()

FULL(OUTER) JOIN

  • 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들을 리턴함
  • 매칭되는 경우에만 양쪽 테이블들의 모든 필드들이 채워진 상태로 리턴됨
# OUTER JOIN
df_outer_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v
            FULL JOIN Alert a
            ON v.vitalID = a.vitalID;
    """
)
df_outer_join.show()

CROSS JOIN

  • 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴함
  • JOIN할 key조건을 주지 않는다.
# CROSS JOIN
df_cross_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v
            CROSS JOIN Alert a
    """
)
df_cross_join.show()

SELF JOIN

  • 동일한 테이블을 alias를 달리해서 자기 자신과 조인함
# SELF JOIN
df_self_join = spark.sql(
    """
    SELECT  * 
      FROM  Vital v1
            JOIN Vital v2
            ON v1.vitalID = v2.vitalID
    """
)
df_self_join.show()

최적화 관점에서 본 조인

Shuffle JOIN

  • 일반 조인 방식
  • Bucket JOIN: 조인 키를 바탕으로 Left, Right 데이터프레임의 파티션을 새로 만들고 조인을 하는 방식
  • JOIN을 수행하면 조인키가 같은 레코드들을 셔플링을 통해 같은 Partition으로 복사해줌
    • 즉 hashing을 통한 shuffling
  • 문제는 수많은 partition 이동이 발생하고 파티션들의 크기가 다를 수 있음(Data skew)
    • 병렬처리의 이점이 없어짐

Broadcast JOIN

  • 큰 데이터와 작은 데이터 간의 조인
  • 데이터 프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것 (broadcasting)
    • spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
  • RIGHT 데이터프레임 전체를 LEFT 데이터프레임의 파티션들이 있는 서버로 복사해서 Shuffling의 양을 최소화

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

 

728x90
반응형