Kafka Streams DSLのうち、ステートレスな操作(branch,map,mergeなど)を実際に触り、動作を確認します。 また最後に、本稿で登場する関数を使用してストリーム処理のFizzBuzzを実装してみます。

前回の記事(準備編)のプロジェクトが作成済みである事を前提とします。

実際にやってみる

Filter

Java StreamのByPredicateと同じと思って差し支えありません。Java StreamのPredicateと紛らわしいのでimport対象に注意しましょう。

key, valueを引数にbooleanを返し、falseの場合はレコードが除外されます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import org.apache.kafka.streams.kstream.Predicate;
...

  private static Predicate<String, String> predicate = (key, value) -> value.startsWith("あ");

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) //使用するデシリアライザにStringを明示指定します
        .filter(predicate)
        .to("output-topic");

    return builder.build();
  }

ここでConsumed.with(...)が新たに登場しました。Predicateの引数がString型なので、デシリアライザも明示指定する必要があるためです。

また.filter((key, value)-> value.startsWith("あ"))のように直接ラムダ式を記述することももちろん可能です。

さて、テストを実行してみましょう。

1
2
3
4
5
6
7
  @Test
  void test() {
    inputRecord("input-topic", "key1", "あけまして");
    System.out.println(getOutputRecord("output-topic"));
    inputRecord("input-topic", "key1", "おめでとう");
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
topic=output-topic, key=key1, value=あけまして
null

“あ"から始まるvalueを持つレコードだけがfilterを通過することが確認できました。

Branch

Java関数型には今の所存在しない機能です。 複数のkstream.Predicateを引数に持ち、ストリームを分岐させることができます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  private static Predicate<String, String> predicateA = (key, value) -> value.startsWith("あ");
  private static Predicate<String, String> predicateB = (key, value) -> value.startsWith("い");

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    @SuppressWarnings("unchecked")
    KStream<String, String>[] branchedStream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .branch(predicateA, predicateB);

    branchedStream[0].to("output-topicA");
    branchedStream[1].to("output-topicB");

    return builder.build();
  }

branchを用いるとKStream型の配列を得ることができます。一番目のPredicate(predicateA)を通過したレコードは[0]、一番目は通過せず二番目(predicateB)を通過したPredicateが[1]に流れていきます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
  @Test
  void test() {
    inputRecord("input-topic", "key1", "あひる");
    inputRecord("input-topic", "key1", "いんこ");
    inputRecord("input-topic", "key1", "からす");
    System.out.println(getOutputRecord("output-topicA"));
    System.out.println(getOutputRecord("output-topicA"));
    System.out.println(getOutputRecord("output-topicB"));
    System.out.println(getOutputRecord("output-topicB"));
  }
1
2
3
4
topic=output-topicA, key=key1, value=あひる
null
topic=output-topicB, key=key1, value=いんこ
null

“あひる"はoutput-topicA、“いんこ"はoutput-topicB、“からす"は除外される事が確認できました。 この記事の後半に登場するmergeと供に、非常に便利な機能です。

Map

レコードの内容を変換する関数です。 key,valueを引数にとるKeyValueMapper、valueのみを引数にとるValueMapperが用意されており、それぞれJava StreamのBiFunctionFunctionとほぼ同じです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  private static KeyValueMapper<String, String, KeyValue<String, String>> keyValueMapper =
      (key, value) -> KeyValue.pair(key + "_hoge", value + "_fuga");

  private static ValueMapper<String, String> valueMapper = (value) -> value + "_foo";

  private static KeyValueMapper<String, String, String> keyMapper =
      (key, value) -> key + "_" + value + "_bar";

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

    stream
        .map(keyValueMapper)
        .to("output-topicA");

    stream
        .mapValues(valueMapper)
        .to("output-topicB");

    stream
        .selectKey(keyMapper)
        .to("output-topicC");

    return builder.build();
  }

ここでは、keyとvalueを両方変換(output-topicA)、valueのみ変換(output-topicB)、keyのみ変換(output-topicC)の3パターンを試してみます。 それぞれ.map,.mapValues,.selectKeyで呼び出しますが、.selectKeyの引数はKeyValueMapperなのが少し紛らわしいところです。

keyとvalueを両方変換する場合、KeyValueMapperの帰り値はKeyValue型を使用します。入力と異なる型を返す場合は終端処理.toでシリアライザを指定する必要があります。
(例:.to("output-topic",Produced.with(Serdes.Integer(),Serdes.Integer()))

これまでkeyに関して何も触れて来ませんでしたが、ステートフルな処理で多用するためその折に解説します。またKafkaクラスタを組んだ際、レコードがどのノードに送られるかを決定する値でもあります。

1
2
3
4
5
6
7
  @Test
  void test() {
    inputRecord("input-topic", "key1", "value1");
    System.out.println(getOutputRecord("output-topicA"));
    System.out.println(getOutputRecord("output-topicB"));
    System.out.println(getOutputRecord("output-topicC"));
  }
1
2
3
topic=output-topicA, key=key1_hoge, value=value1_fuga
topic=output-topicB, key=key1, value=value1_foo
topic=output-topicC, key=key1_value1_bar, value=value1

key&value、valueのみ、keyのみの変換がそれぞれ行われている事が確認できました。

FlatMap

Java Streamと同様にflatMapも用意されています。 1つのレコードをIterableな値に変換する事で、レコードごと複数に分割する機能です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  private static ValueMapper<String, List<String>> valueMapper =
      (value) -> Arrays.asList(value.split(":"));

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

    stream
        .flatMapValues(valueMapper)
        .to("output-topic");

    return builder.build();
  }
1
2
3
4
5
6
7
  @Test
  void test() {
    inputRecord("input-topic", "key1", "あ:い:う");
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
3
topic=output-topic, key=key1, value=あ
topic=output-topic, key=key1, value=い
topic=output-topic, key=key1, value=う

ForeachAction

Java StreamのBiConsumerと同等で、戻り値のない処理を行います。 .peekでは、レコードをread onlyとして何らかの処理を行います。(主にロギングに使用されるかと思います) .forEach.toと同様の終端処理として機能します。 終端処理が実施されるとKafkaにおいてそのレコードのConsumeが完了したと見なされます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  private static ForeachAction<String, String> foreachAction =
      (key, value) -> System.out.println("print (" + key + " : " + value + ")");

  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

    stream
        .peek(foreachAction)
        .map((key, value) -> KeyValue.pair(key + "_changed", value + "_changed"))
        .foreach(foreachAction);

    return builder.build();
  }
1
2
3
4
  @Test
  void test() {
    inputRecord("input-topic", "key1", "ほげほげ");
  }
1
2
print (key1 : ほげほげ)
print (key1_changed : ほげほげ_changed)

レコードの内容がコンソールに出力されました。またforeachによってレコードのConsumeは完了し、Kafkaのオフセットが進行します。

through

レコードの内容をトピックに書き込み、さらに後続処理に渡します。 処理途中のデータから別のストリームを開始したい場合に使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

    stream
        .through("output-topicA")
        .map((key, value) -> KeyValue.pair(key + "_changed", value + "_changed"))
        .to("output-topicB");

    return builder.build();
  }
1
2
3
4
5
6
  @Test
  void test() {
    inputRecord("input-topic", "key1", "ほげほげ");
    System.out.println(getOutputRecord("output-topicA"));
    System.out.println(getOutputRecord("output-topicB"));
  }
1
2
topic=output-topicA,  key=key1, value=ほげほげ
topic=output-topicB,  key=key1_changed, value=ほげほげ_changed

1つのレコードが形を変え2つのトピックに送られました。

merge

2つのストリームを1つに結合します。
とりあえず実行してみましょう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> streamA = builder
        .stream("input-topicA", Consumed.with(Serdes.String(), Serdes.String()));

    KStream<String, String> streamB = builder
        .stream("input-topicB", Consumed.with(Serdes.String(), Serdes.String()));

    streamA
        .merge(streamB)
        .to("output-topic");

    return builder.build();
  }
1
2
3
4
5
6
7
  @Test
  void test() {
    inputRecord("input-topicA", "key1", "ほげほげ");
    inputRecord("input-topicB", "key1", "ふがふが");
    System.out.println(getOutputRecord("output-topic"));
    System.out.println(getOutputRecord("output-topic"));
  }
1
2
topic=output-topic, key=key1, value=ほげほげ
topic=output-topic, key=key1, value=ふがふが

異なるトピックに入れたレコードが一つのストリームになりました。

Kafka Streams FizzBuzzを実装する

最後に、これまでの知識を無駄に使って冗長なFizzBuzzを実装してみましょう。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  @SuppressWarnings("unchecked")
  public static Topology getTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    KStream<Integer, String> inputStream = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
        .map((key, value) -> KeyValue.pair(value, ""));
    
    KStream<Integer, String> branchFizz[] = inputStream
        .branch((key, value) -> key % 3 == 0,
            (key, value) -> true);

    KStream<Integer, String> branchBuzz[] = branchFizz[0]
        .mapValues(value->"Fizz")
        .merge(branchFizz[1])
        .branch((key, value) -> key % 5 == 0,
            (key, value) -> true);
    
    KStream<Integer, String> branchFizzBuzz[] = branchBuzz[0]
        .mapValues(value -> value + "Buzz")
        .merge(branchBuzz[1])
        .branch((key,value)->value.equals(""),
            (key,value)->true);
    
    branchFizzBuzz[0]
        .mapValues((key, value) -> String.valueOf(key))
        .merge(branchFizzBuzz[1])
        .to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));

    return builder.build();
  }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
  private void inputRecord(String topicName, String key, Integer value) {
    testDriver.pipeInput(
        new ConsumerRecordFactory<String, Integer>(Serdes.String().serializer(),
            Serdes.Integer().serializer()).create(topicName, key, value));
  }

  private ProducerRecord<Integer, String> getOutputRecord(String topicName) {
    return testDriver.readOutput(topicName, Serdes.Integer().deserializer(),
        Serdes.String().deserializer());
  }

  @Test
  void test() {
    for (int i = 1; i <= 30; i++) {
      inputRecord("input-topic", "key1", i);
      System.out.println(getOutputRecord("output-topic").value());
    }
  }

いかがでしたでしょうか。
次回はいよいよステートフルな処理を試してみようと思います。