目的
pythonからmysql/MariaDBにアクセスしてテーブルを作ったりINSERTしたりSELECTしたりする
プログラム
使用モジュール
- mysql-connector-python
pythonでmysqlやmariaDBを操作するモジュールは他に「PyMySQL」というものがあるようだが、
https://albnote.com/python/mysql-connector/
こちらの記事によると「mysql-connector-python」のほうが処理速度が早いらしい。
環境次第かもしれないが、今回はこっちを使用。
インストール方法
# pip install mysql-connector-python
python3 を利用する場合はpip3でインストールする。
心配なら両方しとく。
ソース
dbtest.py
# coding:utf-8
import mysql.connector as mydb
import sys
## DBコネクターを作成する
# 引数: ([str]ホスト名(IP), [str or int]ポート番号, [str]ユーザー名, [str]パスワード, [str]DB名)
# 戻値: mysql.connectオブジェクト
def createMysqlConnecter(_host, _port, _user, _passwd, _dbname):
# DB接続に失敗した場合の例外対策
try:
resconn = mydb.connect(
host=_host,
port=_port,
user=_user,
password=_passwd,
database=_dbname
)
except Exception as e:
print('[DB Connection Error]', e)
sys.exit(1) # プログラムをエラー終了
# 接続が切れた場合に自動的に再接続する
resconn.ping(reconnect=True)
return resconn
## テーブルを作成するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def createTable(_conn, _query):
# CREATEのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'CREATE':
print('[CREATE Error] Query is not create.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
try:
cur.execute(_query) # sqlの実行
except Exception as e:
print('[Table Create Error]', e)
sys.exit(1)
## データをインサートするクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def insertData(_conn, _query):
# INSERTのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'INSERT':
print('[INSERT Error] Query is not insert.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
try:
cur.execute(_query) # sqlの実行
_conn.commit() # コミットする
except Exception as e:
print('[Insert Data Error]', e)
_conn.rollback() # ロールバックする
sys.exit(1)
## データを参照するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: ([Arr(data)Select結果])
def selectData(_conn, _query):
# SELECTのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'SELECT':
print('[SELECT Error] Query is not select.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
res = [] # 戻り値用変数
try:
cur.execute(_query) # sqlの実行
res = cur.fetchall()
except Exception as e:
print('[Select Data Error]', e)
return res
def main():
print('----- mysql connector test -----')
host = '192.168.0.10'
user = 'dbuser'
password = 'passwoed'
dbname = 'testdb'
conn = createMysqlConnecter(host, 3306, user, password, dbname)
print('DB conected [' + user + '@' + host + '] ... :', conn.is_connected())
#Table作成用SQL
createTableSql = '''CREATE TABLE TBL_TEST (
id int(11) NOT NULL AUTO_INCREMENT,
env_name varchar(255) NOT NULL,
comment text DEFAULT NULL,
created_at datetime NOT NULL DEFAULT current_timestamp(),
update_at datetime DEFAULT NULL ON UPDATE current_timestamp(),
delete_at datetime DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
'''
# insert用SQL
insertSql = '''INSERT INTO TBL_TEST (env_name, comment) VALUES ('PRD', '本番環境')
'''
# select用SQL
selectSql = '''Select * from TBL_TEST;
'''
createTable(conn, createTableSql)
insertData(conn, insertSql)
rows = selectData(conn, selectSql)
print('result:')
for r in rows:
print(r)
if __name__ == "__main__":
main()
さらっと書いてみたので例外処理等甘いところはあるが、できるだけやったつもり。
解説
# coding:utf-8
import mysql.connector as mydb
import sys
おまじない。
今回はsysモジュールはsys.exit関数を使うためだけにインポート。
## DBコネクターを作成する
# 引数: ([str]ホスト名(IP), [str or int]ポート番号, [str]ユーザー名, [str]パスワード, [str]DB名)
# 戻値: mysql.connectオブジェクト
def createMysqlConnecter(_host, _port, _user, _passwd, _dbname):
# DB接続に失敗した場合の例外対策
try:
resconn = mydb.connect(
host=_host,
port=_port,
user=_user,
password=_passwd,
database=_dbname
)
except Exception as e:
print('[DB Connection Error]', e)
sys.exit(1) # プログラムをエラー終了
# 接続が切れた場合に自動的に再接続する
resconn.ping(reconnect=True)
return resconn
コネクタを作成する関数。
DBに接続するメイン処理。
ここでこけるとなにもできない。
port指定は省略してもいいらしいが、可用性のため一応指定するようにしておく。
## テーブルを作成するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def createTable(_conn, _query):
# CREATEのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'CREATE':
print('[CREATE Error] Query is not create.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
try:
cur.execute(_query) # sqlの実行
except Exception as e:
print('[Table Create Error]', e)
sys.exit(1)
テーブルを作成する。
冗長して書いているが、本当に必要な部分は
cur = _conn.cursor() # カーソル作成
cur.execute(_query) # sqlの実行
ここだけ。
## データをインサートするクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: なし
def insertData(_conn, _query):
# INSERTのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'INSERT':
print('[INSERT Error] Query is not insert.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
try:
cur.execute(_query) # sqlの実行
_conn.commit() # コミットする
except Exception as e:
print('[Insert Data Error]', e)
_conn.rollback() # ロールバックする
sys.exit(1)
データINSERT関数。
クエリを実行したあとにコミットする必要がある。
あと、例外エラー吐いたときにロールバックする処理を入れておく。
## データを参照するクエリを実行する
# 引数: ([mysql.connect]コネクタ, [str]クエリ)
# 戻値: ([Arr(data)Select結果])
def selectData(_conn, _query):
# SELECTのクエリかどうかを判別
if _query.split(' ')[0].upper() != 'SELECT':
print('[SELECT Error] Query is not select.', _query)
sys.exit(1)
cur = _conn.cursor() # カーソル作成
res = [] # 戻り値用変数
try:
cur.execute(_query) # sqlの実行
res = cur.fetchall()
except Exception as e:
print('[Select Data Error]', e)
return res
SELECT関数。
クエリ実行後、fetchall()にて検索結果を参照できる。
main部分は省略、というか見ればたぶん理解できるぐらいのことしか書いてない。
クエリは”’で囲うことで改行を含めた文字列を渡すことができる。
今まで複数行のコメント処理のためにしか使ってなかった。また一つ賢くなってしまった。
出力結果
$ python dbtest.py DB conected [dbuser@192.168.0.10] … : True result: (1, 'PRD', '本番環境', datetime.datetime(2020, 9, 10, 18, 6, 6), None, None)
ちなみに、結果はタプル型で表示されるため、
for r in rows:
print(r[1], r[2], r[3])
のようにすると必要な部分だけ取得することができる。
コメント