Data Engineering/Python

[Python] 데이터를 MYSQL DB에 적재/ 업데이트 하는 방법 (MYSQL WORKBENCH / Upsert / to_sql)

YSY^ 2022. 1. 9. 18:11

목적

  • 파이썬으로 데이터를 MYSQL의 DB에 적재하는 여러가지 방법을 알아봅니다.
  • 데이터를 한번에 밀어넣는 방법과, 데이터를 업데이트 시켜주는 upsert 방법을 소개합니다

활용데이터

사전 작업 1: MYSQL WORKBENCH DB 연결

  • workbench에서 local DB를 연결합니다
  • connection name를 지정합니다
  • hostname : 로컬로 연결하려면 로컬 ip인 "127.0.0.1"(자동입력 되어있음), AWS RDS로 연결하려면 RDS의 hostname을 입력합니다
  • username : 기본은 "root" 입니다
  • password : "Store in Valult ..." 버튼을 누르고 mysql을 설치했을 때 입력했던 비밀번호를 입력하면 됩니다.

사전 작업 2 :및 MYSQL WORKBENCH에서 TABLE 생성

  • 테이블 이름과, 컬럼명, 컬럼의 데이터타입을 입력해줍니다.

아래 쿼리를 실행시켜도 됩니다.

CREATE TABLE `sys`.`new_table` (
  `datetime` DATETIME NULL,
  `season` INT NULL,
  `holiday` INT NULL,
  `workingday` INT NULL,
  `weather` INT NULL,
  `temp` FLOAT NULL,
  `atemp` FLOAT NULL,
  `humidity` INT NULL,
  `windspeed` FLOAT NULL,
  `casual` INT NULL,
  `registered` INT NULL,
  `count` INT NULL);

컬럼 필드 생성완료

 

데이터 및 필요한 라이브러리 import

import pandas as pd
from sqlalchemy import create_engine
import pymysql
location = 'train.csv'
bike_data = pd.read_csv(location)

DB 정보를 지정합니다

host = "127.0.0.1"
user = "root"
password = "1111"
database = "sys"

 

1. 데이터를 한번에 DB로 밀어 넣기

  • sqlalchemy 라이브러리의 create_engine을 활용합니다
# DB와 연결 및 데이터 불러오기
pymysql.install_as_MySQLdb()

table_name = "sys_config"
engine = create_engine(f"mysql+mysqldb://{user}:{password}"\
                    f"@{host}:3306/{database}",
                    encoding="utf-8")

## 데이터 적재
bike_data.to_sql(name=table_name,
             con=engine,
             if_exists="append",
             index=False
             )

# mysql DB와의 연결을 종료합니다.
engine.dispose()

데이터가 성공적으로 들어갔습니다.

만약 테이블에 없는 컬럼을 집어넣으려고 한다면 아래와 같은 에러가 뜹니다

  • 없는 컬럼을 테이블에 추가한 후 코드를 실행해주세요

 

2. UPSERT

  • 데이터를 처음 적재할 때는 위에처럼 "to_sql"을 사용하면 편리하나, 데이터를 꾸준히 업데이트 해주어야 할때나, 대용량으로 데이터를 적재할 때는 위 방법은 메모리 문제가 발생할 가능성이 큽니다
  • 따라서 UPSERT 를 활용합니다
  • UPSERT란 : Upsert는 중복되는 값이 있으면 Update를 하고 그렇지 않다면 Insert 합니다. 즉, Unique Key의 값이 중복된다면 Update를 하고, Unique 컬럼의 값이 존재하지 않는다면 insert합니다.

Unique Key 할당

  • 먼저 Unique 키를 할당합니다. 만약 unique key 가 없다면 upsert는 작동하지 않습니다
  • 여기서는 datetime 컬럼을 unique key로 활용합니다

아래 쿼리를 실행시켜주세요

ALTER TABLE `sys`.`new_table` 
CHANGE COLUMN `datetime` `datetime` DATETIME NOT NULL ,
ADD PRIMARY KEY (`datetime`),
ADD UNIQUE INDEX `datetime_UNIQUE` (`datetime` ASC) VISIBLE;
;

데이터 변형

update 된것을 확인하기 위해 데이터를 조금 변형합니다

bike_data["season"] = 6
bike_data["casual"] = 30

변형된 데이터

DB와 연결

아래 코드를 활용하여 DB와 연결합니다

pymysql.install_as_MySQLdb()

connection = pymysql.connect(host=host,
                             user=user,
                             password=password,
                             db = database,
                             )
cursor = connection.cursor()

table_name = "new_table"

1) connect() 함수를 이용하면 MySQL host내 DB와 연결할 수 있습니다.

  • host: DB가 존재하는 host
  • user: user name
  • password: 설정한 패스워드
  • db: 연결할 데이터베이스 이름

2) 연결한 DB와 상호작용하기 위해 cursor 객체를 생성합니다

Upsert 쿼리 제작

CF) 보통 UPSERT를 한다면 아래 쿼리를 소개시켜 주지만 이는 대용량의 데이터를 업데이트를 할때 쓸수가 없습니다

INSERT INTO sys.sample values(1, 'a') 
ON DUPLICATE KEY UPDATE id=1, myname='abc';
query = f"""
    INSERT INTO {table_name} (
        `datetime`,
        `season`,
        `holiday`,
        `workingday`,
        `weather`,
        `temp`,
        `atemp`, 
        `humidity`,
        `windspeed`,
        `casual`,
        `registered`,
        `count`
        )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        `season`=VALUES(`season`),
        `holiday`=VALUES(`holiday`),
        `workingday`=VALUES(`workingday`),
        `weather`=VALUES(`weather`),
        `temp`=VALUES(`temp`),
        `atemp`=VALUES(`atemp`),
        `humidity`=VALUES(`humidity`),
        `windspeed`=VALUES(`windspeed`),
        `casual`=VALUES(`casual`),
        `registered`=VALUES(`registered`),
        `count`=VALUES(`count`)
    """
  1. 먼저 데이터를 삽입할 컬럼들을 적어줍니다. (Insert Into table_name ...)
  2. 해당 컬럼들의 개수만큼 placeholder(%s)를 VALUES에 설정합니다
  3. ON DUPLICATE KEY UPDATE 부분에 Unique key를 제외하고 바꿀 컬럼들을 설정합니다
    • 즉 Unique key(datetime 컬럼은) 해당부분에 들어가면 안됩니다.
    • 또한 Unique key말고도 변하지 않는 컬럼이 있다면 해당 컬럼은 따로 설정해주지 않아도 됩니다.

cf) Placeholder 

  • DB 데이터에 대해 삽입/변경/삭제를 해야하는데, 조건이 각각 다르다면 Placeholder를 사용하면 됩니다.
  • Placeholder는 위와 같이 동적 SQL문을 구성할 때 활용하는데, 동적 값이 들어갈 위치에 '%s'를 이용해 SQL문을 만들어 놓습으며, Placeholder의 특징은 다음과 같습니다
    • 데이터 순서대로 SQL이 적용
    • 문자열, 숫자 등에 관계 없이 대치할 값은 모두 %s로 쓰임
    • 문자의 경우 따옴표(") 등의 특수문자들이 자동으로 escape되어 처리
    • %s는 컬럼 값을 대치할 때만 사용

 

Upsert 쿼리 실행

# 데이터를 리스트화
args  = bike_data.values.tolist()

# sql 쿼리 적용
cursor.executemany(query, args)

# sql 쿼리 실행
connection.commit()

# DB연결 종료
connection.close()
  1. 삽입할 데이터를 list로 만듭니다
  2. executemany는 여러번의 SQL 쿼리를 한번에 실행해주는 것 입니다. 한 줄씩 실행하는 것은 execute()함수이지만 대량의 데이터를 update 하기에는 부족합니다. executemany() 함수의 매개변수로 쿼리와, 리스트를 담은 리스트를 주면 됩니다.
  3. connection.commit() 코드를 꼭 실행시켜주어야 위의 SQL 쿼리가 실제로 적용됩니다.
  4. connection.close() 코드로 DB연결을 종료시켜줍니다. 만약 연결을 close하지 않고 계속 쓴다면, connection error 가 발생할 수 있습니다.

코드 실행 결과

 

CF) 코드를 간결하게 짜기

쿼리부분에서 컬럼리스트와 placeholder부분의 경우 컬럼 개수가 수십, 수백개가 되면 하나하나 다 써주기가 힘듭니다.

따라서 데이터를 삽입할 컬럼 리스트, 데이터를 업데이트할 리스트를 따로 메소드로 빼놓고 활용할 수 있습니다

또한 value부분의 placeholder부분은 컬럼 개수 만큼 생성하는 코드를 적용합니다.

참고로 [:-2]가 뒤에 붙는 이유는 맨 뒤에 붙은 공백과 콤마를 제외하기 위함입니다.

def query_col_list():
    query = """
        `datetime`,
        `season`,
        `holiday`,
        `workingday`,
        `weather`,
        `temp`,
        `atemp`, 
        `humidity`,
        `windspeed`,
        `casual`,
        `registered`,
        `count`
        """
    return query

def update_value():
    query = """
        `season`=VALUES(`season`),
        `holiday`=VALUES(`holiday`),
        `workingday`=VALUES(`workingday`),
        `weather`=VALUES(`weather`),
        `temp`=VALUES(`temp`),
        `atemp`=VALUES(`atemp`),
        `humidity`=VALUES(`humidity`),
        `windspeed`=VALUES(`windspeed`),
        `casual`=VALUES(`casual`),
        `registered`=VALUES(`registered`),
        `count`=VALUES(`count`)            
    """
    return query
pymysql.install_as_MySQLdb()

connection = pymysql.connect(host=host,
                             user=user,
                             password=password,
                             db = database,
                             )
cursor = connection.cursor()

table_name = "new_table"

cols = bike_data.columns # 컬럼의 개수 count
value_length = str("%s, "* len(cols))[:-2]

query = f"""
    INSERT INTO {table_name} (
        {query_col_list()}
        )
    VALUES ({value_length})
    ON DUPLICATE KEY UPDATE
        {update_value()}
    """

args  = bike_data.values.tolist()

cursor.executemany(query, args)

connection.commit()
connection.close()
728x90
반응형