Beam のプログラミングモデル

Jul 31, 2018 ( Feb 11, 2022 更新 )

サンプルコードは Beam Programming Guide から引用したものです。

このメモの内容は、上記ドキュメントの内容の抜粋です。

コンセプト

  • Driver program
    • pipeline
      • inputs
      • transforms
      • outputs

Pipeline

  • すべての Driver program は Pipeline を作成しなければいけない
  • Pipeline にはどのように起動するかのオプションを与える

PCollection

  • パイプラインが扱うデータセットを表す
  • ファイルなどが入力になる bounded, subscription などの常に更新されるものが 入力になる unbounded がある

PTransform

  • データ処理を表す
  • PTransform は Pcollection を入力として受け取り、0かそれ以上の PCollection を出力する

I/O transforms

  • BEAMはいくつかの “IOs” を備えている。これは、 PTransform がデータをさまざまな外部のストレージシステムから読み込んだり書き込んだりできるようにする

Beam プログラムがすること

  • Pipeline を作って実行オプションをセットする。それには Pipeline Runner も含まれる

  • 最初の PCollection を作成する。 “IOs” からの外部データや、インメモリのデータから PCollection を作成する

  • それぞれの PCollection に PTransform を適用する。 PTransform は PCollection を変化させたり、グループさせたり、フィルタさせたり、分析したり、なにかしたらの処理を PCollection の要素に行う。

  • PTransform は何も入力の PCollection に手を加えずに新しい PCollection を生成することもできる。 PTransform が1つの直列の処理をするわけではなく複雑なグラフになる場合もある

  • 変更を加えた最終的な PCollection を外部ソースに書き出す処理を IOs を使って作成する

  • Pipeline Runner を使って Pipeline を実行する

  • Beam プログラムを作成すると、 PCollection をもとに Pipeline Runner は Workflow graph を作成する

  • この graph は適切な分散された処理バックエンドによって実行され、 バックエンドでは「非同期ジョブ」的に扱われる

Pipeline の作成

Beam を使うにはまず Pipeline クラスんもインスタンスを作成する。 Pipeline を作成するときには、いくつか設定オプションを与えなければいけない。

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

Pipeline オプションの設定

PipelineOptions を作ってフィールドに設定すると、 Beam SDK 内のコマンドラインパーサは Pipeline Options コマンドライン引数から設定してくれる。 コマンドラインからオプションをとって PipelineOptions をつくるには、以下のように書ける:

PipelineOptionsPipeli  options =
    PipelineOptionsFactory.fromArgs(args).withValidation().create();

これは、 --<option>=<value> で与えられたオプションを変換してくれる。

2. PCollection の作成

Bean の PCollection は Source API か、インメモリのコレクションから作成することができる。 前者は様々なクラウドベースのファイルやデータベースや subscription service に対応していて、後者はテストやデバッグ時に役立つ。

外部のソースから読み込むには、Beamの提供する I/O アダプタを使う。 これらは使用用途が異なるが、すべて何かしらの外部のデータソースから読み込んで PCollection を返す。この PCollection の要素はその外部データソースのレコードを表している。

それぞれのデータソースのアダプタは Read transform を持っている。データを読み込むには、この transform を Pipeline オブジェクトに apply する必要がある。

public static void main(String[] args) {
    // Create the pipeline.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Create the PCollection 'lines' by applying a 'Read' transform.
    PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
}

インメモリのコレクションを使う場合も、以下のように同様に apply できる:


publicpublic static void main(String[] args) {
    // Create a Java Collection, in this case a List of Strings.
    static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

    // Create the pipeline.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Apply Create, passing the list and the coder, to create the PCollection.
    p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}

PCollection の特徴

  • PCollection の要素の型は何でもよいが、そのコレクション内の要素はすべて同じ型である必要がある
    • 分散処理のために、実際には個別の要素はバイト文字列であるが Beam SDK はデータエンコーディングの仕組み(カスタマイズ含む)を提供する
  • PCollection は不変である。一度作成したら追加、削除、更新はできない。
    • Beam Transform は PCollection のデータを読み込んで新しい PCollection を生成するかもしれないが、PCollection を変更したり消費することはない
  • Transform は PCollection の要素に独立してアクセスする

bounded

PCollection には bounded と unbounded なものがある。

  • bounded は有限で決まったサイズのセットを表す
  • ファイルやデータベースなど
  • Beam はバッチ処理を行う
    • すべてのデータを1度に読み込む場合もある

unbounded

  • unbounded は無限のサイズ
  • Pub/Sub や Kafka など
  • 継続するストリームジョブで処理を行う。コレクションすべてに対する処理は行われない
  • Beam は論理的な windowing を使ってデータセットを有限サイズにする。この論理的な window は timestamp など要素に基づく
  • Aggregation Transform は window 単位で行われる。あるデータセットが生成されるとき、それらの有限な window に先立って PCollection を処理する

要素の timestamp

PCollection の各要素は固有の timestamp を持っている。 timestamp は PCollection を作るデータソースが付与する。

unbounded な PCollection をつくるデータソースはそれぞれの新しい要素に対して その要素が読まれたり追加されたときの timestamp を付与する。

注意: bounded な PCollection も自動的に timestamp を付与するが、最もよくある振る舞いはすべての要素に同じ timestamp (Long.MIN_VALUE)を付与するというものである。

Transform

PCollection は apply メソッドをチェインすることができる。 以下の例では、処理を直列に実行することができる:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])

同じ入力から別の PCollection を生成することもできる:

[[OutputOutput PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])

composite transform を使って1つの大きな処理を複数のサブステップに分けることもできる。

ParDo

ParDo は “Map” 処理と同様の処理パラダイム。 ParDo は 各要素を PCollection から受け取って、その要素に処理を加え、 0,1,あるいは複数の要素を PCollection の出力とする。

ParDo 処理をする場合は、 DoFn を実装する必要がある。

// The input PCollection of Strings.
PCollection<String> words = ...;

// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                            // we define above.

DoFn は1度に PCollection の1要素を処理する。 DoFn には型パラメータとして入力・出力の型を指定する。

static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

Dofn のサブクラスでは、処理を行うメソッドに @ProcessElement アノテーションを付与する。 @ProcessElement メソッドは ProcessContext オブジェクトを引数として受け取る。 ProcessContext オブジェクトは、入力要素と出力へのアクセスを提供する:

static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Get the input element from ProcessContext.
    String word = c.element();
    // Use ProcessContext.output to emit the output element.
    c.output(word.length());
  }
}

注意: もし PCollection の入力が Key-Value のペアであった場合は、 ProcessContext.element().getKey()ProcessContext.element().getValue() メソッドを利用する。

DoFn インスタンスは1回以上要素に対して処理するために起動する。 ただし Beam はその起動回数を保証しない。 もしかしたら複数回あるワーカノードで起動してそれが失敗やリトライ処理である可能性もある。 そのような場合、処理メソッドに対する複数のコールにまたがって情報をキャッシュしておくことができる。 ただし、そのようにしても、実装は起動回数に依存しないことを確認しなければいけない

処理メソッド中では、いくつかの不変性を満たなければいけない:

  • ProcessContext.element()ProcessContext.sideInput() で返る要素を変更していはいけない
  • 1度 ProcessContext.output()ProcessContext.sideOutput() で出力したら、その値を変更することはできない

以下のように無名インナークラスを使うことによって DoFn を使うこともできる:

// The input PCollection.
PCollection<String> words = ...;

// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  "ComputeWordLengths",                     // the transform name
  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }));

同様に MapElements transform を使うこともできる。 MapElements transform は Java の lambda function を使うことができる:

// The input PCollection.// The 
PCollection<String> words = ...;

// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  MapElements.into(TypeDescriptors.integers())
             .via((String word) -> word.length()));

Java の lambda function は Filter, FlatMapElements, Patition などで使うことができる。

GroupByKey

GroupByKey の入力は、multimap(複数のペアで同じキーがあり、異なる値を持つ)で、 出力としてそれぞれのユニークなキーにひもづく値を集めることができる。

cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

unbounded な PCollection を使うときは、 GroupByKey か CoGroupByKey を使うために non-glocal windowing か aggregation trigger を使わなければいけない。 bounded GroupByKey か CoGroupByKey は 収集する特定のキーを使ってすべてのデータを待つが、 unbounded コレクションは無限である。そのため、 windowing や trigger に論理的にグルーピングをさせて、 unbounded なデータストリームから有限のデータの集まりを扱う。

GroupByKey や CoGroupByKey を windowing strategy を適用した PCollection をグルーピングするために使う時は、グルーピングしたいすべての PCollection は同じ windowing strategy と window sizing を使わなければいけない。

CoGroupByKey

CoGroupByKey は2つ以上の key/value の 同じ key の型を持つ PCollection をリレーショナルジョインさせる。

CoGroupByKey を使う例は以下のようなものである。 あるファイルにユーザ名とメールアドレスがあって、もう一方のファイルにユーザ名と電話番号があるとする。 ここでユーザ名を共通のキーとしてほかのデータを関連付けられた値として join することができる。

finalfinal  ListList<<KVKV<<StringString, String>> emailsList =
    Arrays.asList(
        KV.of("amy", "amy@example.com"),
        KV.of("carl", "carl@example.com"),
        KV.of("julia", "julia@example.com"),
        KV.of("carl", "carl@email.com"));

final List<KV<String, String>> phonesList =
    Arrays.asList(
        KV.of("amy", "111-222-3333"),
        KV.of("james", "222-333-4444"),
        KV.of("amy", "333-444-5555"),
        KV.of("carl", "444-555-6666"));

PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList));
PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList));

final TupleTagTupleT <String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());

PCollection<String> contactLines =
    results.apply(
        ParDo.of(
            new DoFn<KV<String, CoGbkResult>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KV<String, CoGbkResult> e = c.element();
                String name = e.getKey();
                Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
                Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
                String formattedResult =
                    Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
                c.output(formattedResult);
              }
            }));

上記の例の結果は以下と同様になる:

final List<String> formattedResults =
    Arrays.asList(
        "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']",
        "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']",
        "james; []; ['222-333-4444']",
        "julia; ['julia@example.com']; []");

Combine

Flatten

Flatten は 同じ型を持つ PCollection に対して整形をする。 Flatten は 複数の PCollection オブジェクトを1つの論理的な PCollectionに変換する。

// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

Partition

Partition は 同じ型の要素を持つ PCollection を整形する。 Partition は1つの PCollection を固定の数の小さなコレクションに分割する。

// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the
// partitioning function. In this example, we define the PartitionFn in-line. Returns a PCollectionList
// containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));

// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);

Beam Transform のコードを書く時に必要なこと

Beam の Transform のコードは複数のマシン上で並列に、独立的に稼働する。 そしてそれらはお互いにコミュニケーションをとることはない。 選んだ Pipeline や バックエンドによって、書いたコードはリトライ処理のために複数回実行される可能性がある。 そのため、コードの状態依存には注意しなければいけない。

一般的には、以下の要求を満たさなければいけない:

  • Function Object が Serializable であること
  • Function Object が スレッドに対応していること。Beam SDKはスレッドセーフではない

加えて、 function object は冪等であることが推奨されている。

これらの要求は、 DoFn, CombineFn, WindoeFn のサブクラスに適用される。

Serializability

function object は fully serializable でなければいけない。 その function のコピーは serialized されて処理するクラスタのリモートワーカへ転送されるからである。 Dofn, CombineFn, WindowFn などのベースクラスはすでに Serializable を実装しているが、そのサブクラスは non-serializable なメンバを追加してはいけない。

  • transient フィールドはワーカインスタンスに転送されない
  • シリアライゼーションの前に大量のデータをフィールドにロードすることを避ける
  • function objkect の独立したインスタンスはデータを共有しない
  • 適用された後の function object の変更では何も起こらない
  • 匿名インナークラスインスタンスを使って function object を宣言する際には注意する。 non-static context では、インナークラスインスタンスは暗黙的に内包しているクラスへのポインタとそのクラスの状態を持っている。 その内包しているクラスもシリアライズされ、このように function class への適用自体と outer class への適用についても考慮する必要がある。

Thread-compatibility

冪等性

Side inputs

Return to top