Cloud Dataflow の処理を Kotlin で書いてみたかったので、 公式ドキュメントにあるサンプル「WordCount」を Kotlin に移植してみました。1
build.gradle
依存関係を元の WordCount から持ってきているだけです。 あとは一般的な Kotlin アプリケーションのビルドの設定と変わりません。
// build.gradle
buildscript {
ext {
kotlinVersion = '1.2.51'
springBootVersion = '2.1.0.BUILD-SNAPSHOT'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion")
}
}
apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'application'
group 'com.github.mookjp'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
// For Java class reflection
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"
compile 'com.google.cloud.dataflow:google-cloud-dataflow-java-sdk-all:2.1.0'
// For logging
compile 'org.slf4j:slf4j-api:1.7.14'
compile 'org.slf4j:slf4j-jdk14:1.7.14'
testCompile 'junit:junit:4.12'
testCompile 'org.hamcrest:hamcrest-all:1.3'
}
compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
jar {
manifest {
attributes "Main-Class" : "com.github.mookjp.pg.WordCountKt"
}
from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
mainClassName = "com.github.mookjp.pg.WordCountKt"
run {
if (project.hasProperty('args')) {
args project.args.split('\\s')
}
}
run コマンドでの起動は以下のブログ記事を参考にさせてもらいました。
ビルドツールをGradleに乗り換えた (Kotlinによる開発, 外部依存jarの生成方法も記載) - 日記マン
Kotlin のコード
Kotlin のコードは以下のようになりました。
以下をプロパティの記述にしたので少し書き方が変わってるくらいで、
interface WordCountOptions: PipelineOptions {
@get:Description("Path of the file to read from")
@get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
var inputFile: String
@get:Description("Path of the file to write to")
@get:Validation.Required
var output: String
}
他はほとんど元のコードと変わっていません…。 もう少し Kotlin らしい書き方ができるかもしれません。
package com.github.mookjp.pg
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.metrics.Counter
import org.apache.beam.sdk.metrics.Metrics
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.*
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
interface WordCountOptions: PipelineOptions {
@get:Description("Path of the file to read from")
@get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
var inputFile: String
@get:Description("Path of the file to write to")
@get:Validation.Required
var output: String
}
class ExtractWordsFn: DoFn<String, String>() {
private val emptyLines: Counter = Metrics.counter(ExtractWordsFn::class.java, "emptyLines")
@ProcessElement
fun processElement(c: ProcessContext) {
if (c.element().trim().isEmpty()) {
emptyLines.inc()
}
// tokenizer pattern
val words = c.element().split("[^\\p{L}]+")
words.forEach {
if (!it.isEmpty()) {
c.output(it)
}
}
}
}
class FormatAsTextFn: SimpleFunction<KV<String, Long>, String>() {
override fun apply(input: KV<String, Long>): String {
return "${input.key}: ${input.value}"
}
}
class CountWords: PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
val words = lines.apply(ParDo.of(ExtractWordsFn()))
val wordCounts = words.apply(Count.perElement())
return wordCounts
}
}
fun main(args: Array<String>) {
val options = PipelineOptionsFactory.fromArgs(*args)
.withValidation()
.`as`(WordCountOptions::class.java)
val p = Pipeline.create(options)
p.apply("ReadLines", TextIO.read().from(options.inputFile))
.apply(CountWords())
.apply(MapElements.via(FormatAsTextFn()))
.apply("writeCount", TextIO.write().to(options.output))
p.run().waitUntilFinish()
}
実行
以下のようなコマンドで実行できます。
./gradlew run -Pargs="--project=<project-name>
--stagingLocation=gs://<path-to-staging-location>
--output=gs://<path-to-output-location>
--runner=DataflowRunner
--jobName=<jobname>"
-
Apache Beam のドキュメントが厚すぎて現実逃避してしまいました…。 ↩︎