Spark & Hadoop/Pyspark & SparkSQL

[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정)

YSY^ 2023. 10. 2. 18:17

해당 포스팅에서는 아래 데이터를 활용합니다.

customer-orders.csv
0.14MB

사전작업

1. spark session을 세팅한다.

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark DataFrame #2')\
        .getOrCreate()

2. 스키마 이름과 타입을 지정하여 데이터를 불러온다

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([ \
                     StructField("cust_id", StringType(), True), \
                     StructField("item_id", StringType(), True), \
                     StructField("amount_spent", FloatType(), True)])
df = spark.read.schema(schema).csv("customer-orders.csv")
df.printSchema()

 

Pyspark에서 groupBy() 활용하여 집계하는 방법

위 데이터에서 cust_id별로 amount_spent의 총합을 계산한다.

1. pandas와 같은 방법으로 groupby후 sum한다.

df_ca = df.groupBy("cust_id").sum("amount_spent")

위와 같이 출력된다. 다만 집계된 컬럼의 이름을 다시 지정해줄 필요가 있어보인다.

2. groupby시 컬럼 이름을 지정한다.

2-1) withColumnRenamed을 사용한다.

  • sum(amount_spent) 컬럼이름을 "sum"으로 대체한다.
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")

2-2) (추천) agg()함수를 사용하여 sum이라는 함수를 불러온다.

import pyspark.sql.functions as f

df_ca = df.groupBy("cust_id") \
   .agg(f.sum('amount_spent').alias('sum'))

 

3. 최대값이나 평균도 추가로 구해본다.

df.groupBy("cust_id") \
   .agg(
       f.sum('amount_spent').alias('sum'),
       f.max('amount_spent').alias('max'),
       f.avg('amount_spent').alias('avg')).collect()

 

SparkSQL

1. 데이터프레임을 table view로 만들어준다.

df.createOrReplaceTempView("customer_orders")

2. 쿼리를 작성하여 데이터를 호출한다.

spark.sql(
    """
    SELECT  cust_id
            , SUM(amount_spent) sum
            , MAX(amount_spent) max
            , AVG(amount_spent) avg
      FROM  customer_orders
     GROUP
        BY  1
    """
 ).head(5)

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

728x90
반응형