エンジニアの頭の中

フリーランスエンジニアが書く技術系ブログです。

SQLAlchemyでMySQLのINSERT .. DUPLICATE ON KEY UPDATE (UPSERT)を使用する

SQLAlchemyでUPSERTしたい

最近、PythonMySQLデータベースへ接続するコードを書いていたのですが、直接SQLを書くのが嫌になり、SQLAlchemyを使用することにしました。

既存のMySQLへの接続処理を、SQLAlchemyを使用するコードに置き換えていた際に、MySQLへのレコード登録時にキー重複が発生した場合は、該当レコードを更新する「UPSERT」をする必要があり、元々は、MySQLduplicate on key update機能をSQLを使用していたのですが、SQLを書かずにSQLAlchemyで実現する方法を調べてみたので、メモとして残しておきます。

ON DUPLICATE KEY UPDATEはSQLAlchemy 1.2系でサポートされている

SQLAlchemyでは、1.2系から同機能をサポートしているようです。

以下は、bitbucketのチケットです。

zzzeek / sqlalchemy / issues / #4009 - implement MySQL ON DUPLICATE KEY UPDATE — Bitbucket

  こちらは、公式ドキュメント What’s New in SQLAlchemy 1.2? — SQLAlchemy 1.2 Documentation

1.2系は、現時点(2017/11/5)では、プレリリース版です。私が使用しているSQLAlchemyは、1.1系でしたが、この機能を使用したかったので1.2系へ更新してみました。

SQLAlchemy 1.2系をインストール

pipで検索します。

$ pip search SQLAlchemy

1.2.0b3(ベータ版)が出てきました。

SQLAlchemy (1.2.0b3) - Database Abstraction Library
INSTALLED: 1.1.15
LATEST: 1.2.0b3

SQLAlchemyを1.2.0b3のバージョンに更新します。pipでベータ版へアップデートする場合は、通常のアップデート時とは異なり、--preオプションを付与して、install -Uを実行します。

$ pip install SQLAlchemy -U --pre
Collecting SQLAlchemy
Downloading SQLAlchemy-1.2.0b3.tar.gz (5.4MB)
100% |████████████████████████████████| 5.4MB 331kB/s
Installing collected packages: SQLAlchemy
Found existing installation: SQLAlchemy 1.1.15
Uninstalling SQLAlchemy-1.1.15:
Successfully uninstalled SQLAlchemy-1.1.15
Running setup.py install for SQLAlchemy ... done
Successfully installed SQLAlchemy-1.2.0b3

サンプルコードを読む

公式ドキュメントの、Dialect Improvements and Changes - MySQLの項には、upsertを実行するコードとして、以下のサンプルが掲載されています。

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table). \
values(id='some_id', data='some data to insert')

on_conflict_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)

conn.execute(on_conflict_stmt)

insert(my_table)関数の引数に指定されているmy_tableは、sqlalchemy.Table型のオブジェクトです。 肝心なのは、insert_stmt.on_duplicate_key_updateの部分です。 on_duplicate_key_updateの引数として、datastatusというキーの引数が指定されていますが、これらは、レコードのキー重複が発生した際に、「どのカラムをどんな値で更新するか」を指定するためのものです。

上記のサンプルの場合だと、以下のように解釈できます。

  • dataという名前の列を、insert文のdata列に指定した値で更新する。(つまりvalues関数の引数であるdataに指定された'some data to insert'で更新せよという意味)
  • statusという名前の列を、'U'という値で更新する。

というものです。 datastatusという引数は、on_duplicate_key_update関数特有の引数ではなく、更新対象とするカラムの名前と値を、key=valueの形式で表している点に注意してください。

実際に書いてみる

サンプルに従い、私が使っていたテーブルに合わせて書いてみたコードです。テーブル定義のコードとセットで載せておきます。

# table定義
sample = Table('sample', metadata,
    Column('context_id', Integer, nullable=False),
    Column('name', String(256), nullable=False),
    Column('amount', BigInteger, nullable=False),
    Column('created_at', TIMESTAMP, nullable=False, server_default=text('current_timestamp')),
    Column('updated_at', TIMESTAMP, nullable=False, server_default=text('current_timestamp'), server_onupdate=text('current_timestamp')),
    UniqueConstraint('context_id', 'name')
)

# insertを作成
insert_stmt = insert(sample).values(
    context_id=<context_idの値>, name=<nameの値>, amount=<amountの値>)

# ユニーク制約であるcontext_idとnameの重複が発生した場合は、既存レコードのamount列をinsert文のamountに指定した値で更新する
on_conflict_stmt = insert_stmt.on_duplicate_key_update(
amount=insert_stmt.inserted.amount)

# upsert実行
conn.execute(on_conflict_stmt)

以上です。簡単に書けました。