hadoop-client 的依赖很乱，调试了很多次：CDH 版本缺少很多 jar 包，改用 hadoop 2.7.3 就可以了。
使用自定义的输出流池（HDFSOutputStreamPool）来管理 HDFS 输出流：
- public void writeLog2HDFS(String path, byte[] log) {
- try {
- // 得到我们的装饰流
- FSDataOutputStream out = HDFSOutputStreamPool.getInstance().takeOutputStream(path);
- out.write(log);
- out.write("\r\n".getBytes());
- out.hsync();
- out.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
/**
 * Consumes TOPIC through the ZooKeeper-based high-level Kafka consumer.
 * Every record that `StringUtil.splitLog` can split is appended to HDFS under
 * `/spark/eshop/<dateStr>/<hostname>.log`.
 *
 * @created by imp ON 2019/3/1
 */
object KafkaScalaConsumer {
  /** HDFS writer shared by all consumer threads. */
  val write = new HDFSWriter()

  def ZK_CONN = "192.168.121.12:2181"
  def GROUP_ID = "1test-consumer-group109"
  def TOPIC = "eshop"

  /**
   * Number of streams requested for TOPIC, which is also the worker-pool size.
   * Per the original note, TOPIC was created with 3 partitions.
   */
  private val StreamCount = 3

  def main(args: Array[String]): Unit = {
    val connector = Consumer.create(createConfig())
    val topicCountMap = new HashMap[String, Int]()
    topicCountMap.put(TOPIC, StreamCount)
    val msgStreams = connector.createMessageStreams(topicCountMap)
    // Avoid Option.get: if TOPIC is absent, start no workers instead of throwing.
    val streams = msgStreams.getOrElse(TOPIC, Nil)
    println("# of streams is " + streams.size)

    // One worker per stream; each blocks on its own stream iterator.
    val threadPool: ExecutorService = Executors.newFixedThreadPool(StreamCount)
    streams.zipWithIndex.foreach { case (stream, index) =>
      threadPool.execute(new ThreadDemo("consumer_" + index, stream))
    }

    // Previously neither the connector nor the pool was ever shut down.
    sys.addShutdownHook {
      connector.shutdown()
      threadPool.shutdown()
    }
  }

  /** Drains one Kafka stream, forwarding each record to HDFS until the iterator ends. */
  class ThreadDemo(threadName: String, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
    override def run(): Unit = {
      val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
      while (it.hasNext()) {
        val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
        // A failure on one record must not end this thread.
        // Otherwise the whole stream silently stops being consumed.
        try handleMessage(data.message())
        catch {
          case scala.util.control.NonFatal(e) =>
            Console.err.println(threadName + ": failed to process message, skipping")
            e.printStackTrace()
        }
      }
    }

    /**
     * Splits one raw record and appends it to its per-date, per-host HDFS file.
     * Records that split to null or an empty array are skipped. The original code
     * used `return` here, which terminated run() and stopped the consumer thread.
     */
    private def handleMessage(msg: Array[Byte]): Unit = {
      // Decoded with the platform default charset, as before.
      val log = new String(msg)
      val arr = StringUtil.splitLog(log)
      if (arr != null && arr.length >= 1) {
        val hostname = StringUtil.getHostname(arr)
        val dateStr = StringUtil.formatYyyyMmDdHhMi(arr)
        val rawPath = "/spark/eshop/" + dateStr + "/" + hostname + ".log"
        println(log)
        write.writeLog2HDFS(rawPath, msg)
      }
    }
  }

  /** Builds the ZooKeeper-based consumer configuration used by `main`. */
  def createConfig(): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", ZK_CONN)
    props.put("group.id", GROUP_ID)
    props.put("zookeeper.session.timeout.ms", "5000")
    props.put("zookeeper.connection.timeout.ms", "10000")
    // Set once; the original put this key twice with the same value.
    props.put("auto.offset.reset", "smallest")
    props.put("auto.commit.interval.ms", "300")
    props.put("rebalance.backoff.ms", "2000")
    props.put("rebalance.max.retries", "10")
    new ConsumerConfig(props)
  }
}
来源: http://www.bubuko.com/infodetail-2975449.html