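Traditional big data processing is usually batch-oriented: data collected today is computed tomorrow and only then made available. In many cases, however, the timeliness of the data is critical to whether the business succeeds.
Spark and Flink are both general-purpose, open-source engines for large-scale data processing. Each aims to support all kinds of data processing in a single system in order to improve efficiency. Both have fairly mature ecosystems and are the strongest contenders for the next generation of big data engines.
Spark's ecosystem is more complete overall, and it currently leads in machine learning integration and ease of use.
Flink has a clear advantage in stream computing, and its core architecture and model are also cleaner and more flexible.
This article walks through Flink's stream processing using examples, and uses the source code to explain its internal mechanics.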
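DataStream overview
A DataStream program has five main parts, introduced in turn below:
1. Execution environment: StreamExecutionEnvironment
StreamExecutionEnvironment is an abstract class that serves as the context in which a streaming program runs. It has two implementations:
- LocalStreamEnvironment: executes the program in the current JVM
- RemoteStreamEnvironment: executes the program on a remote setup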
    /**
     * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
     * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
     * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
     *
     * <p>The environment provides methods to control the job execution (such as setting the parallelism
     * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
     *
     * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
     * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
     */
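2. Data source: DataSource (data input)
A data source wraps an input format (InputFormat). The constructor below rejects a null input format and forces the parallelism to 1 when the format is a NonParallelInput: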
    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
        super(context, type);
        this.dataSourceLocationName = dataSourceLocationName;
        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }
        this.inputFormat = inputFormat;
        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }
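Flink divides data sources into built-in sources and third-party sources. Built-in sources cover files, network sockets, and in-memory collections. Third-party sources are attached through connectors, such as the Kafka connector and the Elasticsearch connector. To define your own source, implement SourceFunction and package it as a connector.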
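The custom-source route can be sketched in plain Java. The interfaces below (ToySourceFunction, ToySourceContext) are hypothetical stand-ins that only mirror the run/cancel shape of Flink's SourceFunction. They are not the Flink types, and a real connector would implement the Flink interface instead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these types imitate the shape of a Flink source,
// they are not part of Flink.
class ToySourceDemo {

    /** Receives the elements a source emits. */
    interface ToySourceContext<T> {
        void collect(T element);
    }

    /** A source emits elements in run() until it finishes or cancel() is called. */
    interface ToySourceFunction<T> {
        void run(ToySourceContext<T> ctx);
        void cancel();
    }

    /** Emits 0, 1, ..., limit - 1 unless it has been cancelled. */
    static class CountingSource implements ToySourceFunction<Integer> {
        private final int limit;
        // volatile because cancel() may be called from another thread
        private volatile boolean running = true;

        CountingSource(int limit) {
            this.limit = limit;
        }

        @Override
        public void run(ToySourceContext<Integer> ctx) {
            for (int i = 0; i < limit && running; i++) {
                ctx.collect(i);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Runs a source to completion and gathers everything it emitted. */
    static <T> List<T> drain(ToySourceFunction<T> source) {
        List<T> out = new ArrayList<>();
        source.run(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingSource(3))); // prints [0, 1, 2]
    }
}
```

The running flag is the important design point: run() is a loop owned by the source, so cancellation has to be signalled through shared state that the loop checks.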
3. DataStream transformations
DataStream: a stream of elements of the same type. Applying a transformation to a DataStream produces another DataStream, for example:
- DataStream#map
- DataStream#filter
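The idea that each transformation yields a new stream can be illustrated with a small toy class. ToyStream is a hypothetical name and is not Flink's DataStream: it works eagerly on a finite in-memory list, whereas a real DataStream is lazy and may be unbounded. The sketch only shows how map and filter chain and how the element type changes along the way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model only: an immutable, finite "stream" of same-typed elements.
class ToyStream<T> {
    private final List<T> elements;

    ToyStream(List<T> elements) {
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }

    /** Returns a new stream holding fn(e) for every element e. */
    <R> ToyStream<R> map(Function<? super T, ? extends R> fn) {
        List<R> out = new ArrayList<>();
        for (T e : elements) {
            out.add(fn.apply(e));
        }
        return new ToyStream<R>(out);
    }

    /** Returns a new stream holding only the elements that pass the predicate. */
    ToyStream<T> filter(Predicate<? super T> keep) {
        List<T> out = new ArrayList<>();
        for (T e : elements) {
            if (keep.test(e)) {
                out.add(e);
            }
        }
        return new ToyStream<T>(out);
    }

    List<T> toList() {
        return elements;
    }

    public static void main(String[] args) {
        ToyStream<String> words = new ToyStream<>(Arrays.asList("flink", "spark", "storm"));
        // filter keeps the element type, map changes it from String to Integer
        ToyStream<Integer> lengths = words.filter(w -> w.startsWith("s")).map(String::length);
        System.out.println(lengths.toList()); // prints [5, 5]
    }
}
```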
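StreamOperator: the basic interface for stream operators. Three related types build on it:
- AbstractStreamOperator: base class offering default implementations of the lifecycle and property methods
- OneInputStreamOperator: interface for operators that process a single input
- TwoInputStreamOperator: interface for operators that process two inputs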
    /**
     * Basic interface for stream operators. Implementers would implement one of
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
     * that process elements.
     *
     * <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
     * offers default implementation for the lifecycle and properties methods.
     *
     * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
     * the timer service, timer callbacks are also guaranteed not to be called concurrently with
     * methods on {@code StreamOperator}.
     *
     * @param <OUT> The output type of the operator
     */
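The difference between one-input and two-input operators may be easier to see in a reduced model. All types below are hypothetical simplifications: Flink's real operators receive wrapped stream records and also carry lifecycle methods, both of which are left out here. The sketch keeps only the element-processing methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy model only: simplified operator interfaces, not the Flink ones.
class ToyOperatorDemo {

    /** Where an operator sends its output elements. */
    interface ToyCollector<OUT> {
        void collect(OUT element);
    }

    /** An operator fed by a single input stream. */
    interface ToyOneInputOperator<IN, OUT> {
        void processElement(IN element, ToyCollector<OUT> out);
    }

    /** An operator fed by two input streams, possibly of different types. */
    interface ToyTwoInputOperator<IN1, IN2, OUT> {
        void processElement1(IN1 element, ToyCollector<OUT> out);
        void processElement2(IN2 element, ToyCollector<OUT> out);
    }

    /** One input: upper-cases each string. */
    static class UpperCase implements ToyOneInputOperator<String, String> {
        @Override
        public void processElement(String element, ToyCollector<String> out) {
            out.collect(element.toUpperCase(Locale.ROOT));
        }
    }

    /** Two inputs: merges strings and integers into one tagged output type. */
    static class TagBothInputs implements ToyTwoInputOperator<String, Integer, String> {
        @Override
        public void processElement1(String element, ToyCollector<String> out) {
            out.collect("first:" + element);
        }

        @Override
        public void processElement2(Integer element, ToyCollector<String> out) {
            out.collect("second:" + element);
        }
    }

    /** Feeds a few elements through both operators and returns what they emitted. */
    static List<String> demo() {
        List<String> results = new ArrayList<>();
        ToyCollector<String> out = results::add;
        new UpperCase().processElement("flink", out);
        TagBothInputs two = new TagBothInputs();
        two.processElement1("a", out);
        two.processElement2(7, out);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [FLINK, first:a, second:7]
    }
}
```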
4. DataStreamSink: output
    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();
        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }
        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
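The Javadoc's statement that only streams with sinks are executed once execute() is called can be modelled with a small lazy pipeline. ToyLazyEnv and LazyStream are hypothetical names, and the bookkeeping is a deliberate simplification that does not reflect how Flink tracks transformations internally. It shows only the observable behaviour: nothing runs until execute(), and a branch without a sink never runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: building a pipeline records a plan, execute() runs it.
class ToyLazyEnv {
    private final List<Runnable> sinks = new ArrayList<>();

    <T> LazyStream<T> fromCollection(List<T> data) {
        return new LazyStream<T>(this, () -> new ArrayList<T>(data));
    }

    /** Runs every pipeline that ends in a sink and returns how many ran. */
    int execute() {
        int ran = sinks.size();
        try {
            for (Runnable job : sinks) {
                job.run();
            }
        } finally {
            sinks.clear(); // the plan is consumed, like transformations.clear()
        }
        return ran;
    }

    static class LazyStream<T> {
        private final ToyLazyEnv env;
        private final Supplier<List<T>> plan;

        LazyStream(ToyLazyEnv env, Supplier<List<T>> plan) {
            this.env = env;
            this.plan = plan;
        }

        /** Describes a mapping step, no element is processed yet. */
        <R> LazyStream<R> map(Function<? super T, ? extends R> fn) {
            return new LazyStream<R>(env, () -> {
                List<R> out = new ArrayList<>();
                for (T e : plan.get()) {
                    out.add(fn.apply(e));
                }
                return out;
            });
        }

        /** Registers this stream with the environment so that execute() runs it. */
        void addSink(Consumer<? super T> sink) {
            env.sinks.add(() -> plan.get().forEach(sink));
        }
    }

    public static void main(String[] args) {
        ToyLazyEnv env = new ToyLazyEnv();
        List<String> log = new ArrayList<>();
        LazyStream<Integer> numbers = env.fromCollection(Arrays.asList(1, 2, 3));
        numbers.map(x -> { log.add("unused " + x); return x; }); // no sink, never runs
        numbers.map(x -> x * 2).addSink(x -> log.add("sink " + x));
        System.out.println(log); // prints []
        env.execute();
        System.out.println(log); // prints [sink 2, sink 4, sink 6]
    }
}
```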
5. Execution
    /**
     * Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
     * specified name.
     *
     * @param jobName
     *            name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);
        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }
        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();
        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }
        MiniCluster miniCluster = new MiniCluster(cfg);
        try {
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }
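The order in which execute() assembles its configuration matters: job settings go in first, then a managed-memory default, then the user's settings (which override both), and finally fallbacks for anything still missing. A minimal sketch of that layering with a plain map follows. The class name and the key strings are made-up placeholders, not Flink's real option keys.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: mirrors the layering order of execute() with plain maps.
class ToyLocalConfig {
    // Placeholder keys, not Flink's actual configuration keys.
    static final String MANAGED_MEMORY = "toy.managed-memory";
    static final String BIND_PORT = "toy.rest-bind-port";
    static final String TASK_SLOTS = "toy.task-slots";

    /** Later layers override earlier ones, the bind port is only a fallback. */
    static Map<String, String> effectiveConfig(Map<String, String> jobConfig,
                                               Map<String, String> userConfig) {
        Map<String, String> config = new HashMap<>();
        config.putAll(jobConfig);          // 1. settings carried by the job
        config.put(MANAGED_MEMORY, "0");   // 2. default applied on top of the job settings
        config.putAll(userConfig);         // 3. user settings override everything so far
        if (!config.containsKey(BIND_PORT)) {
            config.put(BIND_PORT, "0");    // 4. fallback only if nobody set a port
        }
        return config;
    }

    /** Uses the configured slot count, else falls back to the job's maximum parallelism. */
    static int slotsPerTaskManager(Map<String, String> config, int maxParallelism) {
        String configured = config.get(TASK_SLOTS);
        return configured == null ? maxParallelism : Integer.parseInt(configured);
    }

    public static void main(String[] args) {
        Map<String, String> job = new HashMap<>();
        job.put(MANAGED_MEMORY, "128m"); // replaced by the default in step 2
        Map<String, String> user = new HashMap<>();
        user.put(BIND_PORT, "8081");     // survives, so step 4 does nothing

        Map<String, String> config = effectiveConfig(job, user);
        System.out.println(config.get(MANAGED_MEMORY));      // prints 0
        System.out.println(config.get(BIND_PORT));           // prints 8081
        System.out.println(slotsPerTaskManager(config, 4));  // prints 4
    }
}
```

In the sketch, as in the method above, a port value of "0" is only written when no layer has supplied one, and the slot count defaults to the job's maximum parallelism so that a local run has enough slots for every parallel task.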
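6. Summary
Flink executes a job much like a pipeline. It borrows some execution principles from databases and builds its own distinctive execution model on top of them.
7. Outlook
Stream processing also involves concepts such as watermarks and windows. To keep this article short, it covers only the usage and internals of the Flink DataStream API.
The next article will cover watermarks, and the one after that will cover window computation.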
Source: https://www.cnblogs.com/davidwang456/p/11046857.html