UDF(User Defined Function)
- DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수
- Scalar 함수 vs. Aggregation 함수
- Scalar 함수 예: UPPER, LOWER, …
- Aggregation 함수 (UDAF) 예: SUM, MIN, MAX
함수 등록
- pyspark.sql.functions.udf
- DataFrame에서만 사용 가능
- spark.udf.register
- SQL 모두에서 사용 가능
Sparksession 생성
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark UDF") \
.getOrCreate()
데이터 생성
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
영문자를 대문자로 바꿔주는 UDF 만들기
1. lambda 함수 사용
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
2. 파이썬 함수 사용
def upper_udf(s):
return s.upper()
upperUDF = F.udf(upper_udf, StringType())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
- pyspark의 select 사용
df.select("Name", upperUDF("Name").alias("Curated Name")).show()
- SparkSQL에서의 사용
df.createOrReplaceTempView("test")
spark.sql("""
SELECT name
, upper_udf(name) `Curated Name`
FROM test
"""
).show()
3. Python Pandas 함수 사용
- 집합단위로 conversion이 일어나기에 performance가 좋음
- pyspark.sql.functions.pandas_udf로 annotation
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
- pyspark의 select 사용
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
df.select("name", upperUDF("name")).show()
위 코드들의 결과는 모두 똑같이 아래처럼 출력된다.
두 수를 더해주는 UDF 만들기
데이터 생성
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
1. lambda 함수 사용
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b")).show()
2. 파이썬 함수 사용
def plus(x, y):
return x + y
- pyspark의 select 사용
plusUDF = spark.udf.register("plus", plus)
df.withColumn("p", plusUDF("a", "b")).show()
- SparkSQL에서의 사용
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) p FROM test").show()
평균을 내는 UDF 만들기
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(FloatType())
def average_udf_f(v: pd.Series) -> float:
return v.mean()
- pyspark의 agg 사용 (집계함수이이기 때문에)
averageUDF = spark.udf.register('average_udf', average_udf_f)
df.agg(averageUDF("b").alias("count")).show()
- SparkSQL에서의 사용
spark.sql('SELECT average_udf(a) FROM test').show()
평균을 내는 UDF를 실제 데이터에서 활용해보기
1. 데이터 읽어오기
데이터가 tab으로 구분되어 있으므로 read.option에서 delimiter = '\t'를 세팅한다.
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
order = spark.read.options(delimiter='\t').option("header","true").csv("orders.csv")
order.show()
2. items 컬럼이 json 형식이기에, explode해서 데이터를 전개한다.
- from_json : string 데이터를 json형식으로 변환한다.
# 데이터프레임을 이용해서 해보기
struct = ArrayType(
StructType([
StructField("name", StringType()),
StructField("id", StringType()),
StructField("quantity", LongType())
])
)
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
order_items.show()
3. order id별로 quantity의 평균을 구한다
- 이전에 만든 average_utf를 활용한다.
order_items.createOrReplaceTempView("order_items")
spark.sql("""
SELECT order_id
, CAST(average_udf(item.quantity) as decimal) avg_count
FROM order_items
GROUP
BY 1
ORDER
BY 2 DESC
""").show(5)
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형