반응형

전체 글 343

[Pyspark] 데이터 프레임 집계 방법 (groupBy) (집계 컬럼 이름 지정)

해당 포스팅에서는 아래 데이터를 활용합니다. 사전작업 1. spark session을 세팅한다. from pyspark.sql import SparkSession spark = SparkSession.builder\ .master("local[*]")\ .appName('PySpark DataFrame #2')\ .getOrCreate() 2. 스키마 이름과 타입을 지정하여 데이터를 불러온다 from pyspark.sql import SparkSession from pyspark.sql import functions as func from pyspark.sql.types import StructType, StructField, StringType, FloatType schema = StructType([..

[Pyspark] 데이터 프레임에서 특정 컬럼만 필터링하는 방법 (select)

해당 포스팅은 데이터 프레임에서 특정 컬럼만 필터링하는 방법을 소개한다. 활용 데이터 사전 작업 from pyspark.sql.types import StringType, IntegerType, FloatType from pyspark.sql.types import StructType, StructField schema = StructType([ \ StructField("stationID", StringType(), True), \ StructField("date", IntegerType(), True), \ StructField("measure_type", StringType(), True), \ StructField("temperature", FloatType(), True)]) df = spark..

[Pyspark/SparkSQL] Header가 없는 csv파일 처리 (데이터프레임 컬럼이름 및 컬럼 타입 지정 방법)

이번 포스팅에서는 아래 데이터를 활용합니다. 여기서 처리할 데이터는 measure_type가 "TMIN"이고 온도("temperature")가 가장 낮은 "stationID"를 추룰하는 작업을 할 것이다. Pyspark 1. 우선 SparkSession과 SparkConf를 설정한다. from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spark.app.name", "PySpark DataFrame #1") conf.set("spark.master", "local[*]") spark = SparkSession.builder\ .config(conf=conf)\ .getOrCreate()..

[Pyspark] pyspark설치 및 데이터 읽어오기, RDD&Python객체&DataFrame 변환

Local Standalone Spark Spark Cluster Manager로 local[n] 지정 master를 local[n]으로 지정 master는 클러스터 매니저를 지정하는데 사용 주로 개발이나 간단한 테스트 용도 하나의 JVM에서 모든 프로세스를 실행 하나의 Driver와 하나의 Executor가 실행됨 1+ 쓰레드가 Executor안에서 실행됨 Executor안에 생성되는 쓰레드 수 local:하나의 쓰레드만 생성 local[*]: 컴퓨터 CPU 수만큼 쓰레드를 생성 Package 설치 PySpark + Py4J를 설치 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가 Spark Web UI는 기본적으로는 접근 불가 ngrok을 ..

[Spark] SparkSession 생성과 환경변수 세팅

Spark Session 생성 Spark 프로그램의 시작은 SparkSession을 만드는 것 프로그램마다 하나를 만들어 Spark Cluster와 통신: Singleton 객체 Spark 2.0에서 처음 소개됨 Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용 DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신 config 메소드를 이용해 다양한 환경설정 가능 (단계별로 하나씩 설정도 가능)단 RDD와 관련된 작업을 할때는 SparkSession 밑의 sparkContext 객체를 사용Spark Session API Document : https://spark.apache.org/docs/3.1.1/api/python/reference/api/py..

[Spark] Spark 데이터 구조 (RDD, DataFrame, Dataset)

Spark 데이터 구조  RDD, DataFrame, Dataset (Immutable Distributed Data)RDD가 가장 밑바닥에 있고, 그 위해 DataFrame과  Dataset가 있음RDD는 할 수 있는 것은 많지만, 프로그래밍 생산성이 떨어짐.python을 쓴다면 Dataframe, java/scaler로 한다면 Dataset을 씀2016년에 DataFrame과 Dataset은 하나의 API로 통합됨모두 파티션으로 나뉘어 Spark에서 처리됨DataFrame Code나 Sparksql을 효율적인 자바 바이트 코드(Java bytecode) 로 만들어주는 과정Cody Analysis : 코드 분석하여 어떤 테이블과, 컬럼을 쓰는지 결정하고, 사용자가 없는 테이블과 컬럼을 쓰면 에러를 냄L..

[Spark] Spark 데이터 처리 방식 (Partitioning, Shuffling, Spill) (InputPartition, OutputPartition, ShufflePartition)(Range partition, Hashing partition, Data Skewness)

Spark 데이터 시스템 아키텍처큰데이터를 ETL하거나 일시적으로 데이터를 select할때는 hive나 presto도 상관 없음하지만 spark가 각광받는 이유는 하나의 시스템으로 다양한 것들을 할 수 있기 때문외부데이터의 예 : RDS, Nocurl 등외부데이터를 주기적으로 ETL을 통해 HDFS로 가져와주어야 함.이를 위해1. 주기적으로 ETL작업 진행 (보통 Airflow를 활용)2. 필요할 때 바로 spark에서 로딩내부든 외부든 spark에 올라가는 순간 똑같은 데이터가 됨 데이터 병렬처리와 파티션(Partition)데이터가 먼저 분산되어야함하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정Spark에서..

[Python/GoogleSpreadSheet API] GoogleSpreadSheet 필터 삽입, 필터 삭제

이번 포스팅에서는 구글 스프레드 시트에 필터를 세팅하거나 필터를 삭제하는 방법을 알아보겠습니다. Api 세팅 def api_setting(): scope = ['https://www.googleapis.com/auth/spreadsheets' ,'https://www.googleapis.com/auth/drive'] json_file_name = 'asset_management_key.json' credentials = ServiceAccountCredentials.from_json_keyfile_name(json_file_name, scope) gc = gspread.authorize(credentials) creds = None creds = service_account.Credentials.from..

[Python/GoogleSpreadSheet API] GoogleSpreadSheet 데이터 삽입, 삭제

이번 포스팅에서는 googlespreadsheet api를 활용한 데이터 insert, 데이터 제거에 대해 알아봅니다. Api 세팅 def api_setting(): scope = ['https://www.googleapis.com/auth/spreadsheets' ,'https://www.googleapis.com/auth/drive'] json_file_name = 'asset_management_key.json' credentials = ServiceAccountCredentials.from_json_keyfile_name(json_file_name, scope) gc = gspread.authorize(credentials) creds = None creds = service_account.Cre..

[Python/GoogleSpreadSheet API] GoogleSpreadSheet 시트이름 및 시트탭 변경

Api 세팅 def api_setting(): scope = ['https://www.googleapis.com/auth/spreadsheets' ,'https://www.googleapis.com/auth/drive'] json_file_name = 'asset_management_key.json' credentials = ServiceAccountCredentials.from_json_keyfile_name(json_file_name, scope) gc = gspread.authorize(credentials) creds = None creds = service_account.Credentials.from_service_account_file( json_file_name, scopes=scope ) ..

반응형