Spark & Hadoop/Pyspark & SparkSQL

[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법)

YSY^ 2023. 10. 2. 17:57

이번 포스팅에서는 아래 데이터를 활용합니다.

1800.csv
0.06MB
데이터는 이와 같다.

여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다.

 

Pyspark

1. 우선 SparkSession과 SparkConf를 설정한다.

from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

2. csv파일을 데이터프레임으로 로딩한다.

1) read.csv를 활용하여 데이터를 읽어온다.

# 아래 두가지 방법이 있다.
df = spark.read.format("csv").load("1800.csv") 
df = spark.read.csv("1800.csv")

# 컬럼 이름이 없기에 spark에서 임의로 컬럼이름을 세팅하고 type도 임의로 세팅한다.
df.printSchema() 
# root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)
# |-- _c3: string (nullable = true)
# |-- _c4: string (nullable = true)

2) 컬럼이름을 직접 지정해주고, 컬럼의 타입은 spark가 알아서 지정하게 한다.

  • inferSchema : spark가 데이터프레임을 만들때 앞에 몇개 데이터를 샘플링해서 보고 Type를 추정하서 세팅한다.
  • toDF() 메소드를 호출하여 컬럼이름을 지정한다.
df = spark.read.format("csv")\
	.option("inferSchema", "true")\
    .load("1800.csv")\
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")

  데이터 컬럼의 이름과 type 는 아래와 같이 나타난다.

3) 컬럼이름과 타입을 직접 지정해준다.

  • StructField를 사용하면 각 컬럼의 타입을 지정해 줄 수 있다.
  • 각 타입은 pyspark.sql.types에서 import해주어야 한다.
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField

schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])
                     
df = spark.read.schema(schema).csv("1800.csv")

CF) 지정할 수 있는 스키마 타입은 아래와 같다. (pyspark.sql.types에서 import한다)

https://spark.apache.org/docs/latest/sql-ref-datatypes.html

 

Data Types - Spark 3.5.0 Documentation

 

spark.apache.org

  • IntegerType
  • LongType 
  • FloatType
  • StringType
  • BooleanType
  • TimestampType
  • DateType
  • ArrayType
  • StructType : StructField의 집합
  • StructField : 컬럼들의 type을 정의
  • MapType

3. filter나 where메소드를 활용하여 데이터를 필터링 한다.

# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")

# Column expression으로 필터링 적용
minTemps = df.where(df.measure_type == "TMIN")

# SQL expression으로 필터링 적용
minTemps = df.where("measure_type = 'TMIN'")  # SQL같은 경우는 '='을 하나만 쓴다.

필터링된 데이터는 이와 같다.

4. 각 stationID에서 가장 낮은 temperature을 찾는다.

  •  groupBy() 메소드를 활용한다.
# Aggregate to find minimum temperature for every station
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()

CF) spark데이터 프레임을 collect함수를 활용하여 파이썬 리스트로 변환할 수 있다.

# Collect, format, and print the results
results = minTempsByStation.collect()

SparkSQL

1. 데이터프레임을 view 테이블로 만들어준다.

df.createOrReplaceTempView("station1800")

2. SQL 쿼리를 작성하여 데이터를 불러오고, collect메소드를 사용하여 python

  • spark.sql 메소드를 사용하면 쿼리를 사용하여 데이터를 select할 수 있다.
results = spark.sql(
	"""
    SELECT  stationID
            , MIN(temperature) AS min_tem
      FROM  station1800
     WHERE  measure_type = 'TMIN'
     GROUP
        BY  1
    """
).collect()

3. For문을 사용하여 데이터를 확인해본다.

# pyspark.sql.Row는 DataFrame의 레코드에 해당하며 필드별로 이름이 존재# 
for r in results:
    print(r)
    
# Row(stationID='ITE00100554', min(temperature)=-148.0)
# Row(stationID='EZE00100082', min(temperature)=-135.0)

 

CF) Pandas

import pandas as pd 

# 데이터를 읽어오고, 컬럼이름을 설정한다.
pd_df = pd.read_csv(
    "1800.csv",
    names=["stationID", "date", "measure_type", "temperature"],
    usecols=[0, 1, 2, 3]
)

#TMIN인 mesasure_Type 데이터만 필터링한다.
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]

# Select only stationID and temperature
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]

# Aggregate to find minimum temperature for every station
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
pd_minTempsByStation.head()

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

728x90
반응형