Kafka Streams DSLのうち、ステートフルな操作(join,reduce,aggregate,windowingなど)を実際に触り、動作を確認します。

また最後に、本稿と前回で登場した関数を使用してステートフルなストリームFizzBuzzを実装してみます。

実際にやってみる

前々回の記事(準備編)のプロジェクトが作成済みである事を前提とします。

KTable

まずはじめに、KTable,KGroupedStreamについて知っておく必要があります。 KGroupedStreamはkeyの値毎にグループ化されたKStreamで、KTableはkeyとvalueの最新状態を保持するテーブルとして扱えるものです。
KTableはnew StreamsBuilder().table("topic-name")...のように直接トピックから生成したり、KGroupedStreamを集約して生成したりと様々なルートで生成することができます。
公式ドキュメントの以下の図が非常に分かりやすいです。

画像リンク元ページ

Aggregate

KGroupedStreamをkeyごとに集約し、KTableに変換します。 コードと実行結果を見るのが一番早いと思います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  private static Initializer<String> initializer = () -> "InitVal";

  private static Aggregator<String, String, String> aggregator =
      (key, val, agg) -> agg + " & " + val;

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input-topic")
        .groupByKey()
        .aggregate(initializer, aggregator)
        .toStream() // KTableの更新履歴をストリームとして取り出す
        .to("output-topic");

    return builder.build();
  }

Initializerは初期値を返す関数で、Java streamで言うとSupplierです。
Aggregatorkey,value,現在のステートの3つを引数として受け取り、新たなステートを生成するための関数です。 aggregateの結果はKTableですが、更新結果を確認するためにtoStream()を使います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  @Test
  void test() throws InterruptedException {
    inputRecord("input-topic", "key1", "hoge");
    inputRecord("input-topic", "key1", "fuga");
    inputRecord("input-topic", "key2", "foo");
    inputRecord("input-topic", "key2", "bar");

    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
3
4
topic=output-topic, key=key1, value=InitVal & hoge
topic=output-topic, key=key1, value=InitVal & hoge & fuga
topic=output-topic, key=key2, value=InitVal & foo
topic=output-topic, key=key2, value=InitVal & foo & bar

keyに対応するvalueが、aggregatorに記述した通りにどんどん連結されていっていますね。

簡易版のような機能としてreducecountも用意されています。 reduceはinitializerが無いaggregateのようなもので、特定のkeyに対して最初に来たvalueが初期値になります。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private static Reducer<String> reducer = (val, agg) -> agg + " & " + val;

...

  builder
    .stream("input-topic")
    .groupByKey()
    .reduce(reducer)
    .toStream()
    .to("output-topic");

...
1
2
key=key1, value=hoge
key=key1, value=hoge & fuga

count

簡易版aggregateで、keyに対するレコード数をカウントするKTableを生成します。
元のレコード内容は失われてしまうので、throughなどで別のストリームを生やして使うのが一般的かと思います。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input-topic");
        .groupByKey()
        .count()
        .toStream()
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
  }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  private ProducerRecord<String, Long> getOutputRecord(String topicName) {
    return testDriver.readOutput(topicName, Serdes.String().deserializer(),
        Serdes.Long().deserializer());
  }

  @Test
  void test() throws InterruptedException {
    inputRecord("input-topic", "key1", "hoge");
    inputRecord("input-topic", "key1", "fuga");
    inputRecord("input-topic", "key2", "foo");
    inputRecord("input-topic", "key2", "bar");

    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
3
4
topic=output-topic, key=key1, value=1
topic=output-topic, key=key1, value=2
topic=output-topic, key=key2, value=1
topic=output-topic, key=key2, value=2

Windowing

ストリームを時間枠で切り取ります。Tubling, Hopping, Sliding, Sessionの4種類がありここでは紹介しきれないため、公式ドキュメントの図を見ていただくのが最もわかりやすいかと思います。

今回は1000msのTumblingでレコードをcountしてみます。 windowedByを行うと、Windowedオブジェクトがkeyとして使用されます。 試しに.toString()でkeyの内容も確認してみましょう

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input-topic")
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
        .count()
        .toStream()
        .selectKey((key, value) -> key.toString())
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    return builder.build();
  }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  @Test
  void test() throws InterruptedException {
    inputRecord("input-topic", "key1", "hoge");
    inputRecord("input-topic", "key1", "fuga");
    inputRecord("input-topic", "key1", "foo");
    Thread.sleep(1000);
    inputRecord("input-topic", "key1", "bar");

    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
3
4
topic=output-topic, key=[key1@1559886985000/1559886986000], value=1
topic=output-topic, key=[key1@1559886985000/1559886986000], value=2
topic=output-topic, key=[key1@1559886985000/1559886986000], value=3
topic=output-topic, key=[key1@1559886986000/1559886987000], value=1

keyが{元のkey値}@{window開始時刻}/{window終了時刻}になっていますね。 そしてhoge``fuga``fooは同一window内でカウントされ、時間が開いたbarは次のwindowに入っています。

Join

同一のkeyを持つ2つのレコードを組み合わせ、新たなストリームを生成します。 KStream同士、KStreamとKTable、KTable同士それぞれで実行できます。 例としてKStream同士のjoinを行ってみます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  private static ValueJoiner<String, String, String> joiner = (left, right) -> left + " & " + right;

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> streamA = builder.stream("input-topicA");
    KStream<String, String> streamB = builder.stream("input-topicB");    
        
    streamA
        .join(streamB, joiner, JoinWindows.of(Duration.ofMillis(100)))
        .to("output-topic");

    return builder.build();
  }

ValueJoinerは2つのvalueから新たなvalueを生成する関数です。

KStream同士をjoinする場合は、JoinWindowsで2つのレコードが揃うまでの待ち受け時間を定義します。相手がKTableの場合はkeyに対する最後(最新)のvalueの値がjoinされるため、Windowの定義は不要です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  @Test
  void test() throws InterruptedException {
    inputRecord("input-topicA", "key1", "hoge");
    inputRecord("input-topicB", "key1", "fuga");
    inputRecord("input-topicA", "key2", "foo");
    Thread.sleep(200);
    inputRecord("input-topicB", "key2", "bar");

    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
topic=output-topic, key=key1, value=hoge & fuga
null

key1を持つhogefugaが結合しています。また、foobarはJoinWindow内に収まっていないため、joinが実施されていません。

joinの他にleftJoinouterJoinが用意されています。KSQL同様、リレーショナルDBの感覚ですね。

ステートフルFizzBuzzを実装する

さて、長くなりましたがKafka Streams DSLの大方を触ってみました。
これまで見てきた物を使って、ステートフルなFizzBuzzを実装してみましょう。 以下の要件でやってみます。

  • 自然数をconsumeする度に加算していき、総和が3の倍数ならFizz, 5の倍数ならBuzz, 15の倍数ならFizzBuzz、それ以外なら現在の総和をoutput-topicにproduceする。
  • FizzBuzzが出力されるか、5秒以上入力レコードが来なければ総和を0に戻す
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  private static Aggregator<String, Integer, String> aggregator = (k, v, a) -> {
    Integer sum;
    try {
      sum = Integer.valueOf(a);
    } catch (NumberFormatException e) {
      sum = 0;
    }
    sum += v;
    if (sum % 15 == 0) {
      return "FizzBuzz (" + sum + ")";
    } else if (sum % 3 == 0) {
      return "Fizz (" + sum + ")";
    } else if (sum % 5 == 0) {
      return "Buzz (" + sum + ")";
    }
    return sum.toString();
  };

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
        .aggregate(() -> "0", aggregator)
        .toStream()
        .selectKey((k, v) -> k.toString())
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
  }

あまり美しくはないですが、最も単純に「前回の計算結果をそのままステートにする」実装にしてみました。 是非実際のKafka環境をセットアップして遊んでみてください。

Consume/Publishで実装するとちょっと面倒な要件も、非常にシンプルに記述できることが実感できたかと思います。 これで完結となります、ご覧いただきありがとうございました!