해당 포스팅에서는 pyspark로 두 데이터를 Join 후, 년월별로 Distinct한 User를 Count하는 방법을 소개한다.
1. sparksession을 세팅한다.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("PySpark DataFrame #5") \
.getOrCreate()
2. 데이터 2개를 호출한다.
df_user_session_channel = spark.read.csv("df_user_session_channel.csv", header=True)
df_session_timestamp = spark.read.csv("df_session_timestamp.csv", header=True)
3. 두 데이터를 join할 key를 선정하고 inner join을 한다.
- 위 데이터를 sessionid를 기준으로 join한다
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
4. join한 데이터에서 필요한 컬럼을 선택한다.
중요한 것은 컬럼을 선택할때, 두 데이터에서 겹치는 컬럼 (ex. key가 되는 sessionid 컬럼)은 무조건 테이블 명을 명시해주어야 한다. (ex. df_user_session_channel.sessioni)
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
"userid", df_user_session_channel.sessionid, "channel", "ts"
)
5. 가장 많은 체널이 어떤것인지 count 메소드로 집계하고 내림차순으로 정렬한다.
channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)
channel_count_df.show()
6. 년월별 mau를 계산한다
- date_format함수를 활용하여 ts컬럼을 년월(yyyy-MM)으로 바꿔주고 month라고 지정한다.
- countDistinct : Unique한 user 수를 count한다.
- alias("mau") : countDistinct("userid")한 컬럼의 이름을 "mau"로 지정한다.
from pyspark.sql.functions import date_format, asc, countDistinct
session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
agg(countDistinct("userid").alias("mau")).sort(asc('month')).show()
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[SparkSQL] JOIN의 종류 (INNER, LEFT, RIGHT, FULL(OUTER), CROSS, SELF JOIN) (Shuffle JOIN, Broadcast JOIN) (1) | 2023.10.03 |
---|---|
[SparkSQL] SparkSQL이란 (0) | 2023.10.03 |
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |
[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정) (0) | 2023.10.02 |