Flink Basics (5): DataStream Introduction (5) - Writing Your First Flink Program
1 Writing a Flink Program in IDEA
Writing the Scala Version
This project uses the latest Flink release at the time of writing, 1.11.0. The Maven project configuration file is provided below.
- Create a new Maven project in IntelliJ IDEA.
- Check `Create from archetype`, then click the `Add Archetype` button. Enter `org.apache.flink` for GroupId, `flink-quickstart-scala` for ArtifactId, and `1.11.0` for Version, then click `OK`.
- Click the right arrow to expand the drop-down list, select `flink-quickstart-scala:1.11.0`, and click `Next`.
- Enter `FlinkTutorial` for Name, `com.atguigu` for GroupId, and `FlinkTutorial` for ArtifactId, then click `Next`.
- It is best to use IDEA's default Maven tool, Bundled (Maven 3). Click `Finish` and wait a moment; the project is then created.
Write the `WordCount.scala` program:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingJob {

  /** Main program method */
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream("localhost", 9999, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}
```
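Before wiring the program up to a socket, the aggregation logic of the pipeline (split on whitespace, pair each word with a count, sum per word) can be sanity-checked on an ordinary collection. Here is a minimal sketch that treats one list of lines as the contents of a single 5-second window; the class and method names are illustrative, not part of the Flink API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {
    // Apply the same split-and-sum logic as the flatMap/map/keyBy/sum chain,
    // over one batch of lines standing in for the contents of a single window.
    public static Map<String, Long> countWindow(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            // same tokenization as the Flink job: split on whitespace
            for (String word : line.split("\\s")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWindow(List.of("hello world", "hello flink")));
    }
}
```

The real job differs in that Flink assigns records to windows by arrival time and emits one result per word per window; this sketch only checks that the counting itself behaves as expected.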
Open a terminal and run the following command:

```shell
$ nc -lk 9999
```

Then run the program from IDEA.
Writing the Java Version
- Create a new Maven project in IntelliJ IDEA.
- Check `Create from archetype`, then click the `Add Archetype` button. Enter `org.apache.flink` for GroupId, `flink-quickstart-java` for ArtifactId, and `1.11.0` for Version, then click `OK`.
- Click the right arrow to expand the drop-down list, select `flink-quickstart-java:1.11.0`, and click `Next`.
- Enter `FlinkTutorial` for Name, `com.atguigu` for GroupId, and `FlinkTutorial` for ArtifactId, then click `Next`.
- It is best to use IDEA's default Maven tool, Bundled (Maven 3). Click `Finish` and wait a moment; the project is then created.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>my.programs.main.clazz</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Write the `WordCount.java` program:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountFromSocket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> stream = env.socketTextStream("localhost", 9999);

        stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] stringList = value.split("\\s");
            for (String s : stringList) {
                // emit each word downstream via out.collect
                out.collect(new Tuple2<>(s, 1));
            }
        }
    }
}
```
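Unlike the windowed Scala version, `keyBy(r -> r.f0).sum(1)` here is a rolling aggregation: it emits an updated `(word, count)` pair for every record that arrives, not one total per window. That behavior can be simulated on an ordinary list, independent of Flink; the class and method names below are illustrative, and plain `"word=count"` strings stand in for Flink's `Tuple2`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingCount {
    // For each incoming word, bump its running count and emit the
    // updated (word, count) pair, mirroring keyBy(...).sum(1).
    public static List<String> rollingCounts(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            int c = counts.merge(w, 1, Integer::sum);
            out.add(w + "=" + c);
        }
        return out;
    }

    public static void main(String[] args) {
        // Same tokenization as the Tokenizer above: split on whitespace.
        List<String> words = List.of("hello world hello".split("\\s"));
        System.out.println(rollingCounts(words)); // [hello=1, world=1, hello=2]
    }
}
```

Typing `hello world hello` into the `nc` session should accordingly produce three printed updates, with the count for `hello` reaching 2 on the second occurrence.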
2 Downloading the Flink Runtime and Submitting a JAR
Download link: http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
Then unpack the archive:

```shell
$ tar xvfz flink-1.11.1-bin-scala_2.11.tgz
```

Start the Flink cluster:

```shell
$ cd flink-1.11.1
$ ./bin/start-cluster.sh
```

You can open the Flink WebUI to check the cluster status: http://localhost:8081
In IDEA, package the job with Maven's `package` goal.
Submit the packaged JAR:

```shell
$ cd flink-1.11.1
$ ./bin/flink run <absolute path to the packaged JAR>
```
Stop the Flink cluster:

```shell
$ ./bin/stop-cluster.sh
```

The standard-output logs are located in the `log` directory:

```shell
$ cd flink-1.11.1/log
```