Trying to run the WordCount code without Flink installed.
The code is as follows:
- package bigdata.batch;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.util.Collector;
- /**
- * Counts word frequencies.
- */
- public class WordCount {
- public static void main(String[] args) throws Exception {
- // Parse command-line arguments
- ParameterTool params = ParameterTool.fromArgs(args);
- // Obtain an execution environment: local or cluster is detected automatically
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<String> dataSet;
- if(params.has("input")){
- dataSet = env.readTextFile(params.get("input"));
- } else {
- dataSet = WordCountData.getDefaultTextLineDataSet(env);
- }
- // Count word frequencies
- DataSet<Tuple2<String, Integer>> counts = dataSet.flatMap(new Tokenizer())
- .groupBy(0)
- .sum(1);
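- if(params.has("output")){
- // Write the result as delimited text: one record per line, fields separated by a space
- counts.writeAsCsv(params.get("output"), "\n", " ");
- // Submit the Flink job for execution
- env.execute("wordcount example");
- } else {
- // Print the result to the console; print() calls env.execute() internally
- counts.print();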
- }
- }
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
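- // Regex \W+ matches one or more non-word characters
- String[] tokens = value.toLowerCase().split("\\W+");
- for (String token:tokens){
- // Skip the empty token produced when a line starts with a non-word character
- if (token.length() > 0) {
- out.collect(new Tuple2<>(token,1));
- }
- }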
- }
- }
- }
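To make the result of the pipeline concrete, here is a minimal plain-Java sketch (no Flink required) of what flatMap(new Tokenizer()) followed by groupBy(0) and sum(1) computes. The names WordCountSketch and countWords are hypothetical and exist only for this illustration; it mirrors the tokenization rule from the listing and is not how Flink executes the job internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the word count; class and method names are hypothetical.
final class WordCountSketch {

    // Tokenizes each line the same way as the Tokenizer, then adds 1 per token.
    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                // split() returns an empty first token when the line
                // starts with a non-word character, so skip empty tokens
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "To be, or not to be",
                "  that is the question");
        // Prints each distinct word with its count, sorted by word
        System.out.println(countWords(lines));
    }
}
```

The second sample line starts with spaces on purpose: without the isEmpty() guard, the empty string would be counted as a word.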
The POM file is as follows:
- <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.dtwave</groupId>
- <artifactId>learning-flink</artifactId>
- <version>1.0</version>
- <packaging>jar</packaging>
- <name>Flink Quickstart Job</name>
- <url>http://www.myorganization.org</url>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.9.1</flink.version>
- <java.version>1.8</java.version>
- <scala.binary.version>2.12</scala.binary.version>
- <maven.compiler.source>${java.version}</maven.compiler.source>
- <maven.compiler.target>${java.version}</maven.compiler.target>
- </properties>
- <repositories>
- <repository>
- <id>apache.snapshots</id>
- <name>Apache Development Snapshot Repository</name>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
- <dependencies>
- <!-- Apache Flink dependencies -->
- <!-- flink-java and flink-clients use compile scope so that the job can run locally without a Flink installation. -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.9.1</version>
- <scope>compile</scope>
- </dependency>
- <!-- Add connector dependencies here. They must be in the default scope (compile). -->
- <!-- Example:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- -->
- <!-- Add logging framework, to produce console output when running in the IDE. -->
- <!-- These dependencies are excluded from the application JAR by default. -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.7</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <!-- Java Compiler -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
- <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <excludes>
- <exclude>org.apache.flink:force-shading</exclude>
- <exclude>com.google.code.findbugs:jsr305</exclude>
- <exclude>org.slf4j:*</exclude>
- <exclude>log4j:*</exclude>
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <!-- Do not copy the signatures in the META-INF folder.
- Otherwise, this might cause SecurityExceptions when using the JAR. -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>bigdata.batch.WordCount</mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- <pluginManagement>
- <plugins>
- <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <versionRange>[3.0.0,)</versionRange>
- <goals>
- <goal>shade</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <versionRange>[3.1,)</versionRange>
- <goals>
- <goal>testCompile</goal>
- <goal>compile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- <!-- This profile helps to make things run out of the box in IntelliJ -->
- <!-- It adds Flink's core classes to the runtime class path. -->
- <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
- <profiles>
- <profile>
- <id>add-dependencies-for-IDEA</id>
- <activation>
- <property>
- <name>idea.version</name>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>compile</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
- </project>
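The project compiles with a POM like the one above (mvn clean compile), but running it fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/DataSet
The cause is that the dependency below was declared with scope provided: it is available only during compilation and testing, it is absent at runtime, and the environment is expected to supply it. Without a Flink installation nothing supplies it, so the scope has to be compile.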
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
The flink-clients dependency must also have compile scope; otherwise the following error is reported:
- Could not load the executor class (org.apache.flink.client.LocalExecutor). Do you have the 'flink-clients' project in your dependencies?
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.9.1</version>
- <scope>compile</scope>
- </dependency>
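Both errors come down to a class missing from the runtime classpath. As a quick check before starting the job, a small diagnostic along these lines can report whether the two classes named in the error messages are loadable. This is only a sketch: ClasspathCheck and isOnClasspath are hypothetical names, and the mapping of each class to its artifact (flink-java, flink-clients) is taken from the errors discussed above.

```java
// Diagnostic sketch; the class name and helper method are hypothetical.
final class ClasspathCheck {

    // Returns true if the named class can be located by this class's
    // class loader. The class is not initialized, only looked up.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] required = {
            "org.apache.flink.api.java.DataSet",     // from flink-java
            "org.apache.flink.client.LocalExecutor"  // from flink-clients
        };
        for (String name : required) {
            String status = isOnClasspath(name) ? "found" : "MISSING";
            System.out.println(name + " -> " + status);
        }
    }
}
```

If it is run with the same classpath as the job, a MISSING line points at a dependency whose scope is still provided.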
Source: http://www.bubuko.com/infodetail-3298292.html