Spark & Hadoop/Pyspark & SparkSQL

[Pyspark/SparkSQL] UDF(User Defined Function) (사용자 정의 함수)

YSY^ 2023. 10. 3. 17:15

UDF(User Defined Function)

  • DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수
  • Scalar 함수 vs. Aggregation 함수
    • Scalar 함수 예: UPPER, LOWER, …
    • Aggregation 함수 (UDAF) 예: SUM, MIN, MAX

함수 등록

  • pyspark.sql.functions.udf
    • DataFrame에서만 사용 가능
  • spark.udf.register
    • SQL 모두에서 사용 가능

Sparksession 생성

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark UDF") \
    .getOrCreate()

데이터 생성

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

영문자를 대문자로 바꿔주는 UDF 만들기

1. lambda 함수 사용

import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

2. 파이썬 함수 사용

def upper_udf(s):
    return s.upper()
    
upperUDF = F.udf(upper_udf, StringType())   

df.withColumn("Curated Name", upperUDF("Name")) \
   .show(truncate=False)
  • pyspark의 select 사용
df.select("Name", upperUDF("Name").alias("Curated Name")).show()

  • SparkSQL에서의 사용
df.createOrReplaceTempView("test")
spark.sql("""
SELECT  name
        , upper_udf(name) `Curated Name`
 FROM  test
"""
).show()

3. Python Pandas 함수 사용

  • 집합단위로 conversion이 일어나기에 performance가 좋음
  • pyspark.sql.functions.pandas_udf로 annotation
from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    return s.str.upper()
  • pyspark의 select 사용
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
df.select("name", upperUDF("name")).show()

위 코드들의 결과는 모두 똑같이 아래처럼 출력된다.

두 수를 더해주는 UDF 만들기

데이터 생성

data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

df = spark.createDataFrame(data)

1. lambda 함수 사용

df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b")).show()

2. 파이썬 함수 사용

def plus(x, y):
    return x + y
  • pyspark의 select 사용
plusUDF = spark.udf.register("plus", plus)
df.withColumn("p", plusUDF("a", "b")).show()
  • SparkSQL에서의 사용
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) p FROM test").show()

평균을 내는 UDF 만들기

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define the UDF
@pandas_udf(FloatType())
def average_udf_f(v: pd.Series) -> float:
    return v.mean()
  • pyspark의 agg 사용 (집계함수이이기 때문에)
averageUDF = spark.udf.register('average_udf', average_udf_f)
df.agg(averageUDF("b").alias("count")).show()
  • SparkSQL에서의 사용
spark.sql('SELECT average_udf(a) FROM test').show()

 

평균을 내는 UDF를 실제 데이터에서 활용해보기

 

orders.csv
0.09MB

1. 데이터 읽어오기

데이터가 tab으로 구분되어 있으므로 read.option에서 delimiter = '\t'를 세팅한다.

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

order = spark.read.options(delimiter='\t').option("header","true").csv("orders.csv")
order.show()

2. items 컬럼이 json 형식이기에, explode해서 데이터를 전개한다.

  • from_json : string 데이터를 json형식으로 변환한다.
# 데이터프레임을 이용해서 해보기
struct = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("id", StringType()),
        StructField("quantity", LongType())
    ])
)

order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
order_items.show()

3. order id별로 quantity의 평균을 구한다

  • 이전에 만든 average_utf를 활용한다.
order_items.createOrReplaceTempView("order_items")

spark.sql("""
    SELECT  order_id
    		, CAST(average_udf(item.quantity) as decimal) avg_count
      FROM  order_items 
     GROUP 
     	BY  1 
     ORDER
     	BY  2 DESC
""").show(5)

 

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

728x90
반응형