KotlinでCloud Dataflowのバッチ処理を書く

Aug 3, 2018 ( Feb 11, 2022 更新 )

Cloud Dataflow の処理を Kotlin で書いてみたかったので、 公式ドキュメントにあるサンプル「WordCount」を Kotlin に移植してみました。1

build.gradle

依存関係を元の WordCount から持ってきているだけです。 あとは一般的な Kotlin アプリケーションのビルドの設定と変わりません。

// build.gradle

buildscript {
    ext {
        kotlinVersion = '1.2.51'
        springBootVersion = '2.1.0.BUILD-SNAPSHOT'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion")
    }
}

apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'application'

group 'com.github.mookjp'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
    // For Java class reflection
    compile "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"

    compile 'com.google.cloud.dataflow:google-cloud-dataflow-java-sdk-all:2.1.0'

    // For logging
    compile 'org.slf4j:slf4j-api:1.7.14'
    compile 'org.slf4j:slf4j-jdk14:1.7.14'

    testCompile 'junit:junit:4.12'
    testCompile 'org.hamcrest:hamcrest-all:1.3'
}

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

jar {
    manifest {
        attributes "Main-Class" : "com.github.mookjp.pg.WordCountKt"
    }
    from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}

mainClassName = "com.github.mookjp.pg.WordCountKt"
run {
    if (project.hasProperty('args')) {
        args project.args.split('\\s')
    }
}

run コマンドでの起動は以下のブログ記事を参考にさせてもらいました。

ビルドツールをGradleに乗り換えた (Kotlinによる開発, 外部依存jarの生成方法も記載) - 日記マン

Kotlin のコード

Kotlin のコードは以下のようになりました。

以下をプロパティの記述にしたので少し書き方が変わってるくらいで、

interface WordCountOptions: PipelineOptions {

    @get:Description("Path of the file to read from")
    @get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    var inputFile: String

    @get:Description("Path of the file to write to")
    @get:Validation.Required
    var output: String
}

他はほとんど元のコードと変わっていません…。 もう少し Kotlin らしい書き方ができるかもしれません。

package com.github.mookjp.pg

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.metrics.Counter
import org.apache.beam.sdk.metrics.Metrics
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.*
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection

interface WordCountOptions: PipelineOptions {

    @get:Description("Path of the file to read from")
    @get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    var inputFile: String

    @get:Description("Path of the file to write to")
    @get:Validation.Required
    var output: String
}

class ExtractWordsFn: DoFn<String, String>() {
    private val emptyLines: Counter = Metrics.counter(ExtractWordsFn::class.java, "emptyLines")

    @ProcessElement
    fun processElement(c: ProcessContext) {
        if (c.element().trim().isEmpty()) {
            emptyLines.inc()
        }

        // tokenizer pattern
        val words = c.element().split("[^\\p{L}]+")
        words.forEach {
            if (!it.isEmpty()) {
                c.output(it)
            }
        }

    }
}

class FormatAsTextFn: SimpleFunction<KV<String, Long>, String>() {
    override fun apply(input: KV<String, Long>): String {
        return "${input.key}: ${input.value}"
    }
}

class CountWords: PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
    override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
        val words = lines.apply(ParDo.of(ExtractWordsFn()))
        val wordCounts = words.apply(Count.perElement())
        return wordCounts
    }
}

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args)
            .withValidation()
            .`as`(WordCountOptions::class.java)
    val p = Pipeline.create(options)

    p.apply("ReadLines", TextIO.read().from(options.inputFile))
            .apply(CountWords())
            .apply(MapElements.via(FormatAsTextFn()))
            .apply("writeCount", TextIO.write().to(options.output))
    p.run().waitUntilFinish()
}

実行

以下のようなコマンドで実行できます。

./gradlew run -Pargs="--project=<project-name>
    --stagingLocation=gs://<path-to-staging-location>
    --output=gs://<path-to-output-location>
    --runner=DataflowRunner
    --jobName=<jobname>"

  1. Apache Beam のドキュメントが厚すぎて現実逃避してしまいました…。 ↩︎

Return to top