Blog

2022.09.12

Engineering

操作ログ方式に基づくOptunaストレージ実装とNFS経由の分散最適化

Masashi Shibata

本記事はアルバイトとして勤務されていた大滝理貴さんによる寄稿です。

概要

ハイパーパラメータ最適化のライブラリであるOptuna のストレージにこれまでの方法とは異なる方式でデータを保存する JournalStorage を追加しました。次のマイナーバージョンであるOptuna v3.1.0でリリースが予定されており、現在はmasterブランチからのみ利用可能です。従来のデータの保存様式では、あるレコードに変更を加えた場合、変更前に書かれていた値を変更後の値で上書きします。今回実装した JournalStorage では、従来のように「値」を記録し上書きするのではなく、値の変更の要因となった「操作」を記録します。Optuna でいう操作とは、例えば Study や Trial の作成、目的関数の評価値の記録等を指します。値そのものではなく、より抽象的な操作を記録することで IO の回数を必要最小限に抑え、また従来のストレージAPI より簡素なインターフェースの提供を可能にしました。 

# JournalStorageの利用方法例
import optuna

def objective(trial):
    ...

storage = optuna.storages.JournalStorage(
    optuna.storages.JournalFileStorage("./journal.log")
)
study = optuna.create_study(storage=storage)
study.optimize(objective)

Optuna Storageが抱えている3つの課題

バックエンドの追加が難しい

Optuna Storage は、既存のバックエンド(RDB, In-memory, Redis) 以外にも自前のバックエンドの追加が可能な仕様になっています。BaseStorage の API を実装すれば原理的にはどのようなストレージにも対応可能ですが、実際には次の理由により難しいと考えられています。

  1. BaseStorage には 19 個の抽象メソッドが存在し、実装者への負担が大きい。
  2. BaseStorage の各抽象メソッドの満たすべき規約が複雑であり、特にマルチスレッドによる並行処理に対応するには、気をつけるべき点が多く存在する。

したがって Optuna Storage に精通していないと気軽に自前のバックエンドを追加するのは難しい状況でした。

CachedStorage が扱いにくい

Optuna には CachedStorage というストレージAPIの呼び出し結果をメモリ上でキャッシュするクラスが存在します。しかしながら、Optuna のような分散アプリケーションにおけるキャッシュは本質的に難しく、その実装は開発者にとって非自明で扱いにくいコンポーネントとなってしまっていました。また RDB や Redis はキャッシュがあることで実用的なレイテンシを達成するものの、その導入によって、変更を加えることのできるデータに本来不必要だった制約を加えてしまうなど、CachedStorage はいくらか問題を抱えていました。

NFS上での大規模な並行最適化ができない

Optuna のユーザには、Postgres や MySQL、 Redis の環境が用意できない方も存在します。そのような方向けの永続化手法が SQLite3 のみであるものの、SQLite3 はネットワークファイルシステム (NFS) に対応していないため[1]、ネットワーク越しの大規模な並行最適化が実現できないという問題がありました。

以上の背景のもと、JournalStorage を Optuna に取り込みました。そのバックエンドとして NFSに対応したファイルストレージを提供することで、特殊な環境下での並行最適化が実現できるようになりました。

JournalStorageの設計

JournalStorage は Takeru Ohta (@sile) さんがプロトタイプとして開発した optjournal を参考に設計・実装されました。Optunaでは次のマイナーリリースである Optuna v3.1からサポートする予定です。従来のストレージに対して、 JournalStorage がどのように異なるのかを以下に説明します。

JournalStorage はデータベースへの操作を記録する

Optuna の BaseStorage のメソッドは、ストレージへのアクセス方法という点で create 系、get 系、set 系の 3 種類に大別できます。従来の Optuna Storage ではまず create 系のメソッドを呼ぶことで Study や Trial のレコードをストレージに保存し、get 系のメソッドでそのレコードのフィールド情報を取り出し、set 系のメソッドでそのレコードのフィールド情報を上書きしていました。

# BaseStorageの抽象メソッドは次の19個
class BaseStorage:
    def create_new_study(self, study_name):
    def delete_study(self, study_id):
    def set_study_user_attr(self, study_id, key, value):
    def set_study_system_attr(self, study_id, key, value):
    def set_study_directions(self, study_id, directions):
    def get_study_id_from_name(self, study_name):
    def get_study_name_from_id(self, study_id):
    def get_study_directions(self, study_id):
    def get_study_user_attrs(self, study_id):
    def get_study_system_attrs(self, study_id):
    def get_all_studies(self):
    def create_new_trial(self, study_id, template_trial):
    def set_trial_param(self, trial_id, param_name, ... ):
    def set_trial_state_values(self, trial_id, state, values):
    def set_trial_intermediate_value(self, trial_id, step, ... ):
    def set_trial_user_attr(self, trial_id, key, value):
    def set_trial_system_attr(self, trial_id, key, value):
    def get_trial(self, trial_id):
    def get_all_trials(self, study_id):

JournalStorage では、create 系と set 系のメソッドは全て一行の JSON の永続化という処理に集約されます。例えば、 JournalStorage.create_new_study(study_name = “foo”) というメソッドを呼ぶと、 バックエンドには ”{operation: create_new_study, study_name: “foo”}” という辞書型の JSON が追記され、JournalStorage.set_study_user_attrs(study_id = 1, key = “key”, value = “value”) を呼ぶと、”{operation: set_study_user_attrs, study_id: 1, key: “key”, value = “vallue”}” が追記されます。(実際にはもっと複雑な構造をしています。)このように、メソッドを辞書型の JSON にシリアライズしたものを、操作ログまたはログと呼びます。操作ログをデシリアライズし、古い順に実行(リプレイ)すればインメモリにデータベースのスナップショットを構築することができます。JournalStorage では、get 系のメソッドが呼ばれた時、まだインメモリに適用していないログをバックエンドから取得してリプレイします。そしてリプレイ結果を参照して結果を得ることができます。

# 実際の操作ログ
{"op_code": 0, "pid": "...", "study_name": "sample_study"}
{"op_code": 4, "pid": "...", "study_id": 0, "directions": [1]}
{"op_code": 5, "pid": "...", "study_id": 0, "datetime_start": "2022-08-22T14:49:42.105062"}
{"op_code": 6, "pid": "...", "trial_id": 0, "param_name": "x", ...}
{"op_code": 7, "pid": "...", "trial_id": 0, "state": 1, "values": [0.0], ...}
{"op_code": 5, "pid": "...", "study_id": 0, "datetime_start": "2022-08-22T14:49:42.118206"}

操作ログ形式でのデータの保存は、従来の手法と比べていくつかメリットがあります。

メリット1: バックエンド API の簡素化

1つ目は、バックエンドのAPIを簡素化できることです。Optuna は BaseStorage のメソッド数が多く、自前のバックエンドを実装するのは可能ではあるものの困難という状況でした。しかしこれは仕方のない問題でした。なぜならOptuna には大きく分けて Study と Trial という2つのエンティティが存在し、それぞれに対して、レコード create の操作、 種々のフィールドの set と get の操作があるからです。操作ログ形式でデータを保存する JournalStorage の導入によってこの問題を緩和することができます。先例のとおり、操作ログ形式では、データベースに対するいかなるデータの書き込み (create, set) も「辞書型の JSON の追記」として扱われます。そのため、バックエンドに必要な API は、JSON を書き出して、読み出すメソッドの 2 つのみになります。2つの API を実装するだけで新たなバックエンドに対応できるのは、JournalStorage の大きなメリットです。

# BaseJournalLogStorageの抽象メソッドは2個のみ
class BaseJournalLogStorage:
    def read_logs(self, log_number_from):
    def append_logs(self, logs):

メリット2: キャッシュが不要になる

2つ目のメリットはキャッシュの実装が必要ないことです。こちらはユーザ視点と言うよりも開発者視点でのメリットです。Optuna では CachedStorage というクラスでデータベースをラップすることで RDB と Redis の高速化を行っています。データベースから取得した結果をキャッシュせずに毎回取り出すと、 BaseStorage.get_trials() や BaseStorage.get_all_trials() のレイテンシが大きくなってしまうため、実用的には内部でのキャッシュの利用は避けられません。しかしながら、一般に分散システムでのキャッシュは、「あるワーカーのキャッシュラインが別ワーカーによってアップデートされ、 “dirty” になったらどのように対処するか?」などの問題を考える必要があり、実装が複雑になります。Optuna も例外ではありません。 Optuna ではこのような問題を緩和するために 「一度 COMPLETE になった Trial の変更を禁止する」とし、Optuna の機能に制限を加えることでキャッシュの実装を軽量化しています。しかしながら、CachedStorage は依然として実装が込み入っており、リファクタリングを加えたりメンテナンスするのが難しいモジュールになっています。一方 JournalStorage では、キャッシュは必要ありません。JournalStorage では、インメモリにデータベースのスナップショットを保持しており、新たに永続化されたログを適用することで最新の状態に逐一更新します。そのため、 get 系の関数は、少ない IO で高速にデータを読むことができます。

デメリット: データの初期ロードに時間がかかる

JournalStorage にはデメリットもあります。操作ログ形式では、一度永続化したデータベースを復元するためには、ログをはじめから走査し逐一インメモリに適用する必要があり、初期ロードの時間は従来のバックエンドより長いです。例えば、あるキーに対して値が 1 -> 10 -> 100 -> 1000 とアップデートされたとします。従来のデータベースでは、最終的な値である 1000 を読み込めば良いだけですが、操作ログ形式では、まず 1 -> 10 のログを適用し、そののち 10 -> 100 のログを適用し、100 -> 1000 のログを適用する必要があります。従来のデータベースでは最新の値のみをインメモリに復元すれば良いものの、操作ログ形式では、ログが増えれば増えるほど 、データをメモリ上に復元するのに時間がかかります。

また現在の JournalFileStorage は、全てのログを一つのファイルに保存しています。扱うファイルが少ないのはシンプルで良いのですが、欠点もあります。例えば Study の数が 1000 個あり、そのうち一つの Study 情報のみが必要な場合でも全ての Study をメモリ上に復元しないとデータを読めません。

これらの問題は、操作ログに加え、スナップショットを永続化することで緩和できます。例えば、ログがある一定の量を超えた場合に従来の方式でスナップショットを永続化し、不必要になったログを削除します。そうすることで、インメモリのスナップショットを高速に構築することができるでしょう。スナップショットの機能は Future Work として開発していく予定です。

JournalStorage に関するより詳しい資料に関しては、Takeru Ohtaさんが書かれた記事 [2]などがあります。

NFSのサポート

JournalStorage のデフォルトのバックエンドとしてファイルストレージを実装しました。これまでは、 RDB や Redis 環境を用意できない場合、並行最適化を行うには SQLite3 を用いるしか方法がありませんでした。また、SQLite3 は NFS 環境で用いることが推奨されておらず、Optuna で RDB や Redis を用いずにネットワーク越しに並行最適化をする手段はこれまで提供されていませんでした。今回導入した JournalStorage のファイルバックエンドはNFS に対応していますので、これらのユースケースをカバーできます。

複数のプロセスから NFS 上のファイルを読み書きするためには、マルチプロセスを並行制御するためのファイルロックが必要です。ファイルロックを用いてプロセスを排他することで、安全に「操作」ログを書き込むことができます。通常、ファイルロックを取得するには、flock(2) を用います。しかし flock(2) のマニュアルにあるように、NFS の古いバージョンでは flock(2) はプロセスを排他できません。

NFS でファイルロックを実現する手法はいくつかあるとされていますが、NFSの実装依存も大きく明確な正解は見つかりませんでした。本記事では、私が試した 3 つの方法(symlink(2) を使ったロック、open(2) を使ったロック、 link(2) を使ったロック)でファイルロックを実装し、その安全性の検証結果と速度の比較結果を示します。まずそれぞれの概要を示します。

symlink システムコール用いたファイルロックについて

NFS v2 では rename(2) と symlink(2) がアトミックに実行されるため [3] 、これらをファイルロックの実装に活用できます。あるファイルに対して、同一名のシンボリックリンクを複数プロセスで作成し、リンク作成に成功したプロセスがロックの取得に成功、失敗したらリトライするというロジックでプロセスを排他するメカニズムを実現しました。これは同一名のシンボリックリンクはただ一つのみ作れることを利用しています。例えば、ファイル log_file に対して、すべてのプロセスは log_file.lock というシンボリックリンクを作ろうとします。成功したプロセスはクリティカルセクションに入り、すでにファイルが存在していたら、プロセスはエラー(EEXIST)を受け取り、リトライします。ロックを解放する際は、作成したシンボリックリンクを一意な名前のファイルへ rename(2) し削除します。

class SymlinkLock(BaseLock):
    def acquire(self):
        while True:
            try:
                os.symlink(self._lock_target_file, self._lockfile)
                return True
            except OSError as err:
                if err.errno == errno.EEXIST:
                    continue
                raise err

    def release(self):
        try:
            os.rename(self._lockfile, self._lockrenamefile)
            os.unlink(self._lockrenamefile)
        except OSError:
            raise RuntimeError("Error: did not possess lock")

open システムコールを使ったファイルロックについて

NFS v3 では rename(2) と symlink(2) に加えて、create(2) をアトミックに実行できます[1]。open(2) を O_CREAT と O_EXCL オプションをつけて呼ぶと、ファイルがなければ作り、あればエラー( EEXIST )で失敗するという挙動、すなわち create(2) をします。これを利用してプロセスを排他することができます。ファイルを作れた場合、クリティカルセクションに入り、作れなかった場合もう一度リトライします。シンボリックリンクを用いたロック同様、同一名のファイルはただ 1 つしか作れないことを利用したファイルロックです。シンプルでわかりやすいのですが、こちらは NFS v2 以前ではサポートされていないという問題があります。

class OpenLock(BaseLock):
    def acquire(self):
        while True:
            try:
                open_flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
                os.close(os.open(self._lockfile, open_flags))
                    return True
            except OSError as err:
                if err.errno == errno.EEXIST:
                    continue
                raise err

    def release(self):
        try:
            os.rename(self._lockfile, self._lockrenamefile)
            os.unlink(self._lockrenamefile)
        except OSError:
            raise RuntimeError("Error: did not possess lock")

link システムコールを使ったファイルロックについて

正式なドキュメントを見つけることはできなかったのですが、多くの NFS では link(2) はアトミックで、ハードリンクを用いることでファイルロックを実現できると書かれている文献[4]が見つかりました。あるファイルに対して、プロセスごとに異なる名前でハードリンクを作成します。作成後 stat(2) を用いてファイルのハードリンクの数を数えることで、もしそのプロセスのみがハードリンクを作ることに成功していればロックの取得に成功したというロジックでファイルロックを実現できます。例えば、排他したいファイル log_file に対して、プロセス 1 は log_file.lock1 というリンクを作り、プロセス 2 は log_file.lock2 というリンクを作ります。この場合、プロセスごとに別の名前でハードリンクを作成するので link(2)  は EEXIST で失敗することはありません。リンク作成後、ファイルのハードリンクの数を stat(2) を用いて数え、ちょうど 2 つであればクリティカルセクションに入ります。2 つというのは、元々のファイルとそのプロセスが作ったハードリンクを指します。

class LinkLock(BaseLock):
    def acquire(self):
        while True:
            os.link(self._lock_target_file, self._lockfile)
            if os.stat(self._lock_target_file_fd.fileno()).st_nlink == 2:
                return True
            self.release()

    def release(self):
        try:
            os.rename(self._lockfile, self._lockrenamefile)
            os.unlink(self._lockrenamefile)
        except OSError:
            raise RuntimeError("Error: did not possess lock")

上記の3つの方法でファイルロックを作り、社内のNFS上でその安全性の検証と速度比較を行いました。実験内容は以下のとおりです。

  1. NFS をマウントしたディレクトリに、ログファイルを作成。
  2. ログファイルに 0 を書き込む。
  3. 10 プロセスを立ち上げ、ログファイルの末尾に書かれた値を読み、その値をインクリメントした値をログファイルの末尾に追記する。この際、ログファイルを読み、新たな値を書き込むまでを上記のファイルロックで排他する。
  4. 合計で 1000 回書き込むのにかかった時間を計測する。また、最終的なファイルの末尾が 1000 であるか検証する。(ファイルロックが正しくファイルへの読み書きを排他できているのかを検証する。)
  5. 1. ~ 4. を合計 10 回実行し、平均経過時間を計測する。

実験結果は次のとおりです。

ファイルロックの種類 平均経過時間
symlink(2) を使ったロック 0.318 (+/- 0.029) s
open(2) を使ったロック 3.517 (+/- 2.131) s
link(2) を使ったロック 10 s 以内には終了せず

symlink(2) を使ったロックは安定して速く、その次が open(2) を使ったロックで、link(2) を使ったロックは一番遅い結果となりました。link(2) を使ったロックは、他の 2 つよりロック取得の条件が厳しいため、このような結果に至ったと考えられます。symlink(2) を使ったロックも open(2) を使ったロックも、ある 1 つのファイルの作成の可否でロックを取得できたかどうかが決まります。それに対して、link(2) を使ったロックはプロセスはそれぞれ異なる名前のファイルを好きなタイミングで作成できるので、ちょうどハードリンクの数が 2 つである瞬間を確認するのが難しいと考えられます。参考までに、link(2) を使ったロックでプロセス数を減らして実験を行ったところ、1プロセスだと 0.417 (+/- 0.064)s で、2 プロセスだと  3.941 (+/- 2.242)s となりました。 

以上より JournalStorage はデフォルトで symlink(2) を使ったロックをサポートし、状況に応じて open(2) を使ったロックを用いることができるようにしました。link(2) を使ったロックは、他の 2 つと比べ性能が安定せず、また NFS 上での link(2) のアトミック性が保証できていないないため今回は採用しませんでした。

おわりに

JournalStorage を実装し、そのバックエンドとしてファイルストレージを追加しました。これにより、RDBや Redis のない環境でも手軽に並行分散最適化を行うことができるようになりました。また、ファイルストレージは、symlink(2) や open(2) を用いたファイルロックを利用しているため、NFSに対応しており、ネットワーク越しでの並行分散最適化も可能となりました。

参考

[1] SQLite Frequently Asked Questions
[2] 操作ログ方式でOptuna用ストレージを実装してみた話
[3] NFS illustrated by Brent Callaghan
[4] PHP: flock – Manual

  • Twitter
  • Facebook