Kafka Streams DSLを一通り体験する (3. ステートフル処理実践編)

Kafka Streams DSLのうち、ステートフルな操作(join,reduce,aggregate,windowingなど)を実際に触り、動作を確認します。 また最後に、本稿と前回で登場した関数を使用してステートフルなストリームFizzBuzzを実装してみます。 実際にやってみる 前々回の記事(準備編)のプロジェクトが作成済みである事を前提とします。 KTable まずはじめに、KTable,KGroupedStreamについて知っておく必要があります。 KGroupedStreamはkeyの値毎にグループ化されたKStreamで、KTableはkeyとvalueの最新状態を保持するテーブルとして扱えるものです。 KTableはnew StreamsBuilder().table("topic-name")...のように直接トピックから生成したり、KGroupedStreamを集約して生成したりと様々なルートで生成することができます。 公式ドキュメントの以下の図が非常に分かりやすいです。 画像リンク元ページ Aggregate KGroupedStreamをkeyごとに集約し、KTableに変換します。 コードと実行結果を見るのが一番早いと思います。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static Initializer<String> initializer = () -> "InitVal"; private static Aggregator<String, String, String> aggregator = (key, val, agg) -> agg + " & " + val; public static Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder ....

June 7, 2019

Kafka Streams DSLを一通り体験する (2. ステートレス処理実践編)

Kafka Streams DSLのうち、ステートレスな操作(branch,map,mergeなど)を実際に触り、動作を確認します。 また最後に、本稿で登場する関数を使用してストリーム処理のFizzBuzzを実装してみます。 前回の記事(準備編)のプロジェクトが作成済みである事を前提とします。 実際にやってみる Filter Java StreamのByPredicateと同じと思って差し支えありません。Java StreamのPredicateと紛らわしいのでimport対象に注意しましょう。 key, valueを引数にbooleanを返し、falseの場合はレコードが除外されます。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.apache.kafka.streams.kstream.Predicate; ... private static Predicate<String, String> predicate = (key, value) -> value.startsWith("あ"); public static Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) //使用するデシリアライザにStringを明示指定します .filter(predicate) .to("output-topic"); return builder.build(); } ここでConsumed.with(...)が新たに登場しました。Predicateの引数がString型なので、デシリアライザも明示指定する必要があるためです。 また.filter((key, value)-> value.startsWith("あ"))のように直接ラムダ式を記述することももちろん可能です。 さて、テストを実行してみましょう。 1 2 3 4 5 6 7 @Test void test() { inputRecord("input-topic", "key1", "あけまして"); System....

June 5, 2019

Kafka Streams DSLを一通り体験する(1. 準備編)

Kafka Streamsを使ってステートフルなストリーム処理を実装したいと思い立ったものの、Kafka Streams Developer guideを読んでもいまいちよくわからなかったため、自分で一通り試してみました。 この記事ではAggregate Reduce Join Windowingなど、Kafka Streams DSLでできる事を順番にテストし、挙動を確認していきます。また、kafka-streams-test-utilsを用いたJUnitの実装についても解説します。 開発環境 OpenJDK 11 Maven 3.6 Kafka 2.1.1 下準備 プロジェクトの作成 以下の依存関係を追加します kafka-streams kafka-streams-test-utils junit-jupiter-api junit-jupiter-engine maven-surefire-plugin 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 <project xmlns="http://maven....

June 4, 2019