Playful-Core -Scheduler編-
スケジューラ機能が形になったので、ここらで現状をまとめたいと思います。
全体構成
全体構成が少し変わりました。
Schedulerが実は見えてなくて、最初はReservationの中に含まれると思っていたけど、
これが構成を複雑化させていました。
分けることによって、Reservationが担う役割が大幅に減ってシンプルになりました。
スケジューラ機能
今回作ったのは、Schedulerコンポーネント、スケジューラ機能です。
Reservationは録音予約情報を管理するコンポーネントで、
基本的にはデータの登録・更新・削除・参照の管理を行うだけです。
現状まだ修正出来てないですが、Reservationで時刻の考慮があるロジックは、すべてSchedulerに移されます。
こうすることで、いかのような役割分担が実現出来ます。
Reservation
- データの管理
- 予約された、キャンセルされた、をイベント発火
Scheduler
- スケジューリング管理
- 予約された、キャンセルされた、のイベントを購読し、スケジューリングする
- 予約時間に時刻到達イベントを発火
Schedulerから発火された時刻到達イベントをReservationが購読し、
Recording(録音管理)に予約情報を通知します。
そうすることで、録音を開始する仕組みです。
並行処理
今回のキモは、Go言語の並行処理です。
スケジューラを作るに当たって、Go言語の並行処理が必要なのは見えていました。
とにかく、Javaと平行処理の考え方が違うので、それを吸収するまでに難儀しました。
並行処理は以下の書籍を読んで勉強してます。
※現在、4章まで読み終わっています
- 作者:Katherine Cox-Buday
- 発売日: 2018/10/26
- メディア: 単行本(ソフトカバー)
現時点での、Go言語での並行処理の理解は次の通りです。
- クリティカルセクション(同時アクセスを防ぎたい箇所)に悲観排他を仕込むのではなく、クリティカルセクションを単一のGoroutineに閉じ込める(これが、通信によるメモリの共有)
- Goroutineにはチャネルによる入出力を行い、Goroutineをつなぎ合わせてパイプラインを形成する
読んでいて、部分的にJavaのStreamAPIに似ているなぁなんて感じていました。
データの集合を、流れるものに抽象化して、それをパイプの中を通すように処理していくのが、まさにそうですね。
4章が読んでいてとてもおもしろかったのですが、
基本的なパイプラインの構成と、スケールのやりかたとしてのファンアウト・ファンイン、
パイプラインをまとめ上げるブリッジ、キューイング。。。
この手のフローは、Javaだと構築が難しすぎるのですが、Goroutineは非常に簡単です。
JavaやってるとFrameworkの中でやってしまっているコンテキストも、ここで理解できました。
今回の技術話
さて、今回のスケジューラ機能の仕組みの話です。
スケジューラ機能に関しては、正直ビジネスロジックは多くないので、ドメインモデルの構築はしてないです。
ほぼアプリケーションレイヤーのコンポーネントで成り立っています。
スケジューラ機能は以下の機能を備えなければなりません。
- 予約できること
- キャンセルできること
- 指定時間にイベント発火できること
キャンセルできること、が結構厄介で、スケジューリングのパイプラインを生成するのと、
そのパイプラインのコンテキストにキャンセルを実行する契機が、別のリクエストになります。
コンテキストの管理単位は、リクエストスコープが推奨されており、リクエストをまたいだコンテキストのキャンセルは
本来してはいけないはずです。
そこで、今回は少し工夫して、パイプラインのステージを次の通りとしました。
- 管理ステージ
- スケジューリングステージ
- パブリッシュステージ
なお、ソースでいうと、以下のあたりです。
管理ステージはチャネルを一つ受け取ってGoroutineを起動します。
このチャネルにスケジュールの識別情報を流し、次々と後続のステージに流していき、
最後のステージに到達するとイベントを発火します。
管理ステージでコンテキストを生成します。つまり、スケジュールごとにコンテキストを生成し、パイプラインと紐付けます。
チャネルをクローズすることで、コンテキストがキャンセルされ、パイプラインをまるごと終了させます。
コンテキストがリクエストスコープではないですが、スケジューラ機能としてのコンテキスト(文脈)はスケジュール単位なので、
このようにパイプラインごとに管理すると、しっくり来ました。
スケジューリングステージは、OSSのスケジューラを起動して、実際にスケジュールします。
今回のパイプラインは、流れるのがものすごい遅いパイプラインという抽象化により、スケジューリングを実現しています。
つまり、上のステージから流れてきた識別情報をこのステージがせき止め、予約時間に次のステージに流します。
パブリッシュステージは、その名の通り、識別情報からイベントを生成して発行します。
この一連のパイプラインは、スケジューリングされるたびに生成されるため、
そのたびにこのパイプラインの出力チャネルが生成されます。
出力チャネルは、実際にMQTTブローカーにパブリッシュするGoroutineと繋がないといけません。
そこで、出力チャネルを、チャネル型のチャネルに通して、ファンインのGoroutineを起動するブリッジにかまして、
MQTTブローカーのパブリッシュGoroutineにイベントを流します。
以下のあたりですね。
このようにして、スケジューリングを実現しています。