Spark & Hadoop/Pyspark & SparkSQL

[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy)

YSY^ 2023. 10. 2. 21:13

해당 포스팅은 아래 데이터를 활용합니다.

new_customer_survey.csv
2.87MB

해당 데이터는 프로그램 언어 중 많이 사용되는 언어와, 사용하고 싶은 프로그래밍 언어를 조사한 데이터이다.

여기서 어떤 프로그래밍언어를 사람들이 많이 쓰고 있는지를 알아본다.

1. sparksession을 만든다

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()

2. 데이터를 불러온다.

  • 필요한 컬럼만 지정한다.
df = spark.read.csv("survey_results_public.csv", header=True).select('ResponseId', 'LanguageHaveWorkedWith', 'LanguageWantToWorkWith')

3. ":"으로 이어져 있는 데이터를 split 메소드를 활용하여 분리하여 리스트로 만들어준다.

  • split 메소드는 pyspark.sql.functions에 있다.
  • LanguageHaveWorkedWith 컬럼의 데이터를 ':"을 기준으로 SPLIT한 다음 language_have 컬럼으로 만들어준다.
  • trim : 앞이나 뒤의 공백을 제거한다.
import pyspark.sql.functions as F

# LanguageHaveWorkedWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_have 필드로 설정
df2 = df.withColumn(
    "language_have",
    F.split(F.trim(F.col("LanguageHaveWorkedWith")), ";")
)

4. 리스트 데이터를 개별 레코드들로 explode한다.

df_language_have = df3.select(
    df3.ResponseId,
    F.explode(df3.language_have).alias("language_have")
)
df_language_have.show(10)

5. groupby해서 count한다. 

df_language_have.groupby("language_have").count().show(10)

6. 해당 데이터를 sort해서 집계한다.

6-1) sort 메소드 사용

df_language_have.groupby("language_have").count().sort(F.desc("count")).collect()

6-2) orderBy 메소드 활용

df_language_have.groupby("language_have").count().orderBy('count', ascending=False).collect()

sort  결과

CF) 내림차순(desc)을 적용시키는 방법은 두가지가 있다.

1) function 활용 (6-1참고), 오름차순은 "asc"이다.

2) ascending=False (6-2참고)

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

728x90
반응형