반응형

분류 전체보기 339

[SparkSQL] JOIN의 종류 (INNER, LEFT, RIGHT, FULL(OUTER), CROSS, SELF JOIN) (Shuffle JOIN, Broadcast JOIN)

JOIN SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 Merge 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면 JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성 조인의 방식에 따라 어떤 레코드들이 선택되는지, 어떤 필드들이 채워지가 달라짐 Join의 종류 SparkSession 생성 및 사용할 테이블 생성 from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "sparksql join") conf.set("spark...

[SparkSQL] SparkSQL이란

Spark SQL Spark SQL은 구조화된 데이터 처리를 위한 Spark 모듈 데이터 프레임 작업을 SQL로 처리 가능 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능 판다스에도 pandasql 모듈의 sqldf 함수를 이용하는 동일한 패턴 존재 HQL(Hive Query Language)과 호환 제공 Hive 테이블들을 읽고 쓸 수 있음 (Hive Metastore) Spark SQL 의 장점 (DataFrame에 비해) SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음 물론 두개를 동시에 사용할 수 있음 Familiarity/Readability SQL이 가독성이 더 좋고 더 많은 사람들이 사용가능 Optimization Spark SQL 엔진이 최적화하기 더 좋음 (SQL..

[Spark] Windows 10에 Spark 설치하기 (Java설치, 환경변수 세팅)

이번 포스팅에서는 Spark를 Windows 10 로컬에 세팅하여 vscode에서 활용하는 방법을 알려드립니다. 자바 설치cmd 창에 아래와 같이  출력이 있어야 하며 없으면 JAVA를 설치해 주어야함.> java -versionhttps://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html Download the Latest Java LTS FreeSubscribe to Java SE and get the most comprehensive Java support available, with 24/7 global access to the experts.www.oracle.com 2. 자바 환경변수 세팅운영체제 어디에서..

[Pyspark] DataFrame Join 및 Unique User Count

해당 포스팅에서는 pyspark로 두 데이터를 Join 후, 년월별로 Distinct한 User를 Count하는 방법을 소개한다. 1. sparksession을 세팅한다. from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PySpark DataFrame #5") \ .getOrCreate() 2. 데이터 2개를 호출한다. df_user_session_channel = spark.read.csv("df_user_session_channel.csv", header=True) df_session_timestamp = spark.read.csv("df_session_timestamp.csv", header=True) ..

[Pyspark] 특정 기준으로 text 분리(Split/trim), 리스트 형태의 값을 전개(explode), 데이터 정렬 (sort, orderBy)

해당 포스팅은 아래 데이터를 활용합니다. 해당 데이터는 프로그램 언어 중 많이 사용되는 언어와, 사용하고 싶은 프로그래밍 언어를 조사한 데이터이다. 여기서 어떤 프로그래밍언어를 사람들이 많이 쓰고 있는지를 알아본다. 1. sparksession을 만든다 from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \ .getOrCreate() 2. 데이..

[Pyspark] 정규표현식으로 텍스트 파싱 후 데이터프레임 변환 (regexp_extract)

해당 포스팅에서는 텍스트를 정제 후 데이터프레임으로 만들어보겠습니다. 아래 데이터를 활용합니다. 예시) 아래 입력된 텍스트를 정규표현식을 활용하여 파싱 후 다음과 같은 데이터프레임으로 출력합니다. 입력: “On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling” 출력 week departure_zipcode arrival_zipcode cost vendor 2021-01-04 85001 85002 $28.32 ABC Hauling 1. SparkSession 생성 및 환경설정 from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf(..

[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정)

해당 포스팅에서는 아래 데이터를 활용합니다. 사전작업 1. spark session을 세팅한다. from pyspark.sql import SparkSession spark = SparkSession.builder\ .master("local[*]")\ .appName('PySpark DataFrame #2')\ .getOrCreate() 2. 스키마 이름과 타입을 지정하여 데이터를 불러온다 from pyspark.sql import SparkSession from pyspark.sql import functions as func from pyspark.sql.types import StructType, StructField, StringType, FloatType schema = StructType([..

[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select)

해당 포스팅은 데이터 프레임에서 특정 컬럼만 필터링하는 방법을 소개한다. 활용 데이터 사전 작업 from pyspark.sql.types import StringType, IntegerType, FloatType from pyspark.sql.types import StructType, StructField schema = StructType([ \ StructField("stationID", StringType(), True), \ StructField("date", IntegerType(), True), \ StructField("measure_type", StringType(), True), \ StructField("temperature", FloatType(), True)]) df = spark..

[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법)

이번 포스팅에서는 아래 데이터를 활용합니다. 여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다. Pyspark 1. 우선 SparkSession과 SparkConf를 설정한다. from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "PySpark DataFrame #1") conf.set("spark.master", "local[*]") spark = SparkSession.builder\ .config(conf=conf)\ .getOrCreate()..

[Pyspark] pyspark설치 및 데이터 읽어오기, RDD&Python객체&DataFrame 변환

Local Standalone Spark Spark Cluster Manager로 local[n] 지정 master를 local[n]으로 지정 master는 클러스터 매니저를 지정하는데 사용 주로 개발이나 간단한 테스트 용도 하나의 JVM에서 모든 프로세스를 실행 하나의 Driver와 하나의 Executor가 실행됨 1+ 쓰레드가 Executor안에서 실행됨 Executor안에 생성되는 쓰레드 수 local:하나의 쓰레드만 생성 local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성 Package 설치 PySpark + Py4J를 설치 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가 Spark Web UI는 기본적으로는 접근 불가 ngrok을 ..

반응형