需求
已有 Kafka 服务, 通过 kafka 服务数据 (GPS) 落地到本地磁盘(以文本文件存储). 现要根据 echarts 实现一个实时车辆的地图.
分析
前端实时展现: 使用 websocket 技术, 实现服务器端数据推送到前端展现
通过 Java 的 kafka client 端获取数据, 并且通过 websock 推送到前端.
websocket
简介
websocket 是 html5 开始提供的一种在单位 TCP 连接上进行全双工通讯的协议. 在 websocket api 中, 浏览器和服务器只需要做一次握手的动作, 然后浏览器和服务器之间就形成了一条快速通道. 两者之间就可以数据互相传送.
开发
服务器端
- package com.ykkj.weiyi.socket;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.server.standard.SpringConfigurator;
- import javax.websocket.*;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.concurrent.CopyOnWriteArraySet;
- /**
- * @ServerEndpoint 注解是一个类层次的注解, 它的功能主要是将目前的类定义成一个 websocket 服务器端,
- * 注解的值将被用于监听用户连接的终端访问 URL 地址, 客户端可以通过这个 URL 来连接到 WebSocket 服务器端
- */
- @ServerEndpoint(value = "/websocket")
- public class CommodityServer {
- // 静态变量, 用来记录当前在线连接数. 应该把它设计成线程安全的.
- private static int onlineCount = 0;
- //concurrent 包的线程安全 Set, 用来存放每个客户端对应的 MyWebSocket 对象. 若要实现服务端与单一客户端通信的话, 可以使用 Map 来存放, 其中 Key 可以为用户标识
- public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();
- // 与某个客户端的连接会话, 需要通过它来给客户端发送数据
- private Session session;
- /**
- * 连接建立成功调用的方法
- *
- * @param session 可选的参数. session 为与某个客户端的连接会话, 需要通过它来给客户端发送数据
- */
- @OnOpen
- public void onOpen(Session session) {
- this.session = session;
- webSocketSet.add(this); // 加入 set 中
- addOnlineCount(); // 在线数加 1
- System.out.println("有新连接加入! 当前在线人数为" + getOnlineCount());
- }
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose() {
- webSocketSet.remove(this); // 从 set 中删除
- subOnlineCount(); // 在线数减 1
- System.out.println("有一连接关闭! 当前在线人数为" + getOnlineCount());
- }
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message 客户端发送过来的消息
- * @param session 可选的参数
- */
- @OnMessage
- public void onMessage(String message, Session session) {
- System.out.println("来自客户端的消息:" + message);
- // 群发消息
- for (CommodityServer item : webSocketSet) {
- try {
- item.sendMessage(message);
- } catch (IOException e) {
- e.printStackTrace();
- continue;
- }
- }
- }
- /**
- * 发生错误时调用
- *
- * @param session
- * @param error
- */
- @OnError
- public void onError(Session session, Throwable error) {
- System.out.println("发生错误");
- error.printStackTrace();
- }
- /**
- * 这个方法与上面几个方法不一样. 没有用注解, 是根据自己需要添加的方法.
- *
- * @param message
- * @throws IOException
- */
- public void sendMessage(String message) throws IOException {
- this.session.getBasicRemote().sendText(message);
- //this.session.getAsyncRemote().sendText(message);
- }
- public static synchronized int getOnlineCount() {
- return onlineCount;
- }
- public static synchronized void addOnlineCount() {
- CommodityServer.onlineCount++;
- }
- public static synchronized void subOnlineCount() {
- CommodityServer.onlineCount--;
- }
- }
前端
- <!DOCTYPE html>
- <html>
- <head>
- <title>Java 后端 WebSocket 的 Tomcat 实现</title>
- </head>
- <body>
- Welcome<br/><input id="text" type="text"/>
- <button onclick="send()">发送消息</button>
- <hr/>
- <button onclick="closeWebSocket()">关闭 WebSocket 连接</button>
- <hr/>
- <div id="message"></div>
- </body>
- <script type="text/javascript">
- var websocket = null;
- // 判断当前浏览器是否支持 WebSocket
- if ('WebSocket' in window) {
- websocket = new WebSocket("ws://localhost:8081/onepic/websocket");
- }
- else {
- alert('当前浏览器 Not support websocket')
- }
- // 连接发生错误的回调方法
- websocket.onerror = function () {
- setMessageInnerHTML("WebSocket 连接发生错误");
- };
- // 连接成功建立的回调方法
- websocket.onopen = function () {
- setMessageInnerHTML("WebSocket 连接成功");
- }
- // 接收到消息的回调方法
- websocket.onmessage = function (event) {
- setMessageInnerHTML(event.data);
- }
- // 连接关闭的回调方法
- websocket.onclose = function () {
- setMessageInnerHTML("WebSocket 连接关闭");
- }
- // 监听窗口关闭事件, 当窗口关闭时, 主动去关闭 websocket 连接, 防止连接还没断开就关闭窗口, server 端会抛异常.
- window.onbeforeunload = function () {
- closeWebSocket();
- }
- // 将消息显示在网页上
- function setMessageInnerHTML(innerHTML) {
- document.getElementById('message').innerHTML += innerHTML + '<br/>';
- }
- // 关闭 WebSocket 连接
- function closeWebSocket() {
- websocket.close();
- }
- // 发送消息
- function send() {
- var message = document.getElementById('text').value;
- websocket.send(message);
- }
- </script>
- </html>
测试
注意点
webSocketSet 设置为全局静态变量, 为其他类提供调用
public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();
服务端都是用注解实现的 @ServerEndpoint @OnOpen @OnClose @OnMessage @OnError
Tomcat7.0.47 以上版本支持 websocket1.0
pom 中添加 jar 支持
- <dependency>
- <groupId>javax</groupId>
- <artifactId>javaee-api</artifactId>
- <version>7.0</version>
- <scope>provided</scope>
- </dependency>
- Kafka
简介
Kafka 是一个分布式的, 可分区的, 可复制的消息系统.
开发
window 系统搭建 kafka 环境, 请参照我的 kafka 环境搭建(windows) https://jinshw.github.io/2018/06/01/kafka环境搭建-windows/#more 笔记
- kafka client for java
- package com.ykkj.weiyi.socket;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.io.IOException;
- import java.util.Arrays;
- import java.util.Properties;
- import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
- public class ConsumerKafka extends Thread {
- private KafkaConsumer<String, String> consumer;
- private String topic = "test.topic";
- public ConsumerKafka() {
- }
- @Override
- public void run() {
- // 加载 kafka 消费者参数
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "ytna");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "15000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // 创建消费者对象
- consumer = new KafkaConsumer<String, String>(props);
- consumer.subscribe(Arrays.asList(this.topic));
- // 死循环, 持续消费 kafka
- while (true) {
- try {
- // 消费数据, 并设置超时时间
- ConsumerRecords<String, String> records = consumer.poll(100);
- //Consumer message
- for (ConsumerRecord<String, String> record : records) {
- //Send message to every client
- for (CommodityServer webSocket : webSocketSet) {
- webSocket.sendMessage(record.value());
- }
- }
- } catch (IOException e) {
- System.out.println(e.getMessage());
- continue;
- }
- }
- }
- public void close() {
- try {
- consumer.close();
- } catch (Exception e) {
- System.out.println(e.getMessage());
- }
- }
- // 供测试用, 若通过 tomcat 启动需通过其他方法启动线程
- public static void main(String[] args) {
- ConsumerKafka consumerKafka = new ConsumerKafka();
- consumerKafka.start();
- }
- }
注意 topic 和 bootstrap.servers 配置
调用类
- package com.ykkj.weiyi.socket;
- public class RunThread {
- public RunThread() {
- ConsumerKafka kafka = new ConsumerKafka();
- kafka.start();
- }
- }
web.xml 配置
- <listener>
- <listener-class>com.ykkj.weiyi.socket.RunThread</listener-class>
- </listener>
测试
注意点
ConsumerKafka 需要在 web.xml 中配置监听, 不然在 ConsumerKafka 类中不能获取 webSocketSet 变量
引用 webSocketSet 变量方式
import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
注意 topic 和 bootstrap.servers 配置
结束语
这里只是做了技术验证, 还没有真正实现. 真实场景下, kafka 获取到的数据需要进行数据清洗, 把不符合当前规范的数据清洗掉, 并且按照前端展示需要的格式组装数据.
参考网上资料
- https://blog.csdn.net/lw_ghy/article/details/73252904
- https://blog.csdn.net/liu857279611/article/details/70157012
来源: https://juejin.im/post/5b20e2e16fb9a01e2c698c51