Spark & Hadoop/Pyspark & SparkSQL

[Pyspark] pyspark설치 및 데이터 읽어오기, RDD&Python객체&DataFrame 변환

YSY^ 2023. 9. 30. 17:56

Local Standalone Spark

  • Spark Cluster Manager로 local[n] 지정
    • master를 local[n]으로 지정
    • master는 클러스터 매니저를 지정하는데 사용
  • 주로 개발이나 간단한 테스트 용도
  • 하나의 JVM에서 모든 프로세스를 실행
    • 하나의 Driver와 하나의 Executor가 실행됨
    • 1+ 쓰레드가 Executor안에서 실행됨
  • Executor안에 생성되는 쓰레드 수 
    • local:하나의 쓰레드만 생성
    • local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성

Spark 잡을 실행할 때 master를 local[3]으로 지정한 경우

Package 설치

  • PySpark + Py4J를 설치
  • 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
  • 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
  • Spark Web UI는 기본적으로는 접근 불가
    • ngrok을 통해 억지로 열 수는 있음
  • Py4J
    • 파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌

Pyspark 설치

pip install pyspark
pip install py4j

Spark Session 생성

  • SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다.
  • SparkSession을 이용해 RDD, 데이터 프레임등을 만든다.
  • SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정이 가능하다
  • local[*] Spark이 하나의 JVM으로 동작하고 그 안에 컴퓨터의 코어 수 만큼의 스레드가 Executor로 동작한다
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark Tutorial')\
        .getOrCreate()

spark에 대한 정보를 알아본다.

spark

아래코드로 사양을 볼 수 있다.

!lscpu

 

Python <> RDD <> DataFrame

Python 객체를 RDD로 변환

1. Python 리스트 생성

2. 파이썬 리스트를 RDD로 변환

RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션)

name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
# 파이썬 리스트를 RDD로 변환
rdd = spark.sparkContext.parallelize(name_list_json)
# 위 리스트는 string형태인데 이를 Json 형식으로 바꾼다.
import json
parsed_rdd = rdd.map(lambda el:json.loads(el))
parsed_rdd.collect() #파이썬으로 가져온다.
# [{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]

파이썬 리스트를 데이터프레임으로 변환하기

  • spark.createDataFrame을 사용한다. 이때 2번째 인자로 스키마를 지정해주어야 한다. 아래의 경우 string type
    • 데이터 타입은 pyspark.sql.types에서 import해온다.
  • printSchema : 스키마를 볼 수 있다.
from pyspark.sql.types import StringType

df = spark.createDataFrame(name_list_json, StringType())
df.printSchema()
# root |-- value: string (nullable = true)
 
 df.select('*').collect()
 # [Row(value='{"name": "keeyong"}'),Row(value='{"name": "benjamin"}'), Row(value='{"name": "claire"}')]

RDD를 DataFrame으로 변환

  • 위에서 만든 RDD를 toDF()를 활용하여 DataFrame으로 변환한다.
df_parsed_rdd = parsed_rdd.toDF()
df_parsed_rdd.select('name').collect()
# [Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]

Spark 데이터프레임으로 로드해보기

name_gender.csv
0.00MB

  • spark.read.csv로 csv데이터를 읽어올 수 있다.
df = spark.read.csv("name_gender.csv")
df.printSchema()
# root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
  • csv파일의 header를 인식하고 싶으면 option을 사용한다. 
df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- gender: string (nullable = true)
  • show()를 사용하면 데이터를 볼 수 있다. (python의 print와 같은 기능의 함수)
df.show()
  • gender 별 개수를 보고 싶다면 groupby 메소드를 활용한다.
    • 또한 해당작업은 spark cluster에 있는 작업이기에 python으로 가져오기 위해서는 collect()를 사용하여야한다.
df.groupby(["gender"]).count().collect()
# [Row(gender='F', count=65),
# Row(gender='M', count=28),
# Row(gender='Unisex', count=7)]
  • 데이터 프레임의 파티션 개수를 확인한다.
    • 해당데이터는 크기가 작아 파티션이 1개로 확인된다.
df.rdd.getNumPartitions()
# 1

위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다

728x90
반응형