Cloud Dataflow(Beam)で Pub/Sub からメッセージを受信して複数の動的な BigQuery テーブルへの書き出しを行う

Aug 7, 2018 ( Feb 11, 2022 更新 )

使用例

  • Cloud Pub/Sub からメッセージを取得する
  • 整形して BigQuery の行をつくる
  • 複数の BigQuery のテーブルに挿入する
    • テーブル名は Pub/Sub から取得したデータから動的に決定したい

という場合を想定しています。

                 ┌──────────────────────────┐                 
                 │                          │                 
                 │     get message from     │                 
                 │         Pub/Sub          │                 
                 │                          │                 
                 └──────────────────────────┘                 
                               │                              
                               ▼                              
                 ┌──────────────────────────┐                 
                 │                          │                 
                 │  create row of BigQuery  │                 
                 │                          │                 
                 └──────────────────────────┘                 
                               │                              
          ┌────────────────────┼────────────────────┐         
          │                    │                    │         
          ▼                    ▼                    ▼         
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│                  │ │                  │ │                  │
│  BigQuery table  │ │  BigQuery table  │ │  BigQuery table  │
│                  │ │                  │ │                  │
└──────────────────┘ └──────────────────┘ └──────────────────┘

Pub/Sub には以下のような JSON が Publish されます。

{"type": "EVENT_TYPE", "timestamp": <epochtime>, "payload": "something"}

DynamicDestinations を使う

BigQueryIO.writeTableRows().to(DynamicDestinations) を使うことによって、 出力先テーブル名を動的に決定することができます。

DynamicDestinations (Apache Beam 2.5.0)

以下はサンプルコードです。 Kotlin ですが Java の場合も大体同じ感じだと思います。

以下の例では、あらかじめ BigQuery 側のテーブルは作成してある想定で withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)を指定しています。

interface PubSubBQDynamicDestinationOptions: PipelineOptions {

    @get:Description("Pub/Sub topic")
    @get:Validation.Required
    var pubSubTopic: String
}

class EventLogDestination: DynamicDestinations<TableRow, String>() {

    companion object {
        val project = "projectName"
        val dataset = "detasetName"
        val prefix = "prefix_"
    }

    override fun getTable(destination: String?): TableDestination {
        return TableDestination(destination, "event log table for each event")
    }

    override fun getSchema(destination: String?): TableSchema {
        val schema = TableSchema()
        val fields = listOf(
                TableFieldSchema().setName("type").setType("STRING").setMode("REQUIRED"),
                TableFieldSchema().setName("payload").setType("STRING").setMode("REQUIRED"),
                TableFieldSchema().setName("timestamp").setType("INTEGER").setMode("REQUIRED")
        )
        schema.fields = fields
        return schema
    }

    override fun getDestination(element: ValueInSingleWindow<TableRow>?): String {
        val eventLogType = element!!.value!!.get("type") as String
        return "$project:$dataset.$prefix$type"
    }
}

class MessageToTableRow: DoFn<PubsubMessage, TableRow>() {
    @ProcessElement
    fun process(c: ProcessContext) {
        val messageStr = String(c.element().payload, StandardCharsets.UTF_8)
        val map = ObjectMapper().readValue(messageStr, MutableMap::class.java)
        val tr = TableRow()
        tr.set("timestamp", map["timestamp"])
        tr.set("payload", mapper.writeValueAsString(map["payload"]))
        tr.set("type", map["type"])
        c.output(tr)
    }
}

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args)
            .withValidation()
            .`as`(PubSubBQDynamicDestinationOptions::class.java)

    val p = Pipeline.create(options)

    p.apply("readMessagesFromTopics", PubsubIO.readMessages().fromTopic(options.pubSubTopic))
            // BigQuery の rowを作成
            .apply(ParDo.of(MessageToTableRow()))
            .apply("Fixed window", Window.into<TableRow>(FixedWindows.of(Duration.standardSeconds(30))))
            .apply("writeToBQ",
                    BigQueryIO.writeTableRows()
                            // DynamicDestinations インスタンスを与える
                            .to(EventLogDestination())
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
    p.run()
}
Return to top