猫でもわかるWeb開発・プログラミング

本業エンジニアリングマネージャー。副業Webエンジニア。Web開発のヒントや、副業、日常生活のことを書きます。

Google Cloud Dataflow (Apache Beam) で exactly-once を実装できるのか?

Google Cloud Dataflow で exactly-once の処理は実装できるのか?

Google Cloud Dataflow を使うときに、 exactly-once の処理を実装したくなります。 exactly-once とは、ちょうど1回だけ実行される処理のことです。

Dataflow で Apache Beam のパイプラインを実行する場合、基本的には at-least-once 処理になります。at-least-once は、「最低1回は実行されるが、2回以上実行されることもある」ということ。

Dataflow は、input に対して変換を適用して output を出します。この変換のことを Transform と呼びます。input と output の量が多い場合でも、並列処理によって高速に変換できます。

並列処理を高速に行うため、Dataflow は「多少重複して Transform が実行されても構わない」という方針で実行していきます。ただし、最終的な output は重複しないようにできています。

ただし、この仕様は 副作用 がある場合に問題が生じます。処理の中で外部サービスのAPIを叩く場合などです。Transform は重複する可能性があり、同じ要素に対して API 複数回呼び出されることがあるからです。これは、Dataflowの処理が「at-least-once(最低1回)」保証になっているためです。

これを防ぐ方法はあるのでしょうか。結論から言うと、Dataflow の実装でこれを防ぐ方法はありません。重複は発生してしまうものになります。

唯一解決できる方法としては、叩かれる API 側で重複を排除することです。

もう少し詳しくみてみる

ややこしいことに Google Cloud Dataflow には以下のようなドキュメントがあります。

cloud.google.com

これをみると Dataflow では exactly-once で処理が行われるように読めますが、あくまでも output が重複しないと言っているだけです。

上記ドキュメントの「副作用 (Side effects)」の章を見てみると

副作用が exactly-once セマンティクスを保証するとは限りません。重要な点は、シンクが exactly-once セマンティクスを実装していない限り、外部ストアへの出力の書き込みも含まれることです。

とあります。

「副作用」とは、データの変換以外の処理のことです。ログの書き込みや、外部のデータベースの書き込みは副作用にあたります。

「シンク」 とは、結果を外部のデータベース等に書き込む処理のことです。結果をデータベースに書き込むのは「シンク」でもあるし「副作用」でもあります。

外部ストレージへの書き込み(Google Cloud Storage、AWS S3、Datastore などへの書き込み)は、Google Cloud Storage や Datastore の方に exactly-once の仕組みが実装されていないと、 exactly-once にはできません。

外部のストレージにデータを書き込むケースについては 「exactly-once 出力の配信」 の章に記載があります。

組み込みシンクについて

「exactly-once 出力の配信」の章では、こうも書かれています。

Apache Beam SDK には組み込みシンクが用意されています。これらのシンクは重複が発生しないように設計されています。可能な限り、これらの組み込みシンクを使用してください。

そして、組み込みシンクの一覧として以下のページが紹介されています。

https://beam.apache.org/documentation/io/connectors/

しかし、このドキュメントには誤解があります。 Apache Beam の組み込みシンクの全てが exactly-once ではないのです。

以下ドキュメントの「Google がサポートする I/O コネクタ」にあるものだけが、重複の排除の設計がされているものだと思われます。実際、BigQueryIO や、DatastoreIO は、重複が発生しないようになっています。

cloud.google.com

例えば、Apache Beam のドキュメントに載っている「Web APIs」のコネクタは、Google のドキュメントを見るとサポートされていないので、重複排除には対応していません。実際に試したら、普通に重複して API リクエストが送られてしまいました。

Google がサポートする組み込みシンクにおける重複排除の仕組み

では、データをシンクする際にはどうやって重複排除を実装するのでしょうか?Dataflow のなかで API リクエストをするときに、重複してリクエストされないようにするには、どうしたらいいのでしょうか?

最初に結論を書いた通り、Dataflow 側だけで実装することができまえん。API 側で重複排除の仕組みを実装しなければいけません。

重複排除に関しては、以下の Google Cloud ブログの Part 3 がちょっとだけ参考になります(以下のリンクは Part 1 です)

Exactly-once processing in Google Cloud Dataflow, Part 1 | Google Cloud Blog

この記事の Part 3 では、 Google が実装した組み込みシンクについてどのように exactly-once を実装しているのか書かれています。 BigQueryIO については、 BigQuery の streaming insert API に重複排除の仕組みが実装されておりBigQueryIO からはそれを利用していると書かれています。

つまり、あくまでも BigQuery 側に重複排除の仕組みがないと不可能なのです。

まとめ

Google Cloud Dataflow および Apache Beam には「副作用の重複排除」、つまり「変換を1回だけ行う」という仕組みはない。

API へのリクエストを通して外部へデータを書き込みたい場合は、API 側に重複排除の仕組みがあったり、重複してリクエストが送られてきても問題なく処理できる API になっている必要があります。

www.utakata.work