Azure SQL Database(pyodbc)でSQLAlchemyを使う

CODE BASE OKINAWA プログラミングスクール Advent Calendar 2020 - Adventar 1日目の記事です。

adventar.org

私は株式会社EBILABでエンジニアとして働いており、最近では主にインフラ・データ基盤の整備を行っています。 今回はPythonで使用できるORMの一つ、SQLAchemyを使ってみた時のことを書いていこうと思います。

はじめに

Azure Functions(Python)からAzure SQL Databaseに対してクエリを発行したりすることがよくあり、直接SQLを書くかDjangoのORMをStandaloneとして使うことが多々ありました。

djangoのORMをStandaloneとして使う時はこの辺りのリポジトリを参考にしています。 github.com

github.com

SQLAlchemyを使ってみたい動機としては、単純に使ってみたかったのとAzure FunctionsをAzure Monitorで監視する際にDB接続をopencensusを使ってトレースできると便利なのかなっと思ったから。

pypi.org

今回はAzure SQL Databaseに対してSQLAlchemyを使ってDBを操作していきます。

SQLAlchemyとは

Pythonで使用できるORMライブラリの一つでオープンソースで提供されています。SQLを書かずともPythonのコードを用いてDBを操作でき、ORM以外にも様々な機能が豊富で今最も使用されているPythonのORMライブラリとなっているそうです。

公式ドキュメントはこちら docs.sqlalchemy.org

環境

ドライバーのインストール(Linux or Mac

Microsoftの公式ドキュメントで各OS毎のインストール方法が記載されているので、こちらに従ってインストールします。 docs.microsoft.com

DBに接続

PyPIから必要なライブラリをインストール。今回DBへの接続はpyodbcを使用します。

$ pip install pyodbc
$ pip install sqlalchemy

DBの接続文字列を使用してエンジンを作成します。 ここで言うエンジンとは接続を始めとしたSQLAlchemyの機能を使用するための起点となるオブジェクトになります。

odbc_connect = urllib.parse.quote_plus(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD=' + password)
engine = create_engine('mssql+pyodbc:///?odbc_connect=' + odbc_connect)

以下のようにSQLDatabaseのバージョンとエディションを表示させるクエリを発行し、DBと接続出来るかを確認します。

import urllib
from sqlalchemy import create_engine

server = '<ホスト名>' 
database = '<データベース名>'
username = '<ユーザー名>'
password = '<パスワード>'
odbc_connect = urllib.parse.quote_plus(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD=' + password)
engine = create_engine('mssql+pyodbc:///?odbc_connect=' + odbc_connect)

with engine.connect() as conn:
    rs = conn.execute('SELECT @@VERSION as version')
    for row in rs:
        print(row['version'])

DBに接続できているのを確認できました。

$ python sample.py
Microsoft SQL Azure (RTM) - 12.0.2000.8
        Oct  1 2020 18:48:35
        Copyright (C) 2019 Microsoft Corporation

次はSQLAlchemyの機能を使ってテーブルを作成していきます。

テーブルの作成

メタデータの機能や関連付けの機能を持つBaseオブジェクトを作成するdeclarative_base()関数を使ってベースなるクラスを作成します。

Base = declarative_base()

Baseクラスを継承して関連付けるテーブルのモデルを作成します。今回の場合は上記で作成したpersonsテーブルをPersonモデルとして作成。 SQLAlchemy(というかORM全般)はこのモデルに対してメソッドを呼び出してデータベースを操作できるようになります。

import urllib
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

・・・


Base = declarative_base()


class Person(Base):
    __tablename__ = 'persons'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(14))

Base.metadata.create_all()メソッドを使ってテーブルを作成。

Base.metadata.create_all(engine)

Azureのpotalで確認すると以下のようにpersonsテーブルが作成されているのが確認できました。 f:id:rnakamine:20201205102613p:plain

次は作成したPersonモデルを使用してデータベースの操作を行っていきます。

DB操作

DBを操作するにあたってsessionを作成する必要があります。 sessionmakerクラスを使用してDBのsessionを作成します。 ここで言うsessionとはDBとの論理的な接続のことでDBとの通信のための情報を保持しています。

# create_engine()で作成したエンジンをbindさせる
Session = sessionmaker(bind=engine)
session = Session()

レコードの追加

session.add()メソッドを使ってレコードを追加することができます。 Personモデルのインスタンスを作成し、session.add()session.commit()で追加していきます。

p1 = Person(name='Mike')
session.add(p1)
p2 = Person(name='Nancy')
session.add(p2)
p3 = Person(name='Jun')
session.add(p3)
session.commit()

Azureのポータル上からレコードが作成できるてるのを確認できました。 f:id:rnakamine:20201205103402p:plain

レコードの読み取り

session.query()を使用して様々なクエリを発行することができます。 今回はall()でテーブルのレコードを全て抽出して、for文で一行ずつ表示させています。 .id.nameなどのプロパティでそれぞれのカラムにアクセスできるようになっています。

persons = session.query(Person).all()
for person in persons:
    print(person.id, person.name)

テーブルの中身を全行抽出できました。

$ python sample.py
1 Mike
2 Nancy
3 Jun

レコードの更新

追加と同じくsession.add()を使ってレコードの更新もできます。

p4 = session.query(Person).filter_by(name='Mike').first()
p4.name = 'Michel'
session.add(p4)
session.commit()

1行目のレコードのnameが変更されました。

$ python sample.py
1 Michel
2 Nancy
3 Jun

オブジェクトの削除

session.delete()メソッドを使用してレコードの削除もできます。

p5 = session.query(Person).filter_by(name='Nancy').first()
session.delete(p5)
session.commit()

2行目のレコードが削除されました。

$ python sample.py
1 Michel
3 Jun

既存テーブルからのAuto Mapping

個人的にはこれがめちゃくちゃ便利だなーと思っていまして、実行時に既存のテーブル情報を読み込んでクラスにマッピングし、上記のようなDB操作を行うことができます。

まずは直接クエリを実行してテーブルの作成・レコードの追加を行います。

CREATE TABLE department(
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [Name] VARCHAR(20) NOT NULL,
    [GroupName] VARCHAR(100) NOT NULL
) ON [PRIMARY]
GO
ALTER TABLE department ADD PRIMARY KEY CLUSTERED 
(
    [id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
GO

INSERT INTO department
    (Name, GroupName)
VALUES
    ('Engineering', 'Research and Development'),
    ('Tool Design', 'Research and Development'),
    ('Sales', 'Sales and Marketing'),
    ('Marketing', 'Sales and Marketing'),
    ('Purchasing', 'Inventory Management')

独自のMetaDataオブジェクトを作成し、refrectメソッドを使ってデータベースから情報を読み込みます。 onlyオプションを使って必要なテーブルだけ読み込むこともできます。

取得したメタデータと、モデルとなるクラスをマッピングすることで、上記でやったのと同じようにデータベースを操作することができるようになります。

import urllib
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

・・・

metadata = MetaData()
metadata.reflect(engine, only=['department'])
Base = automap_base(metadata=metadata)
Base.prepare()
Department = Base.classes.department

Session = sessionmaker(bind=engine)
session = Session()

departments = session.query(Department).all()
for department in departments:
    print(department.Id, department.Name, department.GroupName)

既存のテーブルからデータを抽出することができました。

$ python sample.py
1 Engineering Research and Development
2 Tool Design Research and Development
3 Sales Sales and Marketing
4 Marketing Sales and Marketing
5 Purchasing Inventory Management

まとめ

SQLAlchemy(というかORM全般)を使うとSQLを書かずともDBに対してクエリを発行したりできるし、DBが変わってもほぼ同じように書ける(DB毎の差分を吸収してくれる)のでかなり便利です。

その中でもSQLAlchemyのAuto Mappingの機能を使った既存テーブルの操作は事前にモデルを用意したりする必要がないので、個人的にはかなり良いなと思ってます。

参考文献

tech.chakapoko.com

laplace-daemon.com

mee.hatenablog.com