Spark & Hadoop/Pyspark & SparkSQL

[Pyspark] DataFrame Join 및 Unique User Count

YSY^ 2023. 10. 2. 22:06

해당 포스팅에서는 pyspark로 두 데이터를 Join 후, 년월별로 Distinct한 User를 Count하는 방법을 소개한다.

1. sparksession을 세팅한다.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .getOrCreate()

 

2. 데이터 2개를 호출한다.

df_user_session_channel = spark.read.csv("df_user_session_channel.csv", header=True)
df_session_timestamp = spark.read.csv("df_session_timestamp.csv", header=True)

df_user_session_channel
df_session_timestamp

3. 두 데이터를 join할 key를 선정하고 inner join을 한다.

  • 위 데이터를 sessionid를 기준으로 join한다
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")

4. join한 데이터에서 필요한 컬럼을 선택한다.

중요한 것은 컬럼을 선택할때, 두 데이터에서 겹치는 컬럼 (ex. key가 되는 sessionid 컬럼)은 무조건 테이블 명을 명시해주어야 한다. (ex. df_user_session_channel.sessioni)

session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", df_user_session_channel.sessionid, "channel", "ts"
)

 

5. 가장 많은 체널이 어떤것인지 count 메소드로 집계하고 내림차순으로 정렬한다.

channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)
channel_count_df.show()

6. 년월별 mau를 계산한다

  • date_format함수를 활용하여 ts컬럼을 년월(yyyy-MM)으로 바꿔주고 month라고 지정한다.
  • countDistinct : Unique한 user 수를 count한다.
  • alias("mau") : countDistinct("userid")한 컬럼의 이름을 "mau"로 지정한다.
from pyspark.sql.functions import date_format, asc, countDistinct

session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
    agg(countDistinct("userid").alias("mau")).sort(asc('month')).show()

728x90
반응형