由于一些因素限制, 代码里没有异常处理, 自己实现时请注意异常处理
简介
之前提过我在公司内部做的是一个消息平台, 为各个应用提供消息服务. 前端时间有个领导给了个需求, 能不能自动为所有的消息加上一个消息 ID 的属性, 可以全局唯一定义一个消息.
听到时候的第一个反应是, 消息体的序列化反序列化方法都可以自定义了, 为什么不在自己的消息体里加 MessageID?
领导的说法是因为这样所有客户端在使用的时候还要修改自己的消息体, 多不方便啊
EMMMMMM.. 好吧, 你是领导...
然后我转念一想, 在不破坏消息体的前提下, 给每个消息添加一个消息 ID, 感觉很有趣啊, 于是报着干脆玩一下的心理预期, 开始干活.
基本思路
在 Kafka 客户端中, 会根据用户选择的序列化方式实例化序列化器 (Serializer) 和反序列化器(Deserializer), 要实现这个功能, 基本思路是在用户定义的序列化方式上包上一层自己的序列化方法.
最终实现的效果是:
- //Producer Demo
- public class ProducerDemo{
- @Test
- public void sendTest(){
- KafkaConfigProperties prop = new KafkaConfigProperties("producer.properties");
- ABCProducer<String,ABCMessage<String>> producer = new ABCProducer<>(prop);
- ProducerRecord<String,ABCMessage<String>> record = new ProducerRecord<String,ABCMessage<String>>("testTopic",null,new ABCMessage<String>("hello world!"));
- producer.send(record);
- }
- }
- //Consumer Demo
- public class ConsumerDemo{
- @Test
- public void receiveTest(){
- KafkaConfigProperties prop = new KafkaConfigProperties("consumer.properties");
- ABCConsumer<String,ABCMessage<String>> consumer = new ABCConsumer<>(prop);
- consumer.subscribe(Collections.singletonList("testTopic"));
- ConsumerRecords<String,ABCMessage<String>> records = consumer.poll(10000);
- for (ConsumerRecord<String,ABCMessage<String>> record:records){
- System.out.println(record.getMessageId()+" "+record.getvalue());
- }
- }
- }
而配置的配置文件仍然可以是原生的那些
- Bootstrap.servers=192.168.0.1:9092
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- acks=all
究其原因, 就是在通过对原生接口的修改, 侵入了原生的 kafka-client 的 API, 本文只实现了在用户序列化方式外面包上一层, 其实还可以实现很多其他的能力.
序列化位置的选择
对一个 kafka 消息来说, ProducerRecord 里写死了很多属性:
- public class ProducerRecord<K, V> {
- private final String topic;
- private final Integer partition;
- private final K key;
- private final V value;
- private final Long timestamp;
- }
这些类型都是 private final 的, 其中 topic,partition 都是用户指定的, 不能修改, key 直接决定了 patition, 可能有业务含义, timestamp 和 value 是可以稍微动一下的, 但是 timestamp 是 long 类型, 不好修改, 最终 message ID 还是要加在 value 上.
value 这个类型是用户指定的, 且指定了序列化方法, 在发送的时候发送的都是 byte[]类型, 所以考虑在用户指定的序列化器做完序列化以后, 在前面加上自己的序列后的 messageID, 重新组合成一个新的 byte[]再发往 kafka.
整个过程是这样的:
侵入序列化方法
设计思路
我们需要自定义 properties(KafkaConfigProperties), 自定义 producer(ABCProducer), 自定义一个新的 Message 类型(ABCMessage), 自定义一个序列化和反序列化方式(ABCMessageSerializer,ABCMessageDeserializer).
每个组件的用途是这样的:
KafkaConfigProperties : 继承 Properties, 用于从用户给的 properties 中进行 value.serializer 的修改, 此外还能加上一些强制鉴权的能力, 参数校验等等..
ABCProducer: 继承 KafkaProducer, 用于在父类 KafkaProducer 初始化以后, 将用户定义的序列化器传入 ABCMessageSerializer
ABCMessage: 包装上 MessageID, 给生产者和消费者对等的消息发送和接收的概念
ABCMessageSerializer: 对数据进行拆分, 并将消息的 message ID 进行基本序列化, 将用户的 value 根据用户的序列化方式进行序列化.
这样设计的原因是, kafka 的客户端封装得太紧密, 所以找了半天没找到漏洞, 最后只能通过反射来破坏 KafkaProducer 中的 valueSerializer 的 private 修饰, 从而在子类中对其进行方法调用.
实现代码
KafkaConfigProperties
根据配置文件, 判断是否需要启动这个功能. 如果启动, 就更改一下 value.serializer 配置或者 value.deserializer 配置
- public class KafkaConfigProperties extends Properties {
- private boolean getBooleanProperties(String key, String defaultValue){
- return Boolean.valueOf(this.getproperty(key,defaultValue));
- }
- public KafkaConfigProperties(Properties prop){
- super();
- this.putAll(prop);
- configMessageId();
- }
- private boolean messageIdEnable;
- public boolean isMessageIdEnable() {
- return messageIdEnable;
- }
- private void configMessageId() {
- // 默认不使用 messageID, 需要显式配置
- messageIdEnable = getBooleanProperties("messageid.enable","false");
- if (messageIdEnable){
- if (this.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)){
- this.put("user.value.serializer",this.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- this.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"com.abc.ABCMessageSerializer");
- }
- if (this.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)){
- this.put("user.value.deserializer",this.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
- this.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"com.abc.ABCMessageDeserializer");
- }
- }
- }
- }
- ABCMessage
每次 Mesasge 的生成都会通过 MessageUtils 来自动生成一个 messageId, 建议固定长度, 我用的暂时是 UUID,36 位.
- public class ABCMessage<V> {
- private String messageId;
- private V value;
- public ABCMessage(String messageId, V value) {
- this.messageId = messageId;
- this.value = value;
- }
- public ABCMessage(V value) {
- this(MessageUtils.generateMessageId(),value);
- }
- public String getMessageId() {
- return messageId;
- }
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
- public V getValue() {
- return value;
- }
- public void setValue(V value) {
- this.value = value;
- }
- }
- ABCProducer
这里用到了反射, 将父类中的 valueSeriazlier 获取到, 然后执行里面的方法
- public class ABCProducer<K,V> extends KafkaProducer<K,V>{
- public ABCProducer(KafkaConfigProperties prop) {
- super(prop);
- if (prop.isMessageIdEnable()){
- Field field = this.getClass().getSuperClass().getDeclareField("valueSerializer");
- field.setAccessible(true);
- ABCMessageSerializer serializer = (ABCMessageSerializer) field.get(this);
- serializer.setUserSerializer(prop.get("user.value.serializer").toString());
- }
- }
- }
- ABCMessageSerializer
- public class ABCMessageSerializer implements Serializer {
- private Serializer valueSerializer;
- private Map configs;
- @Override
- public void configure(Map configs,boolean isKey){
- this.configs = configs;
- }
- @Override
- public byte[] seriazlize(String topic, Object data){
- if (data instanceof ABCMessage){
- byte sep = 0;
- ABCMessage message = (ABCMessage) data;
- byte[] joinByte = EncryptUtils.join(sep,message.getMessageId().getBytes("UTF-8"),
- valueSerializer.serialize(topic,message.getValue()));
- return joinByte;
- }
- }
- @Override
- public void close(){}
- public void setUserSerializer(String userValueSerializer){
- HashMap<String,Object> config = new HashMap<>();
- config.put("value.serializer",Class.forName(userValueSerializer));
- ABCMessageConfig config = new ABCMessageConfig(configProp);
- this.valueSerializer = config.getConfiguredInstance("value.serializer",Serializer.class);
- this.valueSerializer.configure(this.configs,false);
- }
- }
这段代码里有几个需要注意的地方:
serialize 方法里, 进行了一个 messageId 的序列化操作, message.getMessageId().getBytes("UTF-8"), 建议加上 UTF-8;
EncryptUtils.join 这个方法是我前文中对 kafka 的鉴权模式进行自己修改的时候用到的, 作用是字符串拼接. 拼接的两个字符串之间加上分隔符, 也就是 sep, 使用的是 0 这个 byte, 不拼接也行其实;
setUserSerializer 方法中用到了 ABCMessageConfig, 这个 Config 继承自 AbstractConfig 这个抽象类, 仅仅只是进程, 没有任何的重写. 主要用途是模仿 ProducerConfig 和 ConsumerConfig 中对 valuerSeriazlier 的实例化方式. 这里就不上代码了.
记得构造好以后的 valueSerializer 需要调用一下 configure 方法.
我在一开始的时候想在 configure 方法里加上指定用户的 value.serializer, 这样就可以免除后面 setuserSerializer 这个方法的麻烦, 但是发现传入 configure 方法的 configs 经过了一层过滤(ProducerConfig 构造后过滤了), 我们在 properties 中加入的 user.value.serializer 无法被识别. 所以只好用后续添加的方式.
还能做什么
在这个方法里, 破坏了 valueSerializer 的 private 修饰, 我们还能通过破坏其它的修饰来更改 KafaProducer 的方法, 这样修改而不是直接修改 KafkaProduer 源码的好处是可以直接升级 Kafka 客户端的 API 而不需要重新编译新版本的 kafka-clients 源码, 升级更方便.
下一步可以试试让用户加上自定义的 messageId, 但是这样的话不如让用户自己在 value 里定义了吧...
╮(╯╰)╭
来源: http://www.jianshu.com/p/0d287d5c3bbc