【mysql/mariaDB】PythonからMysqlを操作する

mysql_mariaDB

 

目的

pythonからmysql/MariaDBにアクセスしてテーブルを作ったりINSERTしたりSELECTしたりする

 

プログラム

使用モジュール

  • mysql-connector-python

pythonでmysqlやmariaDBを操作するモジュールは他に「PyMySQL」というものがあるようだが、

https://albnote.com/python/mysql-connector/

こちらの記事によると「mysql-connector-python」のほうが処理速度が早いらしい。

環境次第かもしれないが、今回はこっちを使用。

 

インストール方法

# pip install mysql-connector-python

python3 を利用する場合はpip3でインストールする。

心配なら両方しとく。

 

ソース

dbtest.py

# coding:utf-8

import mysql.connector as mydb
import sys

## DBコネクターを作成する
# 引数: ([str]ホスト名(IP), [str or int]ポート番号, [str]ユーザー名, [str]パスワード, [str]DB名)
# 戻値: mysql.connectオブジェクト
def createMysqlConnecter(_host, _port, _user, _passwd, _dbname):
  # DB接続に失敗した場合の例外対策
  try:
    resconn = mydb.connect(
      host=_host,
      port=_port,
      user=_user,
      password=_passwd,
      database=_dbname
    )
  except Exception as e:
    print('[DB Connection Error]', e)
    sys.exit(1) # プログラムをエラー終了
  
  # 接続が切れた場合に自動的に再接続する
  resconn.ping(reconnect=True)
  
  return resconn

## テーブルを作成するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def createTable(_conn, _query):
 # CREATEのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'CREATE':
    print('[CREATE Error] Query is not create.', _query)
    sys.exit(1)
  
  cur = _conn.cursor() # カーソル作成
  
  try:
    cur.execute(_query) # sqlの実行
  except Exception as e:
    print('[Table Create Error]', e)
    sys.exit(1)

## データをインサートするクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def insertData(_conn, _query):
  # INSERTのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'INSERT':
    print('[INSERT Error] Query is not insert.', _query)
    sys.exit(1)

  cur = _conn.cursor() # カーソル作成
  
  try:
    cur.execute(_query) # sqlの実行
    _conn.commit() # コミットする
  except Exception as e:
    print('[Insert Data Error]', e)
    _conn.rollback() # ロールバックする
    sys.exit(1)

## データを参照するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: ([Arr(data)Select結果])
def selectData(_conn, _query):
  # SELECTのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'SELECT':
    print('[SELECT Error] Query is not select.', _query)
    sys.exit(1)
  
  cur = _conn.cursor() # カーソル作成
  
  res = [] # 戻り値用変数
  
  try:
    cur.execute(_query) # sqlの実行
    res = cur.fetchall()
  except Exception as e:
    print('[Select Data Error]', e)
    
  return res

def main():
  print('----- mysql connector test -----')
  host = '192.168.0.10'
  user = 'dbuser'
  password = 'passwoed'
  dbname = 'testdb'
  
  conn = createMysqlConnecter(host, 3306, user, password, dbname)
  print('DB conected [' + user + '@' + host + '] ... :', conn.is_connected())
  
  #Table作成用SQL
  createTableSql = '''CREATE TABLE TBL_TEST (
   id int(11) NOT NULL AUTO_INCREMENT,
   env_name varchar(255) NOT NULL,
   comment text DEFAULT NULL,
   created_at datetime NOT NULL DEFAULT current_timestamp(),
   update_at datetime DEFAULT NULL ON UPDATE current_timestamp(),
   delete_at datetime DEFAULT NULL,
   PRIMARY KEY (id)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8
  '''
  
  # insert用SQL
  insertSql = '''INSERT INTO TBL_TEST (env_name, comment) VALUES ('PRD', '本番環境')
  '''
  
  # select用SQL
  selectSql = '''Select * from TBL_TEST;
  '''
  
  createTable(conn, createTableSql)
  insertData(conn, insertSql)
  rows = selectData(conn, selectSql)
  
  print('result:')
  for r in rows:
    print(r)
  
if __name__ == "__main__":
  main()

さらっと書いてみたので例外処理等甘いところはあるが、できるだけやったつもり。

 

解説

# coding:utf-8

import mysql.connector as mydb
import sys

おまじない。

今回はsysモジュールはsys.exit関数を使うためだけにインポート。

 

## DBコネクターを作成する
# 引数: ([str]ホスト名(IP), [str or int]ポート番号, [str]ユーザー名, [str]パスワード, [str]DB名)
# 戻値: mysql.connectオブジェクト
def createMysqlConnecter(_host, _port, _user, _passwd, _dbname):
  # DB接続に失敗した場合の例外対策
  try:
    resconn = mydb.connect(
      host=_host,
      port=_port,
      user=_user,
      password=_passwd,
      database=_dbname
    )
  except Exception as e:
    print('[DB Connection Error]', e)
    sys.exit(1) # プログラムをエラー終了
  
  # 接続が切れた場合に自動的に再接続する
  resconn.ping(reconnect=True)
  
  return resconn

コネクタを作成する関数。

DBに接続するメイン処理。

ここでこけるとなにもできない。

port指定は省略してもいいらしいが、可用性のため一応指定するようにしておく。

 

## テーブルを作成するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def createTable(_conn, _query):
 # CREATEのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'CREATE':
    print('[CREATE Error] Query is not create.', _query)
    sys.exit(1)
  
  cur = _conn.cursor() # カーソル作成
  
  try:
    cur.execute(_query) # sqlの実行
  except Exception as e:
    print('[Table Create Error]', e)
    sys.exit(1)

テーブルを作成する。

冗長して書いているが、本当に必要な部分は

cur = _conn.cursor() # カーソル作成
cur.execute(_query) # sqlの実行

ここだけ。

 

## データをインサートするクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def insertData(_conn, _query):
  # INSERTのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'INSERT':
    print('[INSERT Error] Query is not insert.', _query)
    sys.exit(1)

  cur = _conn.cursor() # カーソル作成
  
  try:
    cur.execute(_query) # sqlの実行
    _conn.commit() # コミットする
  except Exception as e:
    print('[Insert Data Error]', e)
    _conn.rollback() # ロールバックする
    sys.exit(1)

データINSERT関数。

クエリを実行したあとにコミットする必要がある。

あと、例外エラー吐いたときにロールバックする処理を入れておく。

 

## データを参照するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: ([Arr(data)Select結果])
def selectData(_conn, _query):
  # SELECTのクエリかどうかを判別
  if _query.split(' ')[0].upper() != 'SELECT':
    print('[SELECT Error] Query is not select.', _query)
    sys.exit(1)
  
  cur = _conn.cursor() # カーソル作成
  
  res = [] # 戻り値用変数
  
  try:
    cur.execute(_query) # sqlの実行
    res = cur.fetchall()
  except Exception as e:
    print('[Select Data Error]', e)
    
  return res

SELECT関数。

クエリ実行後、fetchall()にて検索結果を参照できる。

 

main部分は省略、というか見ればたぶん理解できるぐらいのことしか書いてない。

クエリは”’で囲うことで改行を含めた文字列を渡すことができる。

今まで複数行のコメント処理のためにしか使ってなかった。また一つ賢くなってしまった。

 

出力結果

$ python dbtest.py
DB conected [dbuser@192.168.0.10] … : True
result:
(1, 'PRD', '本番環境', datetime.datetime(2020, 9, 10, 18, 6, 6), None, None)

ちなみに、結果はタプル型で表示されるため、

for r in rows:
    print(r[1], r[2], r[3])

のようにすると必要な部分だけ取得することができる。

コメント

タイトルとURLをコピーしました