Apache beam サンプルコード

Apache beamのJava quickstartがいまいち分かりづらかったため、最小コードとデプロイ手順(Google Cloud Dataflow, AWS EMR)を備忘録としてまとめる WordCountサンプル https://github.com/aaaanwz/beam-wordcount-sample 1 2 3 4 5 6 7 8 9 10 . ├── pom.xml └── src └── main └── java ├── core │ └── WordCount.java └── dafn ├── ExtractWordsFn.java └── FormatAsTextFn.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 <?...

February 1, 2020

mavenプロジェクト作成からCIOps構築まで

git branchに変更が加わった際、 JUnit test (with MySQL) docker build Kubernetes環境にデプロイ (CIOps) が行われるJavaプロジェクトを構築します。 本番運用ではArgoCDなどgitOps構築をお勧めします 登場するもの OSS Maven MySQL Docker サービス GitHub CircleCI AWS (ECR, EKS) ⇦ 微修正でその他マネージドk8sにも応用可能かと思います。 サンプルプロジェクトの実装 最終的にディレクトリ構成はこんな感じになります。順を追って作っていきます。 GitHubからcloneして頂いても結構です。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 testproject/ ├ src/ │ ├ main/ │ │ └ java/ │ │ └testpackage/ │ │ └Main.java │ └ test/ │ └ java/ │ └testpackage/ │ └MainTest.java ├ ....

July 24, 2019

Kubernetes Liveness ProbeでJavaプロセスを監視する

Javaプロセスを一定時間毎にチェックし、ハングしていればPodを再起動する仕組みの備忘録です。 Kubernetes LivenessProbeに関する詳細はこちらをご参照ください。 Java実装 監視対象クラス テスト用に、インスタンスが生成されてから10秒後に isAlive() == falseになるように実装します。 1 2 3 4 5 6 7 8 9 10 public class SomeResource { final long createdTime; public SomeResource() { this.createdTime = System.currentTimeMillis(); } public boolean isAlive() { return System.currentTimeMillis() - createdTime < 10000; } } 監視用エンドポイント SomeResource#isAlive() == trueの時はレスポンスコード 200, falseの時は 500を返すように実装します。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import com....

July 2, 2019

Javaで文字コードを推測する

juniversalchardetを使用して、 ファイルの文字コードを推測・デコード・コンソールへの表示を行う URLエンコードされた文字列をデコードする の2つのサンプルプログラムを作成してみます。 juniversalchardetとはMozillaによって提供されているライブラリで、バイト列のパターンの出現頻度をもとに文字コードを推測する機能を提供します。現在日本語ではISO-2022-JP, SHIFT-JIS, EUC-JPに対応しています。 開発環境 OpenJDK 11 Maven 3.6 下準備 以下をmaven dependenciesに追加します pom.xml 1 2 3 4 5 <dependency> <groupId>com.googlecode.juniversalchardet</groupId> <artifactId>juniversalchardet</artifactId> <version>1.0.3</version> </dependency> サンプル1. ファイル読み込み Detectorクラス 今回は汎用性のためにInputStreamを引数としてみます。 引数に渡されたInputStreamインスタンスはオフセットが進んでしまう事に注意が必要です。 UniversalDetectorは入力データが全てシングルバイト文字の場合は文字コード判定結果がnullとなります。今回はそのような場合は環境デフォルト値を返すようにしました。 Detector.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import org....

June 27, 2019

Kafka Streams DSLを一通り体験する (3. ステートフル処理実践編)

Kafka Streams DSLのうち、ステートフルな操作(join,reduce,aggregate,windowingなど)を実際に触り、動作を確認します。 また最後に、本稿と前回で登場した関数を使用してステートフルなストリームFizzBuzzを実装してみます。 実際にやってみる 前々回の記事(準備編)のプロジェクトが作成済みである事を前提とします。 KTable まずはじめに、KTable,KGroupedStreamについて知っておく必要があります。 KGroupedStreamはkeyの値毎にグループ化されたKStreamで、KTableはkeyとvalueの最新状態を保持するテーブルとして扱えるものです。 KTableはnew StreamsBuilder().table("topic-name")...のように直接トピックから生成したり、KGroupedStreamを集約して生成したりと様々なルートで生成することができます。 公式ドキュメントの以下の図が非常に分かりやすいです。 画像リンク元ページ Aggregate KGroupedStreamをkeyごとに集約し、KTableに変換します。 コードと実行結果を見るのが一番早いと思います。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static Initializer<String> initializer = () -> "InitVal"; private static Aggregator<String, String, String> aggregator = (key, val, agg) -> agg + " & " + val; public static Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder ....

June 7, 2019

Kafka Streams DSLを一通り体験する (2. ステートレス処理実践編)

Kafka Streams DSLのうち、ステートレスな操作(branch,map,mergeなど)を実際に触り、動作を確認します。 また最後に、本稿で登場する関数を使用してストリーム処理のFizzBuzzを実装してみます。 前回の記事(準備編)のプロジェクトが作成済みである事を前提とします。 実際にやってみる Filter Java StreamのByPredicateと同じと思って差し支えありません。Java StreamのPredicateと紛らわしいのでimport対象に注意しましょう。 key, valueを引数にbooleanを返し、falseの場合はレコードが除外されます。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import org.apache.kafka.streams.kstream.Predicate; ... private static Predicate<String, String> predicate = (key, value) -> value.startsWith("あ"); public static Topology getTopology() { StreamsBuilder builder = new StreamsBuilder(); builder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) //使用するデシリアライザにStringを明示指定します .filter(predicate) .to("output-topic"); return builder.build(); } ここでConsumed.with(...)が新たに登場しました。Predicateの引数がString型なので、デシリアライザも明示指定する必要があるためです。 また.filter((key, value)-> value.startsWith("あ"))のように直接ラムダ式を記述することももちろん可能です。 さて、テストを実行してみましょう。 1 2 3 4 5 6 7 @Test void test() { inputRecord("input-topic", "key1", "あけまして"); System....

June 5, 2019

Kafka Streams DSLを一通り体験する(1. 準備編)

Kafka Streamsを使ってステートフルなストリーム処理を実装したいと思い立ったものの、Kafka Streams Developer guideを読んでもいまいちよくわからなかったため、自分で一通り試してみました。 この記事ではAggregate Reduce Join Windowingなど、Kafka Streams DSLでできる事を順番にテストし、挙動を確認していきます。また、kafka-streams-test-utilsを用いたJUnitの実装についても解説します。 開発環境 OpenJDK 11 Maven 3.6 Kafka 2.1.1 下準備 プロジェクトの作成 以下の依存関係を追加します kafka-streams kafka-streams-test-utils junit-jupiter-api junit-jupiter-engine maven-surefire-plugin 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 <project xmlns="http://maven....

June 4, 2019