Google Cloud PlatformのCloud Machine Learning Engineを使用した機械学習のシステムアーキテクチャについて考える機会があったので紹介したいと思います。
前提条件
- Google Cloud Platformを使用する
- オンライン予測が短時間に集中するが頻度は高くない (1予測バースト/day)
- 使っていないときはシステムを停止させてお金をかけたくない
- 学習用データは月1回程度増えていく
- 専属で運用する人がいない
- オンライン予測するためにREST APIを使いたい
- 学習用データは人が入力する
- 学習用データの入力からREST APIの提供までを自動化したい
システム概要
図1はシステム全体の学習パイプラインです。学習用データを人間が入力したあと、REST APIが公開されるまで自動で実行します。
学習して精度の向上があれば新しいモデルに差し替える方法もありますが、 ここでは学習データが追加されるたびに新しいバージョンに更新しています。
処理の流れ
全体の流れは大まかに分けると以下のようになります。
- 学習用データの入力
- データの結合
- 学習
- 学習モデルを新しいバージョンにアップデート
以下で細かく見ていきます。
1. 学習用データの入力
前面にバリデーション用のWebサービスをおきます。人間が入力するデータには誤りが発生するので、正しいフォーマットで入力されるようにバリデーション用のWebサービスを入力の入り口とします。
2. データの結合
Cloud StorageにアップロードされたらCloud Functionsでトリガーします。
学習用データが増えることに対応するために、オリジナルのデータと結合されたデータの2つをCloud Storageに別々のバケットとして保存します。
データの結合処理は以下の例のようになります。
main.py
from google.cloud import storage from google.cloud.storage import Blob def f(event, context): storage_client = storage.Client() bucket_name = 'my-origin-bucket' bucket = storage_client.get_bucket(event['bucket']) blobs = bucket.list_blobs() # バケットから学習用CSVデータを取得して結合する with open('/tmp/' + "merged.csv", "w") as out_file: for blob in blobs: print(blob.name) with open('/tmp/' + blob.name, "wb") as file_obj: blob.download_to_file(file_obj) # 最終行に改行文字が無ければ入れたい with open('/tmp/' + blob.name) as f: for line in f: if line[-1] == '\n': out_file.write(line) else: out_file.write(line + '\n') # 結合されたCSVをトレーニング用のバケットに保存する store_bucket_name = 'my-bucket' store_bucket = storage_client.get_bucket(store_bucket_name) blob = Blob("data.csv", store_bucket) with open('/tmp/' + "merged.csv", "rb") as f: # working directoryには保存できなので/tmpに保存する blob.upload_from_file(file_obj=f)
requirements.txt
google-cloud-storage>=1.13.2,<1.14.0
3. 学習
同じように結合されたデータがCloud FunctionsでトリガーされCloud Machine Learning Engineで学習が実行されます。Cloud Machine Learning Engineのジョブ実行には専用のSDKは提供されていないようなので、 Python Client Library のREST APIを使ってジョブを実行します。
学習処理は以下のようになります。
main.py
import logging from datetime import datetime from googleapiclient import discovery from googleapiclient import errors def f(event, context): try: response = train() logging.debug(response) except errors.HttpError as err: logging.error('There was an error creating the training job. Check the details:') logging.error(err._get_reason()) def train(): training_inputs = {'scaleTier': 'BASIC', 'packageUris': ['gs://my-ml-bucket/training_package/trainer-0.1.tar.gz'], 'pythonModule': 'xgboost_trainer.training', 'region': 'us-central1', 'jobDir': 'gs://my-ml-bucket/xgboost_job_dir', 'runtimeVersion': '1.12', 'pythonVersion': '2.7'} now = datetime.now().strftime("%Y%m%d_%H%M%S") job_spec = {'jobId': 'grammar_' + now, 'trainingInput': training_inputs} project_name = 'my-project-123' project_id = 'projects/{}'.format(project_name) cloudml = discovery.build('ml', 'v1') request = cloudml.projects().jobs().create(body=job_spec, parent=project_id) return request.execute()
学習用スクリプト
事前に学習用のスクリプトをPython packageとしてCloud Storageにアップロードしておく必要があります。上記の例では 'packageUris': ['gs://my-ml-bucket/training_package/trainer-0.1.tar.gz']
でパッケージを指定しています。今回は紹介していませんが、この部分も学習用スクリプトが更新されたら自動的にパッケージを作成してアップロードできる仕組みを用意しておくとよいでしょう。
Pythonのパッケージを作成するには
xgboost_trainer/training.py setup.py
のようなディレクトリ構成を作成したあと
$ python setup.py sdist
を実行します。パッケージは sdist/ に作成されます。詳しくは setup.py
について調べてみてください。
学習済みモデルの保存
また、学習用スクリプトで学習した学習済みモデルは自動では保存されません。自前でCloud Storageなどに保存する必要があります。
以下の例では学習済みモデルの my_model.bst
をCloud Storageにアップロードしています。
# -*- coding: utf-8 -*- import os import subprocess import sys import pandas as pd gcs_model_path = os.path.join('gs://', 'my-bucket', 'data.csv') subprocess.check_call(['gsutil', 'cp', gcs_model_path, 'data.csv'], stderr=sys.stdout) df = pd.read_csv('data.csv', sep='\t', encoding='utf-8') # train something... # 保存した学習済みモデルをCloud Storageにアップロード model_filename = 'my_model.bst' gcs_model_path = os.path.join('gs://', 'my-ml-model-bucket', model_filename) subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
4. 学習モデルを新しいバージョンにアップデート
学習が終わるとCloud Storageにモデルが保存されます。これをCloud FunctionsでトリガーしてCloud Machine Learning Engineに新しいバージョンとしてデプロイします。デプロイ時に新しいバージョンをデフォルトのバージョンにしておきます。こうすることで、REST APIで特定のバージョンを指定しない限り、常に新しい学習済みモデルを使用して予測ができるようになります。
main.py
import logging import time from datetime import datetime from googleapiclient import discovery from googleapiclient import errors def f(event, context): try: response = deploy() logging.debug(response) except errors.HttpError as err: logging.error('There was an error creating the training job. Check the details:') logging.error(err._get_reason()) def deploy(): project_name = 'my-project-123' model_name = 'my-xgboost-model' # TODO もしCloud Machine Learning Engineにモデルが無ければ先に作成するようにしたい version_name = 'v' + datetime.now().strftime("%Y%m%d_%H%M%S") body = { 'name': version_name, 'deploymentUri': 'gs://my-ml-model-bucket', 'runtimeVersion': '1.12', 'framework': 'XGBOOST', 'pythonVersion': '2.7', } # 新しいバージョンをデプロイする cloudml = discovery.build('ml', 'v1') parent = 'projects/{}/models/{}'.format(project_name, model_name) request = cloudml.projects().models().versions().create(body=body, parent=parent) request.execute() # バージョンのデプロイが完了するのを待つ while True: time.sleep(5) name = 'projects/{}/models/{}/versions/{}'.format(project_name, model_name, version_name) request = cloudml.projects().models().versions().get(name=name) response = request.execute() logging.info(response) if response['state'] == 'READY': break # 新しいバージョンをデフォルトのバージョンにする name = 'projects/{}/models/{}/versions/{}'.format(project_name, model_name, version_name) request = cloudml.projects().models().versions().setDefault(name=name) return request.execute()
デプロイしたバージョンが if response['state'] == 'READY':
でデプロイが完了するまで待っていますが、これには1分ほどかかります。Cloud Functionsのデフォルトの処理のタイムアウト時間が60秒なので120秒ぐらいに変更しておく必要があります。
また、モデルのバージョンの名前には v201901017_092356
のように v + 日時
を使うと管理しやすいです。
これでREST APIとして利用できる準備ができました。あとはApp Engineや外部からCloud Machine Learning Engineのオンライン予測APIにリクエストすれば予測結果が返ってきます。
今後問題となりそうなところ
前提条件の"専属で運用する人がいない"と言うところがまだ解決できていません。ドキュメントの整備はもちろんですが、Google Cloud Platformを構築する時に手間がかかります。この部分もコード化してGoogle Cloud Platform上で全てのサービスをコマンド一発で立ち上げられるようにできると良さそうです。これにはTerraformを使うと良いかな?と思っています。