心跳一般是指客户端(也可以是服务器端)向对方每隔一段时间发送一个应用层的自定义指令,以确保连接的有效性。因为是固定间隔,同时是检测存活,就像人的心跳一样,顾名思义,称为心跳包。一般是用于长连接,对消息实时性要求比较高的服务中,比如 IM 服务,推送服务。
在即时通讯领域和推送服务中,对消息的实时性和可用性要求非常高,建立长连接,可以有效节省 DNS 解释时间,TCP/IP 三次握手时间,同时为了保证连接是可用的,不至于经常发了消息对方无法收到,必须要有一种机制检测连接的有效性。TCP 是一个基于连接的协议,连接是由一个状态机进行维护,当连接建立成功后,双方都处于 established ,除非我们进行主动调用,否则状态一直不会变化,即使中间路由已经崩溃,网线已经被剪断。TCP 有一种 KeepAlive 机制,TCP 层在定时时间发送相应的 KeepAlive 探针以确保连接的可用性,默认每 7200 秒发送一次,超过 75 秒没有返回就超时,超时后重试 10 次,虽然可以修改默认值,但仍然无法满足要求。尤其是考虑到一种特殊情况,TCP 连接存活,但是主机不处于存活状态,比如 CPU 负载到 100%,无法响应任何请求。这时候,就需要客户端主动切断连接,主动切换到其他备用机。
通常,我们一个家庭里面只接入一根网线,所有设备通过路由器共用一个出口 IP,路由器就是一个 NAT 设备,NAT 设备在 IP 封包流过设备的时候,自动修改源和目标地址,家用路由器甚至基于 NAPT 修改端口号,路由器内部会维护一个 NAT 映射表
比如内网里面的 172.1.1.2:7777 对应外网 221.22.2.1:8888 等。我们的手机接入的蜂窝网络后,运营商就会给我们分配一个内网 IP(类似 10.2.2.3),由运营商的网管维护一个 NAT 的映射表,确保手机能接入互联网。大部分运营商会在手机一段时间没有数据通讯的时候,会把设备从 NAT 表中剔除,造成了连接中断,但是对 TCP 连接的双方是不可感知的,服务端就无法给客户端发送消息。像中国移动和中国联通的 NAT 超时时间是 5 分钟,国际上运营商普遍都是大于 28 分钟。
心跳太短保证不了可靠性,太频繁会带来高耗电和大量的流量消耗,这在移动设备上面是不可接受的。最合理的解决方案是设定一个合理的间隔,一般可以根据程序状态进行调整,逐步拉长心跳间隔,5 分钟,10 分钟,甚至 15 分钟。服务端进行可靠性判断的时候也可以放宽标准,只有 N 次超时才被认为是连接已经断开。心跳的周期以最后一条指令为准,而非固定间隔。
在 DEMO 中,双方约定一个协议,发送方先对管道写入一个 8 位的 byte 值,接收方只要一接收到数据,马上按照 byte 类型标准读取前 8 位,通过这一个字节的值来确定对方现在发过来的是什么类型的数据。为什么要选择 byte 呢?因为 byte 足够短,只占用一个字节,尽量减少数据传输量,可以通过一个字节表达 256 种情况。当然根据实际业务需求,选择 int,long 类型也是完全没问题的。
在这个例子中,我们约定 byte 的值是 1 的话,那么我们解释为心跳包,后面不再有数据,直接在屏幕中打印收到客户端的心跳包,byte 的值是 2 的话,我们知道对方要发一个字符串过来,那么需要进一步处理,再次调用 readUTF 方法,读取一个 UTF-8 字符串
服务端建立一个类,采用同步多线程模式,主类负责接收 socket 请求,子线程 Worker 类负责处理业务逻辑
- public class Server {
- public static void main(String[] args) {
- try {
- ServerSocket serverSocket = new ServerSocket(30000); //实例化ServerSocket,绑定监听本机的30000端口
- while (true) {
- Socket socket = serverSocket.accept(); //这个是阻塞方法,只有监听到客户端连接过来了,才会继续往下走。
- System.out.println(socket.getInetAddress().getHostName() + "连接到服务器...");
- //Worker线程启动代码
- Worker worker = new Worker(socket);
- new Thread(worker).start();
- }
- } catch(Exception e) {
- System.out.println("主线程抛出异常");
- e.printStackTrace();
- }
- }
- }
Worker 线程
- class Worker implements Runnable {
- private Socket socket;
- private InputStream in ;
- private OutputStream out;
- private ObjectInputStream ois;
- private boolean flag = true;
- public Worker(Socket socket) {
- try {
- this.socket = socket; //要获得一个从主线程传过来的客户端socket实例,每个客户端都不一样
- in =socket.getInputStream(); //从客户端实例中,获取输入流实例
- out = socket.getOutputStream(); //获取输出流实例
- ois = new ObjectInputStream( in ); //实例化ObjectInputStream
- } catch(Exception e) {
- System.out.println("worker构造函数抛出异常");
- e.printStackTrace();
- }
- }
- public void run() {
- try {
- while (flag) {
- //协议的第一位是数字,先读取第一位
- int type = ois.readByte();
- if (type == 1) {
- //第一位是1的话,就直接当心跳包处理
- System.out.println("收到" + socket.getInetAddress().getHostAddress() + "发送过来的心跳包");
- } else if (type == 2) {
- //第一位是2的话,我们可以知道,对方发过来的是UTF-8格式的String,所以可以调用readUTF方法继续读取
- System.out.println(socket.getInetAddress().getHostAddress() + "说:" + ois.readUTF());
- }
- }
- } catch(EOFException e) {
- System.out.println("对方已关闭连接");
- flag = false;
- } catch(IOException ioe) {
- ioe.printStackTrace();
- }
- }
- }
- public class Client {
- private static final String host = "127.0.0.1"; //目标地址,这里是本机
- private static final int port = 30000; //目标端口
- public static void main(String[] args) {
- Socket socket = new Socket();
- try {
- socket.connect(new InetSocketAddress(host, port)); //建立socket连接
- OutputStream out = socket.getOutputStream(); //从socket中获取读取流的实例
- ObjectOutputStream oos = new ObjectOutputStream(out); //实例化ObjectOutputStream ,用于自定义的传输协议
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System. in )); //用来读取键盘输入,用了缓冲类
- TimeStore timeStore = new TimeStore(); //建立一个类来存储数据最后发送时间
- new Thread(new SendHeartbeat(oos, timeStore)).start(); //启动心跳业务线程
- String line = new String();
- //下面几行代码用于获取用户输入
- while ((line = bufferedReader.readLine()) != null) {
- oos.writeByte(2); //要发送的是自定义协议的字符串,先写入一个2,告诉服务端,准备发送字符串数据
- oos.writeUTF(line); //写入一个UTF字符串到流中
- oos.flush();
- timeStore.setLastSendTime(System.currentTimeMillis()); //记录最后的写入时间到时间存储类
- }
- oos.close();
- } catch(IOException e) {
- System.out.println("数据写入IO异常");
- } finally {
- try {
- socket.close();
- } catch(IOException e2) {
- e2.printStackTrace();
- }
- }
- }
- }
心跳专门开一条线程来发送,这样不受主线程业务的堵塞代码影响
- class SendHeartbeat implements Runnable {
- private ObjectOutputStream oos;
- private TimeStore timeStore;
- public SendHeartbeat(ObjectOutputStream oos, TimeStore timeStore) {
- this.oos = oos;
- this.timeStore = timeStore;
- }
- public void run() {
- try {
- while (true) {
- Thread.sleep(1000); //死循环,每秒启动一次
- //当上次发送时间是在10秒或之前,才发送心跳
- if ((System.currentTimeMillis() - timeStore.getLastSendTime()) >= 10 * 1000) {
- //写入1,告诉服务端发送的是心跳包
- oos.writeByte(1);
- oos.flush();
- //记录时间
- timeStore.setLastSendTime(System.currentTimeMillis());
- }
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
- class TimeStore {
- private long lastSendTime;
- //多线程下读取需要加锁
- public synchronized long getLastSendTime() {
- return lastSendTime;
- }
- //同样,多线程下写入需要加锁
- public synchronized void setLastSendTime(long lastSendTime) {
- this.lastSendTime = lastSendTime; //把时间放到私有属性
- System.out.println("最后一次发包时间" + new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date(lastSendTime))); //把发包时间打印到屏幕上
- }
- }
2013 年,中国移动曾把刀口指向了微信,正是因为心跳包可能会引起的信令风暴,微信占用了中移动 60% 的信令资源,但仅带来 10% 的移动数据流量。每次发送心跳包,都需要移动通信网络为用户分配资源,分配的过程体现在信令的发送和接收上。一次心跳包的发送过程,牵涉的信令多达几十条。后来微信对心跳间隔进行了优化才暂时平息了这场风波。微信采用的方案是当微信处于前台活跃状态时,使用固定心跳。微信进入后台(或者前台关屏)时,先用几次最小心跳维持长链接。然后进入后台自适应心跳计算。这样做的目的是尽量选择用户不活跃的时间段,来减少心跳计算可能产生的消息不及时收取影响。详看微信心跳包优化方案
来源: http://www.cnblogs.com/jaychan/p/7168869.html