반응형

pyspark 9

[Spark] Schema Evolution

Schema Evolution 데이터가 쌓이고 난 이후에 변경되었을때(이전 스키마 버전으로 데이터가 쌓여있고 스키마가 변경되었을때) 어떻게 데이터를 저장하는지에 대한 메커니즘 용어 아래 같은 경우 schema마다 컬럼이 다른데 이를 parquet파일에서 어떻게 처리하는지 알아본다. 1. SparkSession 생성 from pyspark.sql import * from pyspark.sql.functions import * if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("Spark Schema Evolution Demo") \ .master("local[3]") \ .getOrCreate() 데이터 호출 df1 = spark...

[Pyspark/SparkSQL] Hive-메타스토어 사용 (테이블 생성방법) (Managed Table, External Table)

카탈로그 테이블과 뷰에 관한 메타 데이터 관리 메타 데이터 : 데이터에 관한 구조화된 데이터, 다른 데이터를 설명해 주는 데이터이다. 기본으로 메모리 기반 카탈로그 제공 - 세션이 끝나면 사라짐 Hive와 호환되는 카탈로그 제공 - Persistent 테이블 관리 방식 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리 (2단계) 메모리 기반 테이블/뷰 임시 테이블 스토리지 기반 테이블 기본적으로 HDFS와 Parquet 포맷을 사용 Hive와 호환되는 메타스토어 사용 두 종류의 테이블이 존재 (Hive와 동일한 개념) Managed Table Spark이 실제 데이터와 메타 데이터 모두 관리 Unmanaged (External) Table Spark이 메타 데이터만 관리 Spark SQL 스토리지..

[Pyspark/SparkSQL] UDF(User Defined Function) (사용자 정의 함수)

UDF(User Defined Function) DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수 Scalar 함수 vs. Aggregation 함수 Scalar 함수 예: UPPER, LOWER, … Aggregation 함수 (UDAF) 예: SUM, MIN, MAX 함수 등록 pyspark.sql.functions.udf DataFrame에서만 사용 가능 spark.udf.register SQL 모두에서 사용 가능 Sparksession 생성 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark UDF") \ .getOrCreate() 데이터 생성 columns = ["..

[Spark] Windows 10에 Spark 설치하기 (Java설치, 환경변수 세팅)

이번 포스팅에서는 Spark를 Windows 10 로컬에 세팅하여 vscode에서 활용하는 방법을 알려드립니다. 자바 설치cmd 창에 아래와 같이  출력이 있어야 하며 없으면 JAVA를 설치해 주어야함.> java -versionhttps://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html Download the Latest Java LTS FreeSubscribe to Java SE and get the most comprehensive Java support available, with 24/7 global access to the experts.www.oracle.com 2. 자바 환경변수 세팅운영체제 어디에서..

[Pyspark] DataFrame Join 및 Unique User Count

해당 포스팅에서는 pyspark로 두 데이터를 Join 후, 년월별로 Distinct한 User를 Count하는 방법을 소개한다. 1. sparksession을 세팅한다. from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PySpark DataFrame #5") \ .getOrCreate() 2. 데이터 2개를 호출한다. df_user_session_channel = spark.read.csv("df_user_session_channel.csv", header=True) df_session_timestamp = spark.read.csv("df_session_timestamp.csv", header=True) ..

[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy)

해당 포스팅은 아래 데이터를 활용합니다. 해당 데이터는 프로그램 언어 중 많이 사용되는 언어와, 사용하고 싶은 프로그래밍 언어를 조사한 데이터이다. 여기서 어떤 프로그래밍언어를 사람들이 많이 쓰고 있는지를 알아본다. 1. sparksession을 만든다 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \ .getOrCreate() 2. 데이..

[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract)

해당 포스팅에서는 텍스트를 정제 후 데이터프레임으로 만들어보겠습니다. 아래 데이터를 활용합니다. 예시) 아래 입력된 텍스트를 정규표현식을 활용하여 파싱 후 다음과 같은 데이터프레임으로 출력합니다. 입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling” 출력 week departure_zipcode arrival_zipcode cost vendor 2021-01-04 85001 85002 $28.32 ABC Hauling 1. SparkSession 생성 및 환경설정 from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf(..

[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select)

해당 포스팅은 데이터 프레임에서 특정 컬럼만 필터링하는 방법을 소개한다. 활용 데이터 사전 작업 from pyspark.sql.types import StringType, IntegerType, FloatType from pyspark.sql.types import StructType, StructField schema = StructType([ \ StructField("stationID", StringType(), True), \ StructField("date", IntegerType(), True), \ StructField("measure_type", StringType(), True), \ StructField("temperature", FloatType(), True)]) df = spark..

[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법)

이번 포스팅에서는 아래 데이터를 활용합니다. 여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다. Pyspark 1. 우선 SparkSession과 SparkConf를 설정한다. from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "PySpark DataFrame #1") conf.set("spark.master", "local[*]") spark = SparkSession.builder\ .config(conf=conf)\ .getOrCreate()..

반응형