Apache beamのJava quickstartがいまいち分かりづらかったため、最小コードとデプロイ手順(Google Cloud Dataflow, AWS EMR)を備忘録としてまとめる
WordCountサンプル#
https://github.com/aaaanwz/beam-wordcount-sample
1
2
3
4
5
6
7
8
9
10
| .
├── pom.xml
└── src
└── main
└── java
├── core
│ └── WordCount.java
└── dafn
├── ExtractWordsFn.java
└── FormatAsTextFn.java
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>aaaanwz</groupId>
<artifactId>beam-wordcount-sample</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<beam.version>2.16.0</beam.version>
</properties>
<profiles>
<profile>
<id>direct-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>dataflow-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.9</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>core.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| package core;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import dafn.ExtractWordsFn;
import dafn.FormatAsTextFn;
public class WordCount {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
p.apply("ReadLines",
Create.of("a a a a a b b b b b c c c ").withCoder(StringUtf8Coder.of()))
.apply(ParDo.of(new ExtractWordsFn())).apply(Count.perElement())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to("counts"));
p.run().waitUntilFinish();
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| package dafn;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
public class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
private static final long serialVersionUID = 1L;
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
|
デプロイ方法#
ローカル実行#
1
2
| mvn package
java -jar ./target/beam-wordcount-sample-0.1-shaded.jar
|
Google Cloud Dataflow#
- gcloud CLIのインストール・設定済みであることが前提
1
2
| mvn package -Pdataflow-runner
java -jar ./target/beam-wordcount-sample-0.1-shaded.jar --runner=DataflowRunner --project=xxxx --tempLocation=gs://<YOUR_GCS_BUCKET>/temp/
|
AWS EMR#
1
2
| mvn package -Pflink-runner
scp -i ~/.ssh/keypair.pem ./target/beam-wordcount-sample-0.1-shaded.jar ec2-user@ec2-xxx-xxx-xxx:/home/hadoop
|
1
2
3
4
| JAR の場所:command-runner.jar
メインクラス:なし
引数:flink run -m yarn-cluster -yn 2 /home/hadoop/beam-wordcount-sample-0.1-shaded.jar --runner=FlinkRunner
失敗時の操作:次へ
|
(参考)