使用例
- Cloud Pub/Sub からメッセージを取得する
- 整形して BigQuery の行をつくる
- 複数の BigQuery のテーブルに挿入する
- テーブル名は Pub/Sub から取得したデータから動的に決定したい
という場合を想定しています。
┌──────────────────────────┐
│ │
│ get message from │
│ Pub/Sub │
│ │
└──────────────────────────┘
│
▼
┌──────────────────────────┐
│ │
│ create row of BigQuery │
│ │
└───────────────â──────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ │ │ │ │ │
│ BigQuery table │ │ BigQuery table │ │ BigQuery table │
│ │ │ │ │ │
└──────────────────┘ └──────────────────┘ └──────────────────┘
Pub/Sub には以下のような JSON が Publish されます。
{"type": "EVENT_TYPE", "timestamp": <epochtime>, "payload": "something"}
DynamicDestinations を使う
BigQueryIO.writeTableRows().to(DynamicDestinations)
を使うことによって、
出力先テーブル名を動的に決定することができます。
DynamicDestinations (Apache Beam 2.5.0)
以下はサンプルコードです。 Kotlin ですが Java の場合も大体同じ感じだと思います。
以下の例では、あらかじめ BigQuery 側のテーブルは作成してある想定で
withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
を指定しています。
interface PubSubBQDynamicDestinationOptions: PipelineOptions {
@get:Description("Pub/Sub topic")
@get:Validation.Required
var pubSubTopic: String
}
class EventLogDestination: DynamicDestinations<TableRow, String>() {
companion object {
val project = "projectName"
val dataset = "detasetName"
val prefix = "prefix_"
}
override fun getTable(destination: String?): TableDestination {
return TableDestination(destination, "event log table for each event")
}
override fun getSchema(destination: String?): TableSchema {
val schema = TableSchema()
val fields = listOf(
TableFieldSchema().setName("type").setType("STRING").setMode("REQUIRED"),
TableFieldSchema().setName("payload").setType("STRING").setMode("REQUIRED"),
TableFieldSchema().setName("timestamp").setType("INTEGER").setMode("REQUIRED")
)
schema.fields = fields
return schema
}
override fun getDestination(element: ValueInSingleWindow<TableRow>?): String {
val eventLogType = element!!.value!!.get("type") as String
return "$project:$dataset.$prefix$type"
}
}
class MessageToTableRow: DoFn<PubsubMessage, TableRow>() {
@ProcessElement
fun process(c: ProcessContext) {
val messageStr = String(c.element().payload, StandardCharsets.UTF_8)
val map = ObjectMapper().readValue(messageStr, MutableMap::class.java)
val tr = TableRow()
tr.set("timestamp", map["timestamp"])
tr.set("payload", mapper.writeValueAsString(map["payload"]))
tr.set("type", map["type"])
c.output(tr)
}
}
fun main(args: Array<String>) {
val options = PipelineOptionsFactory.fromArgs(*args)
.withValidation()
.`as`(PubSubBQDynamicDestinationOptions::class.java)
val p = Pipeline.create(options)
p.apply("readMessagesFromTopics", PubsubIO.readMessages().fromTopic(options.pubSubTopic))
// BigQuery の rowを作成
.apply(ParDo.of(MessageToTableRow()))
.apply("Fixed window", Window.into<TableRow>(FixedWindows.of(Duration.standardSeconds(30))))
.apply("writeToBQ",
BigQueryIO.writeTableRows()
// DynamicDestinations インスタンスを与える
.to(EventLogDestination())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
p.run()
}