Kafka Streams DSLのうち、ステートレスな操作(branch,map,mergeなど)を実際に触り、動作を確認します。 また最後に、本稿で登場する関数を使用してストリーム処理のFizzBuzzを実装してみます。
前回の記事(準備編)のプロジェクトが作成済みである事を前提とします。
実際にやってみる Filter Java StreamのByPredicateと同じと思って差し支えありません。Java StreamのPredicateと紛らわしいのでimport対象に注意しましょう。
key, valueを引数にbooleanを返し、falseの場合はレコードが除外されます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.apache.kafka.streams.kstream.Predicate; ... private static Predicate<String, String> predicate = (key, value) -> value.startsWith("あ"); public static Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) //使用するデシリアライザにStringを明示指定します .filter(predicate) .to("output-topic"); return builder.build(); } ここでConsumed.with(...)が新たに登場しました。Predicateの引数がString型なので、デシリアライザも明示指定する必要があるためです。
また.filter((key, value)-> value.startsWith("あ"))のように直接ラムダ式を記述することももちろん可能です。
さて、テストを実行してみましょう。
1 2 3 4 5 6 7 @Test void test() { inputRecord("input-topic", "key1", "あけまして"); System....