Local Standalone Spark
- Spark Cluster Manager로 local[n] 지정
- master를 local[n]으로 지정
- master는 클러스터 매니저를 지정하는데 사용
- 주로 개발이나 간단한 테스트 용도
- 하나의 JVM에서 모든 프로세스를 실행
- 하나의 Driver와 하나의 Executor가 실행됨
- 1+ 쓰레드가 Executor안에서 실행됨
- Executor안에 생성되는 쓰레드 수
- local:하나의 쓰레드만 생성
- local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성
Package 설치
- PySpark + Py4J를 설치
- 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
- 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
- Spark Web UI는 기본적으로는 접근 불가
- ngrok을 통해 억지로 열 수는 있음
- Py4J
- 파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌
Pyspark 설치
pip install pyspark
pip install py4j
Spark Session 생성
- SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다.
- SparkSession을 이용해 RDD, 데이터 프레임등을 만든다.
- SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정이 가능하다
- local[*] Spark이 하나의 JVM으로 동작하고 그 안에 컴퓨터의 코어 수 만큼의 스레드가 Executor로 동작한다
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
spark에 대한 정보를 알아본다.
spark
아래코드로 사양을 볼 수 있다.
!lscpu
Python <> RDD <> DataFrame
Python 객체를 RDD로 변환
1. Python 리스트 생성
2. 파이썬 리스트를 RDD로 변환
RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션)
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
# 파이썬 리스트를 RDD로 변환
rdd = spark.sparkContext.parallelize(name_list_json)
# 위 리스트는 string형태인데 이를 Json 형식으로 바꾼다.
import json
parsed_rdd = rdd.map(lambda el:json.loads(el))
parsed_rdd.collect() #파이썬으로 가져온다.
# [{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]
파이썬 리스트를 데이터프레임으로 변환하기
- spark.createDataFrame을 사용한다. 이때 2번째 인자로 스키마를 지정해주어야 한다. 아래의 경우 string type
- 데이터 타입은 pyspark.sql.types에서 import해온다.
- printSchema : 스키마를 볼 수 있다.
from pyspark.sql.types import StringType
df = spark.createDataFrame(name_list_json, StringType())
df.printSchema()
# root |-- value: string (nullable = true)
df.select('*').collect()
# [Row(value='{"name": "keeyong"}'),Row(value='{"name": "benjamin"}'), Row(value='{"name": "claire"}')]
RDD를 DataFrame으로 변환
- 위에서 만든 RDD를 toDF()를 활용하여 DataFrame으로 변환한다.
df_parsed_rdd = parsed_rdd.toDF()
df_parsed_rdd.select('name').collect()
# [Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]
Spark 데이터프레임으로 로드해보기
- spark.read.csv로 csv데이터를 읽어올 수 있다.
df = spark.read.csv("name_gender.csv")
df.printSchema()
# root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
- csv파일의 header를 인식하고 싶으면 option을 사용한다.
df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- gender: string (nullable = true)
- show()를 사용하면 데이터를 볼 수 있다. (python의 print와 같은 기능의 함수)
df.show()
- gender 별 개수를 보고 싶다면 groupby 메소드를 활용한다.
- 또한 해당작업은 spark cluster에 있는 작업이기에 python으로 가져오기 위해서는 collect()를 사용하여야한다.
df.groupby(["gender"]).count().collect()
# [Row(gender='F', count=65),
# Row(gender='M', count=28),
# Row(gender='Unisex', count=7)]
- 데이터 프레임의 파티션 개수를 확인한다.
- 해당데이터는 크기가 작아 파티션이 1개로 확인된다.
df.rdd.getNumPartitions()
# 1
위 포스팅은 [파이썬으로 해보는 Spark 프로그래밍 with 프로그래머스] 강의를 듣고 정리한 내용입니다
728x90
반응형
'Spark & Hadoop > Pyspark & SparkSQL' 카테고리의 다른 글
[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy) (1) | 2023.10.02 |
---|---|
[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정) (0) | 2023.10.02 |
[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select) (0) | 2023.10.02 |
[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법) (0) | 2023.10.02 |