PythonのコマンドラインパーサーであるClickを使ってみた

Pythonコマンドラインインターフェースを作ろうとした時、わりと最近はコマンドラインパーサーにclickが使われていることが少しだけ多い印象?だったので、個人的に少し触ってみた。

コマンドラインパーサーとしては他にも標準ライブラリに組み込まれているargparseなどがある。

Clickとは

Clickは必要なコードを最小限に抑えて、コマンドラインインターフェースを作成するためのパッケージ。

デコレータをちょちょっとつけるだけでいい感じにコマンドラインインターフェースが作れて便利な印象。

click.palletsprojects.com

基本的な使い方

PyPI経由でインストール

$ pip install click

@click.command()デコレータを使って関数を修飾し、コマンドを登録する

import click


@click.command()
def hello():
    click.echo('Hello, World!')


if __name__ == '__main__':
    hello()

実行してみると

$ python hello.py
Hello, World!

これだけだと普通のpythonスクリプトを実行しているのと同じだが、--helpオプションをつけて実行すると

$ python hello.py --help
Usage: hello.py [OPTIONS]

Options:
  --help  Show this message and exit.

CLIっぽくhelpが表示されているのがわかる。

オプションを追加する

@click.optionデコレータを使って関数を修飾することで簡単にオプションを追加することができる。

オプションで指定した値が関数の引数となり、関数内で使用できるようになる。

オプションが指定されなかった場合、変数の値はNoneとなる。

import click


@click.command()
@click.option('--count', default=1, help='Number of greetings.')
@click.option('--name', '-n', help='The person to greet.')
def hello(count, name):
    """Simple program that greets NAME for a total of COUNT times."""
    for x in range(count):
        click.echo(f'Hello {name}')


if __name__ == '__main__':
    hello()

この例だとcountnameの二つのオプションを追加している。

nameオプションは--name-nの両方で指定できるように

@click.option('--name', '-n', help='The person to greet.')

のような感じでデコレータにパラメータを渡している。

また、helpパラメータでオプションの説明を追加、defaultパラメータでオプションが指定されてなかった時のデフォルト値を指定することができる。

実行してみるとこんな感じ

$ python option.py --count 3 -n Ryo
Hello Ryo
Hello Ryo
Hello Ryo

--helpオプションを渡して実行するとhelpパラメータで追加したオプションの説明がここで表示される。

$ python option.py --help
Usage: option.py [OPTIONS]

  Simple program that greets NAME for a total of COUNT times.

Options:
  --count INTEGER  Number of greetings.
  -n, --name TEXT  The person to greet.
  --help           Show this message and exit.

引数を追加する

こちらも@click.argumentデコレータを使って関数を修飾するこで簡単に追加できる。

オプションと同様、引数で指定した値が関数の引数となり、関数内で使用できるようになる。

import click


@click.command()
@click.argument('name')
def hello(name):
    click.echo(f'Hello, {name}')


if __name__ == '__main__':
    hello()

実行してみるとこんな感じ

$ python arguments.py Ryo
Hello, Ryo

ここでは、オプションと違って、引数の指定がなかった場合はエラーとなる。

$ python arguments.py
Usage: arguments.py [OPTIONS] NAME
Try 'arguments.py --help' for help.

Error: Missing argument 'NAME'.

サブコマンドの実装

clickではコマンドをグループ化することができ、これによってサブコマンドを作ることができる。

@click.groupデコレータを追加してエントリポイントとなる関数を修飾する。

import click


@click.group()
def cli():
    pass


@cli.command()
def command1():
    click.echo('execute command1')


@cli.command()
def command2():
    click.echo('execute command2')


if __name__ == '__main__':
    cli()

@<関数名>.commandとしてデコレータを追加してあげることで、それぞれのサブコマンドを登録することができる。

実行するとこんな感じ。

$ python subcommand.py command1
execute command1

$ python subcommand.py command2
execute command2

--hellpオプションを渡して実行してみると、サブコマンドの説明も表示されるようになる。

$ python subcommand.py
Usage: subcommand.py [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  command1
  command2

まとめ

argparseを使用したことがないので、単純に比較はできないが、デコレータを追加していくだけで、いろいろ出来るのでとても使いやすい。

次回はさらに細かくパラメータを指定したり、CLIのテストについてもまとめていきたいと思う。

サンプルコード

今回使ったサンプルコードはこちら。 github.com

大名エンジニアカレッジ(第1回 セキュリティ講座)を受けた

先日、大名エンジニアカレッジのWebエンジニア発展コースを受講してきた。

実はもともとはインフラ講座がお目当てだったが、想像以上にセキュリティについての講義が面白かったので、引き続きこの分野についても深く学習を続けていきたいなと思った。

大名エンジニアカレッジついて

daimyo-college.pepabo.com

なぜ受講したか

いくつかコースがあるのだが、今回僕が受講したWebエンジニア発展コースの対象者は

  • Webアプリは作れるようになったつもりなので、発展的な知識を求めている方
  • 特に、インフラの勉強や、セキュリティの勉強のきっかけが欲しい方

となっており、業務でインフラやセキュリティをしっかりやっていきたいという気持ちから受講することにした。

事前準備

今回はWebGoatという、わざと脆弱性を含ませたWebアプリケーションで、レッスン形式でセキュリティを学べるアプリケーションを使用した。

WebGoat用のDockerイメージが用意されているので、ローカルで構築して学習を進めることができた。

github.com

Webセキュリティについて

ここではWebセキュリティの重要性について学ぶことができた。

Webシステム・Webサービスは常に悪意のある攻撃者から狙われている。 scan.netsecurity.ne.jp

なぜセキュリティ事故が起こってしまうのかというと、

  • サーバー等の設定ミス・OSやソフトウェアの脆弱性
  • Webアプリケーションの脆弱性
    • 設計上の欠陥
    • プログラムのバグ

などが原因となっており、基本的にはシステムに欠陥があってそれが悪用されている。

Webアプリケーションの脆弱性には以下のようにさまざまな種類があり、 講座では実際の攻撃手法や対策方法について学ぶことができた。

SQLインジェクション

攻撃者がアプリケーションのセキュリティ上の不備を意図的に利用し、アプリケーションが想定しないSQL文を実行させることにより、データベースを不正に操作する攻撃手法。またはその攻撃を可能とする脆弱性

SQLインジェクションにより発生しうる脅威は

  • データベースに蓄積された非公開情報の閲覧・改ざん・消去
  • 認証回避による不正ログイン
  • ストアドプロシージャ等を利用したOSコマンドの実行

などがある

原理としてはアプリケーションが入力値を適切にエスケープしないままSQL中に展開することで発生する。

例えば次のように

SELECT * FROM users WHERE name = '<入力値>';

アプリケーションが入力値を適切にエスケープしていなければ、

入力値に't OR 't' = 'tのような文字列を与えると以下のようなSQLが発行されてしまい

SELECT * FROM users WHERE name = 't' OR 't' = 't';

条件文が常にとなるため全レコードが抽出されることになる。

SQLインジェクションの主な対策としては

などがあげられる。

パストラバーサルディレクトリトラバーサル

攻撃者がアプリケーションの実行場所以外からファイルやディレクトリにアクセスしたり、保存したりできる脆弱性

パストラバーサルにより発生しうる脅威は

  • ファイルアップロードの場合、重要なシステムファイルが上書きされる可能性がある
  • 個人情報や企業データといった機密情報の漏洩・流出
  • クレデンシャル情報の漏洩による横展開

などがあり、主な対策としては

などがあげられる。

yamory.io

XSSクロスサイトスクリプティング

スクリプトをWebサイトに送り込み、スクリプトを含むHTMLを出力し、ブラウザ上で実行させる。

  • セッション/クッキーの値が盗まれ、不正ログインされる恐れがある
  • サイト利用者の権限でWebアプリケーションの機能を悪用される。
  • Webサイトの内容が書き換えられ、フィッシングにより個人情報が盗まれる。

主な対策としては、入力値の制限やエスケープ(サニタイジング)を行うことが有用。

普段はLaravelやRuby on Railsと言ったフレームワークを使う事が多く、こういった対策がデフォルトとして用意されている?と思うので、 あまり意識したことなかったので、かなり勉強になった。

CSRF(クロスサイトフォージェリ)

ユーザーがログイン状態を保持したまま、別のサイトに用意したコンテンツ上の罠のリンクを踏ませること等をきっかけとして、インターネットショッピングの最終決済や退会等Webアプリケーションの「重要な処理」を呼び出すようユーザーを誘導する攻撃。

本人が意図しない形で情報・リクエストを送信されてしまうので、リクエスト強要とも呼ばれている。

主な対策として、

  • ワンタイムトークンを利用して正しいリクエストを識別する
  • カスタムHTTPヘッダを追加する

などがあげられる。

こちらのCSRF対策に関する具体的な実装例がとても参考になった。

yamory.io

脆弱性診断ツールを使って自動診断を体験

ここでは実際に脆弱性診断ツールを使用してわざと脆弱性を含ませたWebアプリケーションを自動診断させてみた。 今回はOWASP ZAPというツールを使用した。

OWASP ZAPは誰でも無料でWebアプリケーションの脆弱性をチェックできるツール。

使い方を間違えると、法に触れてしまうこともあるようなので、使用する際はかなり注意が必要!!!

qiita.com

まとめ

セキュリティについては、今後やらないといけないよなーとか思いつつなかなか手をつけられてなかったので、 今回講座を受けたことにより今後しっかり勉強するためのきっかけとなった。

実際にWebアプリケーションに手を加えながら脆弱性を体感していけたので、非常にわかりやすかった。

来週の第2回インフラ講座も非常に楽しみ🙌

Azure Storage Blobs client library for Pythonを使ってBlobをいろいろ操作してみる

以前のブログで、Azure SDK for PythonのBlob Storage Clientを使ってpandasのDataFrameをcsv形式でBlobストレージへのアップロード方法について書いたのだが、Blob Storage Clientには他にもたくさんいろいろなメソッドが用意されていたりするので、今回は個人的に結構使いそうなメソッドについて試してみる。

rnakamine.hatenablog.com

環境

  • python 3.8.2
  • azure-storage-blob 12.8.0

事前準備

ストレージアカウントの資格情報を取得

Azure Potalからストレージアカウントの接続文字列を取得する。 f:id:rnakamine:20210328070514p:plain

取得した接続文字列を環境変数に追加

$ export AZURE_STORAGE_CONNECTION_STRING="<yourconnectionstring>"

パッケージのインストール

$ pip install azure-storage-blob

コンテナの作成

BlobServiceClientクラスのcreate_container()を使ってコンテナを作成

import os

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')


def main():
    container_name = 'testnakamine'
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_service_client.create_container(container_name)


if __name__ == '__main__':
    main()

コンテナのリストを表示

BlobServiceClientクラスのlist_containers()メソッドを使ってコンテナのリストを表示できる。

list_containers()メソッドでContainerPropertiesインスタンスを返すイテレータを取得できるので、これをfor文とかで回してリストを表示させることができる。

def main():
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    containers = blob_service_client.list_containers()
    for container in containers:
        print(
            f"Name: {container['name']} Last Modified: {container['last_modified']}")

上記の様に、返されたContainerPropertiesインスタンスからはdictを用いて様々な値にアクセスできる。

docs.microsoft.com

blobのアップロード

BlobServiceClientクラスのget_blob_client()メソッドを使ってBlobClinetインスタンスを取得し、update_blob()メソッドを使ってローカルのファイルをアップロードする。

def main():
    container_name = 'testnakamine'
    blob_name = 'sample.txt'

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(
        container_name, blob=blob_name)

    with open(blob_name, 'rb') as data:
        blob_client.upload_blob(data)

blobの存在確認

上記のupload_blob()メソッドだと、すでにBlobが存在する場合はエラーになる。

BlobClientインスタンスからexists()メソッドを使用し、Blobが存在しているか確認することができる。

def main():
    container_name = 'testnakamine'
    blob_name = 'sample.txt'

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(
        container_name, blob=blob_name)

    if blob_client.exists():
        print('Exists')
    else:
        print('Not exists')

コンテナ内のblobのリストを表示

まずBlobServiceClientクラスのget_blob_client()メソッドを使ってBlobClientインスタンスを取得する。

BlobClientインスタンスlist_blobs()メソッドを使用してBlobPropertiesインスタンスを返すイテレータを取得できるので、それをfor文で回してリストを表示させる。

def main():
    container_name = 'testnakamine'
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    container_client = blob_service_client.get_container_client(container_name)

    blobs = container_client.list_blobs()
    for blob in blobs:
        print(blob['name'], blob['last_modified'])

ContainerPropertiesと同様にdictを用いて様々な値にアクセスできる。

docs.microsoft.com

blobのダウンロード

BlobClientインスタンスdownload_blob()メソッドを使って、Blobをダウンロードしそれをファイルに書き込む。

def main():
    container_name = 'testnakamine'
    blob_name = 'sample.txt'
    file_name = 'download.txt'

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(
        container_name, blob=blob_name)

    with open(file_name, 'wb') as download_file:
        download_file.write(blob_client.download_blob().readall())

今回はわかりやすいようにダウンロードしたBlobを別名(download.txt)で保存し確認する。

$ cat download.txt
Hello, World!

blobの削除

BlobClientインスタンスdelete_blob()メソッドを使って、Blobを削除する。

def main():
    container_name = 'testnakamine'
    blob_name = 'sample.txt'

    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(
        container_name, blob=blob_name)

    blob_client.delete_blob()

サンプルコード

上記で使用したサンプルコード

github.com

今作ってるもの

これらを使って、ターミナルからUNIXコマンドでファイルやディレクトリを扱う様なCLIツールを作っている途中。 使い勝手としてはaws cliのs3と同じ感じしようかと思っている。

github.com

まだ途中だが、ある程度できた段階でブログにも書いていきたい。

DigdagからPostgreSQLに接続した際にconnectionが枯渇する現象を解決した話

docker-compose上で、Digdagをサーバーモードで起動し、PostgreSQLに接続した際に以下のようなログが多発した。

digdag      | 2021-01-26 23:23:30 +0000: Digdag v0.9.42
digdag      | 2021-01-26 23:23:32 +0000 [INFO] (main): secret encryption engine: disabled
digdag      | 2021-01-26 23:23:32 +0000 [INFO] (main): XNIO version 3.3.6.Final
digdag      | 2021-01-26 23:23:32 +0000 [INFO] (main): XNIO NIO Implementation Version 3.3.6.Final
digdag      | 2021-01-26 23:23:32 +0000 [INFO] (main): Starting server on 0.0.0.0:65432
digdag      | 2021-01-26 23:23:32 +0000 [INFO] (main): Bound on 0.0.0.0:65432 (api)
postgres_1  | 2021-01-26 23:23:32.952 UTC [130] FATAL:  sorry, too many clients already
postgres_1  | 2021-01-26 23:23:33.208 UTC [131] FATAL:  sorry, too many clients already
postgres_1  | 2021-01-26 23:23:33.592 UTC [132] FATAL:  sorry, too many clients already
postgres_1  | 2021-01-26 23:23:34.163 UTC [133] FATAL:  sorry, too many clients already
postgres_1  | 2021-01-26 23:23:35.011 UTC [134] FATAL:  sorry, too many clients already

どうやらDigdagからPostgreSQLに対してのコネクションが多発しているような感じたったのだが、特に理由がわからず。。。

うぁーとか思ってこんなツイートをすると

すかさず@hiroysatoさんが丁寧に教えてくださった!(本当にありがとうございます🙇‍♂️)

コネクション数はデフォルトでCPUコア数 × 32らしく、手元のpcのdocker for macではCPUに4コアを割り当ており、 またPostgreSQLの最大コネクション数は通常でデフォルトが100なっているらしく、それが原因でコネクションが枯渇している感じだった。

www.postgresql.org

解決策

ツイートにもあるが、設定ファイルであるserver.propertiesdatabase.maximumPoolSizeのパラメータを追加して解決した。(今回は32で固定)

docs.digdag.io

サンプル

今回はdocker-composeの上でDigdag、PosgresSQLのサーバーを立てて検証した。 github.com

Dockerコンテナを使って手元でさくっとEmbulkを実行しAzure SQL Database間でデータを転送させる

業務でよくAzure SQL Databaseを使っていて、DB間でさくっとデータを転送したい場面が度々あり、以前はAzure Data Factoryとか使っていたのだが たまにしか使わないし設定面倒だしなぁとか思っていた。

Embulkだと環境用意するの大変かなとか思っていたが、割とDocker使えばいけるじゃんと思ったので、自分用でとりあえずリポジトリ作ってすぐ使えるように用意した

github.com

使い方

まずはgit clone

$ git clone git@github.com:rnakamine/docker-embulk-sqlserver.git

次にそれぞれDBの接続情報やテーブル情報などを書いていく。

config.yml

in:
  type: sqlserver
  host: localhost
  user: myuser
  password: ""
  database: my_database
  table: my_table
  select: "col1, col2, col3"
  where: "col4 != 'a'"
  order_by: "col1 DESC"

out:
  type: sqlserver
  host: localhost
  user: myuser
  password: ""
  database: my_database
  table: my_table
  mode: insert

今回はSQLServerのEmbulkプラグインを使っており、その他のオプションはこちらを参考にすると良い

Docker Composeで実行できるようにしたので、以下のコマンドを実行するだけ

$ docker-compose run --rm embulk run config.yml

Dockerfileの中身

OpenJDKのベースイメージを用意してEmbulkをwgetでインストールする。

ちなみにEmbulkのv0.9およびv0.10はJava8で動かす必要があるようでJava9は公式にはサポートされていない。

FROM openjdk:8-jre-alpine

ENV EMBULK_VERSION=0.9.23
RUN wget -q https://dl.embulk.org/embulk-${EMBULK_VERSION}.jar -O /usr/local/bin/embulk \
    && chmod +x /usr/local/bin/embulk

RUN apk add --no-cache libc6-compat \
    && embulk gem install \
    embulk-input-sqlserver \
    embulk-output-sqlserver

ENV TZ=Asia/Tokyo

WORKDIR /work

ENTRYPOINT ["java", "-jar", "/usr/local/bin/embulk"]

今回はAzure SQL Databaseを使うためにSQL Serverのinput/outputプラグインをインストールしているが、 そこは用途に合わせて必要なプラグインをインストールすれば良い。

参考文献

qiita.com

CircleCIのManual Approvalを得てAzure App Serviceのデプロイスロットをスワップさせる

この記事はCircleCI Advent Calendar 2020 - Qiitaの19日目の記事です。 qiita.com

今回はCircleCIのManual Approvalを使用してAzure App Serviceのデプロイスロットをスワップさせるところをやっていきたいと思います。

やりたいこと

  1. Dockerイメージのビルド、Azure Container RegistroyへDockerイメージをPush
  2. StagingのDockerイメージの参照先の変更(Dockerイメージのバージョン変更)
  3. SlackにManual Approvelの通知
  4. Manual Approvelが承認されれば、ProductionとStagingのデプロイスロットのスワップを実行

Azure App Serviceのスロットの機能に関してはこちらを参考にしてください。 docs.microsoft.com docs.microsoft.com

サンプルコード

Rubyの軽量フレームワークであるSinatraをAzure Web App for Containerにデプロイしてみました。 github.com 今回はこちらを使って、ワークフローの構築をしていきます。

1. DockerイメージのBuild / Push

Dockerコマンドを用いてイメージのBuild、Azure Container RegistroyへのPushを行います。 Dockerイメージのバージョン管理は、CircleCIで用意されている環境変数CIRCLE_SHA1で現在のビルドの最新コミットのSHA1ハッシュを参照できるので、こちらをDockerイメージのタグとして使用します。

setup_remote_dockerを実行することでリモート環境が作成され、完全に隔離された環境でDockerコマンドを実行できます。

jobs:
  build-push:
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Build Docker image
          command: docker build -t $ACR_SERVER/$ACR_REPOSITORY:$CIRCLE_SHA1 .
      - run:
          name: Push Docker image
          command: |
            docker login $ACR_SERVER -u $ACR_LOGIN -p $ACR_PASSWORD
            docker push $ACR_SERVER/$ACR_REPOSITORY:$CIRCLE_SHA1

2. StagingのDockerイメージの参照先の変更

StagingのDockerイメージの参照先の変更や、後述のスワップの実行にはAzure CLIを使用していきます。

CircleCIでAzure CLIを実行させるにはOrbsのcircleci/azure-cliを使用するとよさそうです。Orbsめっちゃ便利!!!

az webapp config container setコマンドでStaging環境のDockerイメージの参照先を変更することができます。 docs.microsoft.com

orbs:
  azure-cli: circleci/azure-cli@1.1.0

jobs:
  build-push:
    executor: azure-cli/default
    steps:
      - azure-cli/install
      - azure-cli/login-with-service-principal
      - checkout
      - setup_remote_docker
      - run:
          name: Build Docker image
          command: docker build -t $ACR_SERVER/$ACR_REPOSITORY:$CIRCLE_SHA1 .
      - run:
          name: Push Docker image
          command: |
            docker login $ACR_SERVER -u $ACR_LOGIN -p $ACR_PASSWORD
            docker push $ACR_SERVER/$ACR_REPOSITORY:$CIRCLE_SHA1
            az webapp config container set -c "$ACR_SERVER/$ACR_REPOSITORY:$CIRCLE_SHA1" -p $ACR_PASSWORD -r https://$ACR_SERVER -u $ACR_LOGIN -n $AZURE_APP_NAME -s staging -g $AZURE_RESOURCE_GROUP

executorではazure-cli/azure-dockerを使用すると、Azure CLIが既にインストールされているのでこちらを使いたかったのですが、Dockerコマンドが使えなくdocker build等の作業が行えないので、こちらのJobではazure-cli/defaultの方を使用します。

3. SlackにManual Approvelの通知

Slackの通知にもOrbsが用意されているのでこちらを使用します。 Manual Approval用でワークフローを一時停止しワークフローへのリンクを含むSlack通知を送信するようなJobが最初から用意されてたので、そのまま使いたいと思います。

orbs:
  azure-cli: circleci/azure-cli@1.1.0
  slack: circleci/slack@4.1.4

jobs:

・・・

workflows:
  version: 2
  deploy:
    jobs:
      - build-push:
          filters:
            branches:
              only:
                - main
      - slack/on-hold:
          requires:
            - build-push
      - approval:
          type: approval
          requires:
            - slack/on-hold

こんな感じで承認のためのSlack通知が送信されます。 f:id:rnakamine:20201220094456p:plain

ここからすぐ承認ボタンのところまでいけるのでかなり便利です。 f:id:rnakamine:20201220095027p:plain

4. Manual Approvelが承認されれば、ProductionとStagingのデプロイスロットのスワップを実行

スワップの実行はAzure CLIaz webapp deployment slot swapコマンドを使用して実行させることができます。

docs.microsoft.com

version: 2.1

orbs:
  azure-cli: circleci/azure-cli@1.1.0
  slack: circleci/slack@4.1.4

jobs:

・・・

  swap:
    executor: azure-cli/azure-docker
    steps:
      - azure-cli/login-with-service-principal
      - run:
          name: Slot swap
          command: az webapp deployment slot swap -g $AZURE_RESOURCE_GROUP -n $AZURE_APP_NAME --slot staging

workflows:
  version: 2
  deploy:
    jobs:

・・・

      - swap:
          requires:
            - approval

先ほどの承認をボタンを押すことで、ProductionとStagingのデプロイスロットがスワップされ、ブルーグリーンデプロイ的なことができるようになります。

まとめ

Manual Approval、Orbsを今回初めて使ってみてとても便利だなと感じました。 CircleCI使用歴があまりないので、案外ハマるかなと思っていたのですが、 Configファイルも結構直感的に書けたので特に引っかかることなく進めることができました!

Azure SQL Database(pyodbc)でSQLAlchemyを使う

CODE BASE OKINAWA プログラミングスクール Advent Calendar 2020 - Adventar 1日目の記事です。

adventar.org

私は株式会社EBILABでエンジニアとして働いており、最近では主にインフラ・データ基盤の整備を行っています。 今回はPythonで使用できるORMの一つ、SQLAchemyを使ってみた時のことを書いていこうと思います。

はじめに

Azure Functions(Python)からAzure SQL Databaseに対してクエリを発行したりすることがよくあり、直接SQLを書くかDjangoのORMをStandaloneとして使うことが多々ありました。

djangoのORMをStandaloneとして使う時はこの辺りのリポジトリを参考にしています。 github.com

github.com

SQLAlchemyを使ってみたい動機としては、単純に使ってみたかったのとAzure FunctionsをAzure Monitorで監視する際にDB接続をopencensusを使ってトレースできると便利なのかなっと思ったから。

pypi.org

今回はAzure SQL Databaseに対してSQLAlchemyを使ってDBを操作していきます。

SQLAlchemyとは

Pythonで使用できるORMライブラリの一つでオープンソースで提供されています。SQLを書かずともPythonのコードを用いてDBを操作でき、ORM以外にも様々な機能が豊富で今最も使用されているPythonのORMライブラリとなっているそうです。

公式ドキュメントはこちら docs.sqlalchemy.org

環境

ドライバーのインストール(Linux or Mac

Microsoftの公式ドキュメントで各OS毎のインストール方法が記載されているので、こちらに従ってインストールします。 docs.microsoft.com

DBに接続

PyPIから必要なライブラリをインストール。今回DBへの接続はpyodbcを使用します。

$ pip install pyodbc
$ pip install sqlalchemy

DBの接続文字列を使用してエンジンを作成します。 ここで言うエンジンとは接続を始めとしたSQLAlchemyの機能を使用するための起点となるオブジェクトになります。

odbc_connect = urllib.parse.quote_plus(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD=' + password)
engine = create_engine('mssql+pyodbc:///?odbc_connect=' + odbc_connect)

以下のようにSQLDatabaseのバージョンとエディションを表示させるクエリを発行し、DBと接続出来るかを確認します。

import urllib
from sqlalchemy import create_engine

server = '<ホスト名>' 
database = '<データベース名>'
username = '<ユーザー名>'
password = '<パスワード>'
odbc_connect = urllib.parse.quote_plus(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD=' + password)
engine = create_engine('mssql+pyodbc:///?odbc_connect=' + odbc_connect)

with engine.connect() as conn:
    rs = conn.execute('SELECT @@VERSION as version')
    for row in rs:
        print(row['version'])

DBに接続できているのを確認できました。

$ python sample.py
Microsoft SQL Azure (RTM) - 12.0.2000.8
        Oct  1 2020 18:48:35
        Copyright (C) 2019 Microsoft Corporation

次はSQLAlchemyの機能を使ってテーブルを作成していきます。

テーブルの作成

メタデータの機能や関連付けの機能を持つBaseオブジェクトを作成するdeclarative_base()関数を使ってベースなるクラスを作成します。

Base = declarative_base()

Baseクラスを継承して関連付けるテーブルのモデルを作成します。今回の場合は上記で作成したpersonsテーブルをPersonモデルとして作成。 SQLAlchemy(というかORM全般)はこのモデルに対してメソッドを呼び出してデータベースを操作できるようになります。

import urllib
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

・・・


Base = declarative_base()


class Person(Base):
    __tablename__ = 'persons'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(14))

Base.metadata.create_all()メソッドを使ってテーブルを作成。

Base.metadata.create_all(engine)

Azureのpotalで確認すると以下のようにpersonsテーブルが作成されているのが確認できました。 f:id:rnakamine:20201205102613p:plain

次は作成したPersonモデルを使用してデータベースの操作を行っていきます。

DB操作

DBを操作するにあたってsessionを作成する必要があります。 sessionmakerクラスを使用してDBのsessionを作成します。 ここで言うsessionとはDBとの論理的な接続のことでDBとの通信のための情報を保持しています。

# create_engine()で作成したエンジンをbindさせる
Session = sessionmaker(bind=engine)
session = Session()

レコードの追加

session.add()メソッドを使ってレコードを追加することができます。 Personモデルのインスタンスを作成し、session.add()session.commit()で追加していきます。

p1 = Person(name='Mike')
session.add(p1)
p2 = Person(name='Nancy')
session.add(p2)
p3 = Person(name='Jun')
session.add(p3)
session.commit()

Azureのポータル上からレコードが作成できるてるのを確認できました。 f:id:rnakamine:20201205103402p:plain

レコードの読み取り

session.query()を使用して様々なクエリを発行することができます。 今回はall()でテーブルのレコードを全て抽出して、for文で一行ずつ表示させています。 .id.nameなどのプロパティでそれぞれのカラムにアクセスできるようになっています。

persons = session.query(Person).all()
for person in persons:
    print(person.id, person.name)

テーブルの中身を全行抽出できました。

$ python sample.py
1 Mike
2 Nancy
3 Jun

レコードの更新

追加と同じくsession.add()を使ってレコードの更新もできます。

p4 = session.query(Person).filter_by(name='Mike').first()
p4.name = 'Michel'
session.add(p4)
session.commit()

1行目のレコードのnameが変更されました。

$ python sample.py
1 Michel
2 Nancy
3 Jun

オブジェクトの削除

session.delete()メソッドを使用してレコードの削除もできます。

p5 = session.query(Person).filter_by(name='Nancy').first()
session.delete(p5)
session.commit()

2行目のレコードが削除されました。

$ python sample.py
1 Michel
3 Jun

既存テーブルからのAuto Mapping

個人的にはこれがめちゃくちゃ便利だなーと思っていまして、実行時に既存のテーブル情報を読み込んでクラスにマッピングし、上記のようなDB操作を行うことができます。

まずは直接クエリを実行してテーブルの作成・レコードの追加を行います。

CREATE TABLE department(
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [Name] VARCHAR(20) NOT NULL,
    [GroupName] VARCHAR(100) NOT NULL
) ON [PRIMARY]
GO
ALTER TABLE department ADD PRIMARY KEY CLUSTERED 
(
    [id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
GO

INSERT INTO department
    (Name, GroupName)
VALUES
    ('Engineering', 'Research and Development'),
    ('Tool Design', 'Research and Development'),
    ('Sales', 'Sales and Marketing'),
    ('Marketing', 'Sales and Marketing'),
    ('Purchasing', 'Inventory Management')

独自のMetaDataオブジェクトを作成し、refrectメソッドを使ってデータベースから情報を読み込みます。 onlyオプションを使って必要なテーブルだけ読み込むこともできます。

取得したメタデータと、モデルとなるクラスをマッピングすることで、上記でやったのと同じようにデータベースを操作することができるようになります。

import urllib
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

・・・

metadata = MetaData()
metadata.reflect(engine, only=['department'])
Base = automap_base(metadata=metadata)
Base.prepare()
Department = Base.classes.department

Session = sessionmaker(bind=engine)
session = Session()

departments = session.query(Department).all()
for department in departments:
    print(department.Id, department.Name, department.GroupName)

既存のテーブルからデータを抽出することができました。

$ python sample.py
1 Engineering Research and Development
2 Tool Design Research and Development
3 Sales Sales and Marketing
4 Marketing Sales and Marketing
5 Purchasing Inventory Management

まとめ

SQLAlchemy(というかORM全般)を使うとSQLを書かずともDBに対してクエリを発行したりできるし、DBが変わってもほぼ同じように書ける(DB毎の差分を吸収してくれる)のでかなり便利です。

その中でもSQLAlchemyのAuto Mappingの機能を使った既存テーブルの操作は事前にモデルを用意したりする必要がないので、個人的にはかなり良いなと思ってます。

参考文献

tech.chakapoko.com

laplace-daemon.com

mee.hatenablog.com