해당 포스팅에서는 아래 데이터를 활용합니다.
사전작업
1. spark session을 세팅한다.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #2')\
.getOrCreate()
2. 스키마 이름과 타입을 지정하여 데이터를 불러온다
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([ \
StructField("cust_id", StringType(), True), \
StructField("item_id", StringType(), True), \
StructField("amount_spent", FloatType(), True)])
df = spark.read.schema(schema).csv("customer-orders.csv")
df.printSchema()
Pyspark에서 groupBy() 활용하여 집계하는 방법
위 데이터에서 cust_id별로 amount_spent의 총합을 계산한다.
1. pandas와 같은 방법으로 groupby후 sum한다.
df_ca = df.groupBy("cust_id").sum("amount_spent")
2. groupby시 컬럼 이름을 지정한다.
2-1) withColumnRenamed을 사용한다.
- sum(amount_spent) 컬럼이름을 "sum"으로 대체한다.
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
2-2) (추천) agg()함수를 사용하여 sum이라는 함수를 불러온다.
import pyspark.sql.functions as f
df_ca = df.groupBy("cust_id") \
.agg(f.sum('amount_spent').alias('sum'))
3. 최대값이나 평균도 추가로 구해본다.
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
SparkSQL
1. 데이터프레임을 table view로 만들어준다.
df.createOrReplaceTempView("customer_orders")
2. 쿼리를 작성하여 데이터를 호출한다.
spark.sql(
"""
SELECT cust_id
, SUM(amount_spent) sum
, MAX(amount_spent) max
, AVG(amount_spent) avg
FROM customer_orders
GROUP
BY 1
"""
).head(5)
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |
---|---|
[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select) (0) | 2023.10.02 |
[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법) (0) | 2023.10.02 |
[Pyspark] pyspark설치 및 데이터 읽어오기, RDD&Python객체&DataFrame 변환 (0) | 2023.09.30 |