해당 포스팅에서는 텍스트를 정제 후 데이터프레임으로 만들어보겠습니다.
아래 데이터를 활용합니다.
예시) 아래 입력된 텍스트를 정규표현식을 활용하여 파싱 후 다음과 같은 데이터프레임으로 출력합니다.
입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling”
출력
week | departure_zipcode | arrival_zipcode | cost | vendor |
2021-01-04 | 85001 | 85002 | $28.32 | ABC Hauling |
1. SparkSession 생성 및 환경설정
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
2. 데이터 타입을 세팅하고 데이터를 로딩한다.
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
transfer_cost_df.show(truncate=False) #truncate=False를 세팅하면 모든 데이터를 보여주게된다.
3. 정규표현식을 세팅한다.
- (\S+) : 탭이나 스페이스바 같이 공백을 제외한 문자만 남긴다(non-whitespace character)
- cf) (\s+)(소문자 s) : 공백이나 탭같은 문자만 남긴다 (whitespace character)
- (\d+) : 숫자만 남긴다 (numeric character)
- (.*) : 모든 문자를 가져온다.
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
4. regexp_extract 를 적용하여 텍스트 데이터에서 데이터를 추출한다.
- withColumn(기존/새로운 컬럼 이름, 채울 값) : 새로운 컬럼을 만들거나 존재하는 컬럼을 바꿀 수 있음
- regexp_extract(컬럼 이름, regex expression, 매칭된 것중 가져올 데이터의 순서)
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract('text', regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract('text', regex_str, 3))\
.withColumn('cost', regexp_extract('text', regex_str, 4))\
.withColumn('vendor', regexp_extract('text', regex_str, 5))
df_with_new_columns.printSchema()
CF) 컬럼을 drop한다
final_df = df_with_new_columns.drop("text")
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[Pyspark] DataFrame Join 및 Unique User Count (0) | 2023.10.02 |
---|---|
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |
[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select) (0) | 2023.10.02 |
[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법) (0) | 2023.10.02 |