반응형

spark 26

[Spark] Schema Evolution

Schema Evolution 데이터가 쌓이고 난 이후에 변경되었을때(이전 스키마 버전으로 데이터가 쌓여있고 스키마가 변경되었을때) 어떻게 데이터를 저장하는지에 대한 메커니즘 용어 아래 같은 경우 schema마다 컬럼이 다른데 이를 parquet파일에서 어떻게 처리하는지 알아본다. 1. SparkSession 생성 from pyspark.sql import * from pyspark.sql.functions import * if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("Spark Schema Evolution Demo") \ .master("local[3]") \ .getOrCreate() 데이터 호출 df1 = spark...

[Spark] Spark 파일 Type 종류 및 Pyspark로 데이터 Write하는 방법 (Parquet / AVRO)

데이터 포멧 종류데이터는 디스크에 파일로 저장Unstructured와 Semi-structured만 Human Readable함하려는 일에 맞게 최적화 필요UnstructuredSemi-structured (Type 정보가 없음)Structured (Type 정보가 있음)TextJSONPARQUET XMLAVRO CSVORC  SequenceFileAvro : Apache에서 개발한 파일 포맷Parquet : 트위터와 클라우데라에서 공동 개발한 Hbase에 최적화된 파일 포맷이다ORC:  Hive에서 개발한 Hive에 최적화된 파일 포맷Spark의 주요 파일 타입 특징CSVJSONPARQUETAVRO컬럼 스토리지XXOX압축 가능 OOOO Splittable  OOOOHuman ReadableOOXXNes..

[Pyspark/SparkSQL] Hive-메타스토어 사용 (테이블 생성방법) (Managed Table, External Table)

카탈로그 테이블과 뷰에 관한 메타 데이터 관리 메타 데이터 : 데이터에 관한 구조화된 데이터, 다른 데이터를 설명해 주는 데이터이다. 기본으로 메모리 기반 카탈로그 제공 - 세션이 끝나면 사라짐 Hive와 호환되는 카탈로그 제공 - Persistent 테이블 관리 방식 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리 (2단계) 메모리 기반 테이블/뷰 임시 테이블 스토리지 기반 테이블 기본적으로 HDFS와 Parquet 포맷을 사용 Hive와 호환되는 메타스토어 사용 두 종류의 테이블이 존재 (Hive와 동일한 개념) Managed Table Spark이 실제 데이터와 메타 데이터 모두 관리 Unmanaged (External) Table Spark이 메타 데이터만 관리 Spark SQL 스토리지..

[Pyspark/SparkSQL] UDF(User Defined Function) (사용자 정의 함수)

UDF(User Defined Function) DataFrame이나 SQL에서 적용할 수 있는 사용자 정의 함수 Scalar 함수 vs. Aggregation 함수 Scalar 함수 예: UPPER, LOWER, … Aggregation 함수 (UDAF) 예: SUM, MIN, MAX 함수 등록 pyspark.sql.functions.udf DataFrame에서만 사용 가능 spark.udf.register SQL 모두에서 사용 가능 Sparksession 생성 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark UDF") \ .getOrCreate() 데이터 생성 columns = ["..

[SparkSQL] JOIN의 종류 (INNER, LEFT, RIGHT, FULL(OUTER), CROSS, SELF JOIN) (Shuffle JOIN, Broadcast JOIN)

JOIN SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 Merge 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면 JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성 조인의 방식에 따라 어떤 레코드들이 선택되는지, 어떤 필드들이 채워지가 달라짐 Join의 종류 SparkSession 생성 및 사용할 테이블 생성 from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "sparksql join") conf.set("spark...

[SparkSQL] SparkSQL이란

Spark SQL Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈 데이터 프레임 작업을 SQL로 처리 가능 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능 판다스에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재 HQL(Hive Query Language)과 호환 제공 Hive 테이블들을 읽고 쓸 수 있음 (Hive Metastore) Spark SQL 의 장점 (DataFrame에 비해) SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음 물론 두개를 동시에 사용할 수 있음 Familiarity/Readability SQL이 가독성이 더 좋고 더 많은 사람들이 사용가능 Optimization Spark SQL 엔진이 최적화하기 더 좋음 (SQL..

[Pyspark] DataFrame Join 및 Unique User Count

해당 포스팅에서는 pyspark로 두 데이터를 Join 후, 년월별로 Distinct한 User를 Count하는 방법을 소개한다. 1. sparksession을 세팅한다. from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PySpark DataFrame #5") \ .getOrCreate() 2. 데이터 2개를 호출한다. df_user_session_channel = spark.read.csv("df_user_session_channel.csv", header=True) df_session_timestamp = spark.read.csv("df_session_timestamp.csv", header=True) ..

[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy)

해당 포스팅은 아래 데이터를 활용합니다. 해당 데이터는 프로그램 언어 중 많이 사용되는 언어와, 사용하고 싶은 프로그래밍 언어를 조사한 데이터이다. 여기서 어떤 프로그래밍언어를 사람들이 많이 쓰고 있는지를 알아본다. 1. sparksession을 만든다 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \ .getOrCreate() 2. 데이..

[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract)

해당 포스팅에서는 텍스트를 정제 후 데이터프레임으로 만들어보겠습니다. 아래 데이터를 활용합니다. 예시) 아래 입력된 텍스트를 정규표현식을 활용하여 파싱 후 다음과 같은 데이터프레임으로 출력합니다. 입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling” 출력 week departure_zipcode arrival_zipcode cost vendor 2021-01-04 85001 85002 $28.32 ABC Hauling 1. SparkSession 생성 및 환경설정 from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf(..

[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법)

이번 포스팅에서는 아래 데이터를 활용합니다. 여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다. Pyspark 1. 우선 SparkSession과 SparkConf를 설정한다. from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "PySpark DataFrame #1") conf.set("spark.master", "local[*]") spark = SparkSession.builder\ .config(conf=conf)\ .getOrCreate()..

반응형