本文是基于 hadoop 2.7.1, 以及 kafka 0.11.0.0kafka-connect 是以单节点模式运行, 即 standalone
首先, 先对 kafka 和 kafka connect 做一个简单的介绍
kafka:Kafka 是一种高吞吐量的分布式发布订阅消息系统, 它可以处理消费者规模的网站中的所有动作流数据比较直观的解释就是其有一个生产者 (producer) 和一个消费者 (consumer) 可以将 kafka 想象成一个数据容器, 生产者负责发送数据到这个容器中, 而消费者从容器中取出数据, 在将数据做处理, 如存储到 hdfs
kafka connect:Kafka Connect 是一种用于在 Kafka 和其他系统之间可扩展的可靠的流式传输数据的工具它使得能够快速定义将大量数据集合移入和移出 Kafka 的连接器变得简单即适合批量数据导入导出操作
下面将介绍如何用 kafka connect 将数据写入到 hdfs 中包括在这个过程中可能碰到的一些问题说明
首先启动 kafka-connect:
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
这个命令后面两个参数,
第一个是指定启动的模式, 有分布式和单节点两种, 这里是单节点 kafka 自带, 放于 config 目录下
第二个参数指向描述 connector 的属性的文件, 可以有多个, 这里只有一个 connector 用来写入到 hdfs 需要自己创建
接下来看看 connector1.properties 的内容,
- name = "test"#该connector的名字connector.class = hdfs.HdfsSinkConnector#将自己按connect接口规范编写的代码打包后放在kafka / libs目录下,
- 再根据项目结构引用对应connector tasks.max = 1#Task是导入导出的具体实现,
- 这里是指定多少个task来并行运行导入导出作业,
- 由多线程实现由于hdfs中一个文件每次只能又一个文件操作,
- 所以这里只能是1.topics = test#指定从哪个topic读取数据,
- 这些其实是用来在connector或者task的代码中读取的key.converter = org.apache.kafka.connect.converters.ByteArrayConverter#指定key以那种方式转换,
- 需和Producer发送方指定的序列化方式一致value.converter = org.apache.kafka.connect.json.JsonConverter#同上hdfs.url = hdfs: //127.0.0.1:9000#hdfs 的 url 路径, 在 Connector 中会被读取
- hdfs.path = /test/file#hdfs文件路径,
- 同样Connector中被读取key.converter.schemas.enable = true#稍后介绍,
- 可以true也可以false value.converter.schemas.enable = true#稍后介绍,
- 可以true也可以false
接下来看代码, connect 主要是导入导出两个概念, 导入是 source, 导出时 Sink 这里只使用 Sink, 不过 Source 和 Sink 的实现其实基本相同
先是 Connector
- public class HdfsSinkConnector extends SinkConnector {
- // 这两项为配置 hdfs 的 urlh 和路径的配置项, 需要在 connector1.properties 中指定
- public static final String HDFS_URL = "hdfs.url";
- public static final String HDFS_PATH = "hdfs.path";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
- .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");
- private String hdfsUrl;
- private String hdfsPath;
- @Override
- public String version() {
- return AppInfoParser.getVersion();
- }
- @Override
- public void start(Map<String, String> props) {
- hdfsUrl = props.get(HDFS_URL);
- hdfsPath = props.get(HDFS_PATH);
- }
- @Override
- public Class<? extends Task> taskClass() {
- return HdfsSinkTask.class;
- }
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- ArrayList<Map<String, String>> configs = new ArrayList<>();
- for (int i = 0; i <maxTasks; i++) {
- Map<String, String> config = new HashMap<>();
- if (hdfsUrl != null)
- config.put(HDFS_URL, hdfsUrl);
- if (hdfsPath != null)
- config.put(HDFS_PATH, hdfsPath);
- configs.add(config);
- }
- return configs;
- }
- @Override
- public void stop() {
- // Nothing to do since FileStreamSinkConnector has no background monitoring.
- }
- @Override
- public ConfigDef config() {
- return CONFIG_DEF;
- }
- }
接下来是 Task
- public class HdfsSinkTask extends SinkTask {
- private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);
- private String filename;
- public static String hdfsUrl;
- public static String hdfsPath;
- private Configuration conf;
- private FSDataOutputStream os;
- private FileSystem hdfs;
- public HdfsSinkTask(){
- }
- @Override
- public String version() {
- return new HdfsSinkConnector().version();
- }
- @Override
- public void start(Map<String, String> props) {
- hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);
- hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);
- System.out.println("----------------------------------- start--------------------------------");
- conf = new Configuration();
- conf.set("fs.defaultFS", hdfsUrl);
- // 这两个是与 hdfs append 相关的设置
- conf.setBoolean("dfs.support.append", true);
- conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
- try{
- hdfs = FileSystem.get(conf);
- // connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);
- os = hdfs.append(new Path(hdfsPath));
- }catch (IOException e){
- System.out.println(e.toString());
- }
- }
- @Override
- public void put(Collection<SinkRecord> sinkRecords) {
- for (SinkRecord record : sinkRecords) {
- log.trace("Writing line to {}: {}", logFilename(), record.value());
- try{
- System.out.println("write info------------------------" + record.value().toString() + "-----------------");
- os.write((record.value().toString()).getBytes("UTF-8"));
- os.hsync();
- }catch(Exception e){
- System.out.print(e.toString());
- }
- }
- }
- @Override
- public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
- try{
- os.hsync();
- }catch (Exception e){
- System.out.print(e.toString());
- }
- }
- @Override
- public void stop() {
- try {
- os.close();
- }catch(IOException e){
- System.out.println(e.toString());
- }
- }
- private String logFilename() {
- return filename == null ? "stdout" : filename;
- }
- }
这里重点提一下, 因为在 connector1.propertise 中设置了 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter, 所以不能用命令行形式的
producer 发送数据, 而是要用程序的方式, 并且在 producer 总也要设置 key 的序列化形式为 org.apache.kafka.common.serialization.ByteArraySerializer
编码完成, 先用 idea 以开发程序与依赖包分离的形式打包成 jar 包, 然后将程序对应的 jar 包 (一般就是项目名. jar) 放到 kafka/libs 目录下面, 这样就能被找到
接下来对这个过程的问题做一个汇总
1. 在 connector1.properties 中的 key.converter.schemas.enable=false 和 value.converter.schemas.enable=false 的问题
这个选项默认在 connect-standalone.properties 中是 true 的, 这个时候发送给 topic 的 Json 格式是需要使用 avro 格式, 具体情况可以百度, 这里给出一个样例
- {
- "schema": {
- "type": "struct",
- "fields": [{
- "type": "int32",
- "optional": true,
- "field": "c1"
- },
- {
- "type": "string",
- "optional": true,
- "field": "c2"
- },
- {
- "type": "int64",
- "optional": false,
- "name": "org.apache.kafka.connect.data.Timestamp",
- "version": 1,
- "field": "create_ts"
- },
- {
- "type": "int64",
- "optional": false,
- "name": "org.apache.kafka.connect.data.Timestamp",
- "version": 1,
- "field": "update_ts"
- }],
- "optional": false,
- "name": "foobar"
- },
- "payload": {
- "c1": 10000,
- "c2": "bar",
- "create_ts": 1501834166000,
- "update_ts": 1501834166000
- }
- }
主要就是 schema 和 payload 这两个, 不按照这个格式会报错如下
- org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
- at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
如果想发送普通的 json 格式而不是 avro 格式的话, 很简单 key.converter.schemas.enable 和 value.converter.schemas.enable 设置为 false 就行这样就能发送普通的 json 格式数据
2. 在启动的过程中出现各种各样的 java.lang.ClassNotFoundException
在启动 connector 的时候, 一开始总是会报各个各样的 ClassNotFoundException, 不是这个包就是那个包, 查找问题一直说要么缺少包要么是包冲突这个是什么原因呢?
其实归根结底还是依赖冲突的问题, 因为 kafka 程序自定义的类加载器加载类的目录是在 kafka/libs 中, 而写到 hdfs 需要 hadoop 的包
我一开始的做法是将 hadoop 下的包路径添加到 CLASSPATH 中, 这样子问题就来了, 因为 kafka 和 hadoop 的依赖包是有冲突的, 比如 hadoop 是 guava-11.0.2.jar, 而 kafka 是 guava-20.0.jar, 两个 jar 包版本不同, 而我们是在 kafka 程序中调用 hdfs, 所以当 jar 包冲突时应该优先调用 kafka 的但是注意 kafka 用的是程序自定义的类加载器, 其优先级是低于 CLASSPATH 路径下的类的, 就是说加载类时会优先加载 CLASSPATH 下的类这样子就有问题了
我的解决方案时将 kafka 和 hadoop 加载的 jar 包路径都添加到 CLASSPATH 中, 并且 kafka 的路径写在 hadoop 前面, 这样就可以启动 connector 成功
来源: https://www.cnblogs.com/listenfwind/p/8610487.html