Spark & Hadoop/Pyspark & SparkSQL

[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract)

YSY^ 2023. 10. 2. 18:37

해당 포스팅에서는 텍스트를 정제 후 데이터프레임으로 만들어보겠습니다.

아래 데이터를 활용합니다.

transfer_cost.txt
0.27MB

예시) 아래 입력된 텍스트를 정규표현식을 활용하여 파싱 후 다음과 같은 데이터프레임으로 출력합니다.

입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling”

출력

week departure_zipcode arrival_zipcode cost vendor
2021-01-04 85001 85002 $28.32 ABC Hauling

 

1. SparkSession 생성 및 환경설정

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

2. 데이터 타입을 세팅하고 데이터를 로딩한다.

import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
transfer_cost_df.show(truncate=False) #truncate=False를 세팅하면 모든 데이터를 보여주게된다.

3. 정규표현식을 세팅한다.

  • (\S+) : 탭이나 스페이스바 같이 공백을 제외한 문자만 남긴다(non-whitespace character)
    • cf) (\s+)(소문자 s) : 공백이나 탭같은 문자만 남긴다 (whitespace character)
  • (\d+) : 숫자만 남긴다 (numeric character)
  • (.*) : 모든 문자를 가져온다.
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

출처 : https://github.com/aloverso/SoftwareSystems/wiki/Deep-Dive-Haley-Grep,-Sed,-Awk

4. regexp_extract 를 적용하여 텍스트 데이터에서 데이터를 추출한다.

  • withColumn(기존/새로운 컬럼 이름, 채울 값)  : 새로운 컬럼을 만들거나 존재하는 컬럼을 바꿀 수 있음
  • regexp_extract(컬럼 이름, regex expression, 매칭된 것중 가져올 데이터의 순서) 
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract('text', regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract('text', regex_str, 3))\
    .withColumn('cost', regexp_extract('text', regex_str, 4))\
    .withColumn('vendor', regexp_extract('text', regex_str, 5))
    
df_with_new_columns.printSchema()

CF) 컬럼을 drop한다

final_df = df_with_new_columns.drop("text")

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

 

728x90
반응형