需求与场景
上游某业务数据量特别大, 进入到 kafka 一个 topic 中 (当然了这个 topic 的 partition 数必然多, 有人肯定疑问为什么非要把如此庞大的数据写入到 1 个 topic 里, 历史留下的问题, 现状就是如此庞大的数据集中在一个 topic 里). 这就需要根据一些业务规则把这个大数据量的 topic 数据分发到多个 (成百上千)topic 中, 以便下游的多个 job 去消费自己 topic 的数据, 这样上下游之间的耦合性就降低了, 也让下游的 job 轻松了很多, 下游的 job 只处理属于自己的数据, 避免成百上千的 job 都去消费那个大数据量的 topic. 数据被分发之后再让下游 job 去处理 对网络带宽, 程序性能, 算法复杂性都有好处.
这样一来就需要 这么一个分发程序, 把上下游 job 连接起来.
分析与思考
Flink 中有 connect 算子, 可以连接 2 个流, 在这里 1 个就是上面数据量庞大的业务数据流, 另外 1 个就是规则流 (或者叫做配置流, 也就是决定根据什么样的规则分发业务数据)
但是问题来了, 根据规则分发好了, 如何把这些数据 sink 到 kafka 多个 (成百上千)topic 中呢?
首先想到的就是添加多个 sink, 每分发到一个 topic, 就多添加 1 个 addSink 操作, 这对于如果只是分发到 2,3 个 topic 适用的, 我看了一下项目中有时候需要把数据 sink 到 2 个 topic 中, 同事中就有人添加了 2 个 sink, 完全 ok, 但是在这里要分发到几十个, 成百上千个 topic, 就肯定不现实了, 不需要解释吧.
sink 到 kafka 中, 其实本质上就是用 KafkaProducer 往 kafka 写数据, 那么不知道有没有想起来, 用 KafkaProducer 写数据的时候 API 是怎样的, public Future<RecordMetadata> send(ProducerRecord<K, V> record); 显然这里需要一个 ProducerRecord 对象, 再看如何实例化 ProducerRecord 对象, public ProducerRecord(String topic, V value), 也就是说每一个 message 都指定 topic, 标明是写到哪一个 topic 的, 而不必说 我们要写入 10 个不同的 topic 中, 我们就一定 new 10 个 KafkaProducer
到上面这一步, 如果懂的人就会豁然开朗了, 我本来想着可能需要稍微改改 flink-connector-kafka 实现, 让我惊喜的是 flink-connector-kafka 已经留有了接口, 只要实现 KeyedSerializationSchema 这个接口的 String getTargetTopic(T element); 就行
代码实现
先看一下 KeyedSerializationSchema 接口的定义, 我们知道 kafka 中存储的都是 byte[], 所以由我们自定义序列化 key,value
- /**
- * The serialization schema describes how to turn a data object into a different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data to be handed
- * to them in a specific format (for example as byte strings).
- *
- * @param <T> The type to be serialized.
- */
- @PublicEvolving
- public interface KeyedSerializationSchema<T> extends Serializable {
- /**
- * Serializes the key of the incoming element to a byte array
- * This method might return null if no key is available.
- *
- * @param element The incoming element to be serialized
- * @return the key of the element as a byte array
- */
- byte[] serializeKey(T element);
- /**
- * Serializes the value of the incoming element to a byte array.
- *
- * @param element The incoming element to be serialized
- * @return the value of the element as a byte array
- */
- byte[] serializeValue(T element);
- /**
- * Optional method to determine the target topic for the element.
- *
- * @param element Incoming element to determine the target topic from
- * @return null or the target topic
- */
- String getTargetTopic(T element);
- }
重点来了, 实现这个 String getTargetTopic(T element); 就可以决定这个 message 写入到哪个 topic 里.
于是 我们可以这么做, 拿到业务数据 (我们用的是 JSON 格式), 然后根据规则分发的时候, 就在这条 JSON 格式的业务数据里添加一个写到哪个 topic 的字段, 比如说叫 @topic,
然后我们实现 getTargetTopic() 方法的时候, 从业务数据中取出 @topic 字段就行了.
实现如下 (这里我是用 scala 写的, java 类似):
- class OverridingTopicSchema extends KeyedSerializationSchema[Map[String, Any]] {
- override def serializeKey(element: Map[String, Any]): Array[Byte] = null
- override def serializeValue(element: Map[String, Any]): Array[Byte] = JsonTool.encode(element) // 这里用 JsonTool 指代 JSON 序列化的工具类
- /**
- * kafka message value 根据 @topic 字段 决定 往哪个 topic 写
- * @param element
- * @return
- */
- override def getTargetTopic(element: Map[String, Any]): String = {
- if (element != null && element.contains("@topic")) {
- element("@topic").toString
- } else null
- }
- }
之后在 new FlinkKafkaProducer 对象的时候 把上面我们实现的这个 OverridingTopicSchema 传进去就行了.
- public FlinkKafkaProducer(
- String defaultTopicId, // 如果 message 没有指定写往哪个 topic, 就写入这个默认的 topic
- KeyedSerializationSchema<IN> serializationSchema,// 传入我们自定义的 OverridingTopicSchema
- Properties producerConfig,
- Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
- FlinkKafkaProducer.Semantic semantic,
- int kafkaProducersPoolSize) {
- //....
- }
至此, 我们只需要把上面 new 出来的 FlinkKafkaProducer 添加到 addSink 中就能实现把数据 sink 到 kafka 多个 (成百上千)topic 中.
下面简单追踪一下 FlinkKafkaProducer 源码, 看看 flink-connector-kafka 是如何将我们自定义的 KeyedSerializationSchema 作用于最终的 ProducerRecord
- /** 这个是用户可自定义的序列化实现
- * (Serializable) SerializationSchema for turning objects used with Flink into.
- * byte[] for Kafka.
- */
- private final KeyedSerializationSchema<IN> schema;
- @Override
- public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
- checkErroneous();
- // 调用我们自己的实现的 schema 序列化 message 中的 key
- byte[] serializedKey = schema.serializeKey(next);
- // 调用我们自己的实现的 schema 序列化 message 中的 value
- byte[] serializedValue = schema.serializeValue(next);
- // 调用我们自己的实现的 schema 取出写往哪个 topic
- String targetTopic = schema.getTargetTopic(next);
- if (targetTopic == null) {
- // 如果没有指定写往哪个 topic, 就写往默认的 topic
- // 这个默认的 topic 是我们 new FlinkKafkaProducer 时候作为第一个构造参数传入 (见上面的注释)
- targetTopic = defaultTopicId;
- }
- Long timestamp = null;
- if (this.writeTimestampToKafka) {
- timestamp = context.timestamp();
- }
- ProducerRecord<byte[], byte[]> record;
- int[] partitions = topicPartitionsMap.get(targetTopic);
- if (null == partitions) {
- partitions = getPartitionsByTopic(targetTopic, transaction.producer);
- topicPartitionsMap.put(targetTopic, partitions);
- }
- if (flinkKafkaPartitioner != null) {
- record = new ProducerRecord<>(
- targetTopic, // 这里看到了我们上面一开始分析的 ProducerRecord
- flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
- timestamp,
- serializedKey,
- serializedValue);
- } else {
- record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
- }
- pendingRecords.incrementAndGet();
- transaction.producer.send(record, callback);
- }
来源: https://www.cnblogs.com/itwild/p/12313200.html