이번 포스팅에서는 아래 데이터를 활용합니다.
여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다.
Pyspark
1. 우선 SparkSession과 SparkConf를 설정한다.
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
2. csv파일을 데이터프레임으로 로딩한다.
1) read.csv를 활용하여 데이터를 읽어온다.
# 아래 두가지 방법이 있다.
df = spark.read.format("csv").load("1800.csv")
df = spark.read.csv("1800.csv")
# 컬럼 이름이 없기에 spark에서 임의로 컬럼이름을 세팅하고 type도 임의로 세팅한다.
df.printSchema()
# root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)
# |-- _c3: string (nullable = true)
# |-- _c4: string (nullable = true)
2) 컬럼이름을 직접 지정해주고, 컬럼의 타입은 spark가 알아서 지정하게 한다.
- inferSchema : spark가 데이터프레임을 만들때 앞에 몇개 데이터를 샘플링해서 보고 Type를 추정하서 세팅한다.
- toDF() 메소드를 호출하여 컬럼이름을 지정한다.
df = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
데이터 컬럼의 이름과 type 는 아래와 같이 나타난다.
3) 컬럼이름과 타입을 직접 지정해준다.
- StructField를 사용하면 각 컬럼의 타입을 지정해 줄 수 있다.
- 각 타입은 pyspark.sql.types에서 import해주어야 한다.
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField
schema = StructType([ \
StructField("stationID", StringType(), True), \
StructField("date", IntegerType(), True), \
StructField("measure_type", StringType(), True), \
StructField("temperature", FloatType(), True)])
df = spark.read.schema(schema).csv("1800.csv")
CF) 지정할 수 있는 스키마 타입은 아래와 같다. (pyspark.sql.types에서 import한다)
https://spark.apache.org/docs/latest/sql-ref-datatypes.html
- IntegerType
- LongType
- FloatType
- StringType
- BooleanType
- TimestampType
- DateType
- ArrayType
- StructType : StructField의 집합
- StructField : 컬럼들의 type을 정의
- MapType
3. filter나 where메소드를 활용하여 데이터를 필터링 한다.
# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")
# Column expression으로 필터링 적용
minTemps = df.where(df.measure_type == "TMIN")
# SQL expression으로 필터링 적용
minTemps = df.where("measure_type = 'TMIN'") # SQL같은 경우는 '='을 하나만 쓴다.
4. 각 stationID에서 가장 낮은 temperature을 찾는다.
- groupBy() 메소드를 활용한다.
# Aggregate to find minimum temperature for every station
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()
CF) spark데이터 프레임을 collect함수를 활용하여 파이썬 리스트로 변환할 수 있다.
# Collect, format, and print the results
results = minTempsByStation.collect()
SparkSQL
1. 데이터프레임을 view 테이블로 만들어준다.
df.createOrReplaceTempView("station1800")
2. SQL 쿼리를 작성하여 데이터를 불러오고, collect메소드를 사용하여 python
- spark.sql 메소드를 사용하면 쿼리를 사용하여 데이터를 select할 수 있다.
results = spark.sql(
"""
SELECT stationID
, MIN(temperature) AS min_tem
FROM station1800
WHERE measure_type = 'TMIN'
GROUP
BY 1
"""
).collect()
3. For문을 사용하여 데이터를 확인해본다.
# pyspark.sql.Row는 DataFrame의 레코드에 해당하며 필드별로 이름이 존재#
for r in results:
print(r)
# Row(stationID='ITE00100554', min(temperature)=-148.0)
# Row(stationID='EZE00100082', min(temperature)=-135.0)
CF) Pandas
import pandas as pd
# 데이터를 읽어오고, 컬럼이름을 설정한다.
pd_df = pd.read_csv(
"1800.csv",
names=["stationID", "date", "measure_type", "temperature"],
usecols=[0, 1, 2, 3]
)
#TMIN인 mesasure_Type 데이터만 필터링한다.
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]
# Select only stationID and temperature
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
# Aggregate to find minimum temperature for every station
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
pd_minTempsByStation.head()
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |
---|---|
[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select) (0) | 2023.10.02 |
[Pyspark] pyspark설치 및 데이터 읽어오기, RDD&Python객체&DataFrame 변환 (0) | 2023.09.30 |