Internet(全球互联网)是无数台机器基于TCP/IP协议族相互通信产生的。TCP/IP协议族分了四层实现,链路层、网络层、传输层、应用层。
顺序
|
字段名 | 长度(字节) |
字段类型
|
描述
|
1
|
消息长度 |
4(32bit)
|
int | socket报文的长度最长2^31-1字节,大文件传输不使用此字段 |
2
|
行为标识
|
1(8bit)
|
byte
|
用于分支处理数据1字节可标识256种行为,一般够用
|
3
|
加密标识 |
1(8bit)
|
byte
|
区分加密方式0不加密
|
4
|
时间戳 |
8(64bit)
|
long | 消息时间戳,其实也没啥用,加着玩的,忽视掉吧 |
5
|
消息体
|
|
String
|
长度为消息长度-10字节,建议使用json,具体解析行为由行为标识字段定义
|
2、规定通信工作流程
- /** 消息包(报文) **/
- class SocketPackage {
- int length; // 长度
- byte action; // 行为标识
- byte encryption; // 加密标识
- long timestamp; // 时间戳
- String data; // 消息体
- /** TODO:将此消息包转换为适当的byte数组 **/
- byte[] toBytes() {
- byte[] lengthBytes = int2bytes(length);
- // ...将各个字段都做了转换成bytes的操作后,合并byte数组并返回
- }
- /** TODO:读取输入流转换成一个消息包 **/
- static SocketPackage parse(InputStream in ) throws IOException {
- SocketPackage sp = new SocketPackage();
- byte[] lengthBytes = new byte[4]; in .read(lengthBytes); // 未收到信息时此步将会阻塞
- sp.length = bytes2int(lengthBytes);
- // .....其他字段读取就不写了,这里要控制好异常,不要随意catch住,如果发生异常,不是socket坏了就是报文异常了,应当采用拒绝连接的形式向对方跑出异常
- }
- }
- /** 封装下socket,使其可以保存更多的连接信息,不要纠结名字,我纠结了好一会儿不知道怎么命名,反正是伪代码,就这样写着吧 **/
- class NiuxzSocket {
- Socket socket;
- volatile long lastUse; // 上次使用时间
- // ...这里还可以再加其他属性,比如是否是写状态,写操作开始时间,上次非心跳包时间等
- NiuxzSocket(Socket socket) {
- this.socket = socket;
- this.lastUse = System.currentTimeMillis();
- }
- InputStream getIn() {
- return socket.getInputStream();
- }
- void write(byte[] bytes) throws IOException {
- this.socket.getOutputStream().write(bytes);
- }
- }
- /** 封装一个发送信息的接口,提供常用的发送信息方法。 **/
- interface SocketClient {
- SocketPackage sendData(SocketPackage sp);// 发送一个消息包,并等待返回的消息包
- // TODO:还可以根据双方的业务和协议添加几个更方便使用的接口方法。比如只返回消息体字段,或者直接返回json内容的
- void sendHeartBeat(NiuxzSocket socket);// 发送一个心跳包,这个方法后面讲心跳包时会用到
- }
- class DefaultSocketClient implements SocketClient {
- SocketPool socketPool;// 先假装有一个socket连接池,用来管理socket。不使用连接池的话,在这里直接注入一个NiuxzSocket就可以了。下面代码中也直接使用socket,但是一定要在使用时进行加锁操作。否则就会造成多线程访问同一个socket导致数据错乱了。
- /** 此方法就是主动端工作入口了,业务代码可以直接调用这里进行发送数据 **/
- SocketPackage sendData(SocketPackage sp){
- NiuxzSocket niuxzSocket = socketPool.get();//获取一个socket,这里可以看到获取的socket并不是原生的socket,其实是我们自己封装后的socket
- try{
- niuxzSocket.write(sp.toBytes());//阻塞持续写到缓存中
- niuxzSocket.lastUse = System.currentTimeMillis();//根据业务方法更新socket的状态信息
- SocketPackage sp = SocketPackage.parse(niuxzSocket.getIn());//阻塞读,等待消息的返回,因为是单线程操作socket所以不存在消息插队的情况。
- return sp;
- }catch(Exception e){
- LOG.error("发送消息包失败",e);
- socketPool.destroy(niuxzSocket)
- //在发生不可复用的异常时才关闭socket,并销毁这个NiuxzSocke。不可复用异常意思是IO操作到了一半不知道具体到哪了所以整个socket都不可用了。
- }
- finally{
- if(socketPool!=null){
- socketPool.recycle(niuxzSocket );//使用完这个socket后我们不要关闭,因为还要复用,让连接池回收这个socket。recycle内要判断socket是否是销毁状态。
- }
- }
- }
- }
- /** 定义一个连接池接口SocketPool **/
- interface SocketPool {
- /** 获取一个连接 **/
- NiuxzSocket get();
- /** 回收Socket **/
- void recycle(NiuxzSocket ns);
- /** 销毁Socket **/
- void destroy(NiuxzSocket ns);
- }
- /** 实现连接池 **/
- class DefaultSocketPool implements SocketPool {
- BlockingQueue < NiuxzSocket > sockets; // 存放socket的容器,也可以使用数组
- NiuxzSocket get() {
- // TODO:池里有就获取,没有就开一个线程去创建 并且等待创建完成,可使用synchronized/wait或Lock/condition
- }
- // TODO:实现socketPool,实现连接池是属于性能可靠性优化,要做的事情会比较多。偷个懒,大家懂就好,具体实现,等有时间我把我的连接池代码整理后再写一篇文章,有想了解的可以给我评论讨论下。
- }
- /**开启一个ServerSocket并等待连接,联入后开启一个线程进行处理**/
- class NiuxzServer {
- ServerSocket serverSocket;
- HashMap < NiuxzSocket > sockets = new HashMap < NiuxzSocket > ();
- public static AtomicInteger workerCount = 0;
- public Object waitLock = new Object();
- int maxWorkerCount = 100; //允许100个连接进入
- int port; //配置一个端口号
- /**工作入口**/
- void work() {
- serverSocket = new ServerSocket(port);
- while (true) {
- Socket socekt = serverSocket.accept(); //阻塞等待连接
- NiuxzSocket niuxzSocket = new NiuxzSocket(socket);
- sockets.put(niuxzSocket, 1); //将连接放入map中
- Worker worker = new Worker(niuxzSocket); //创建一个工作线程
- worker.start(); //开始线程
- while (true) {
- if (workerCount.incrementAndGet() >= maxWorkerCount) { //如果超过了规定的最大线程数,就进入等待,等待其他连接销毁
- synchronized(waitLock) {
- if (workerCount.incrementAndGet() >= maxWorkerCount) { //double check 确定进入等待前没有正在断开的socket
- waitLock.wait();
- } else {
- break;
- }
- }
- } else {
- break;
- }
- }
- }
- }
- /**销毁一个连接**/
- void destroy(NiuxzSocket socket) {
- synchronized(waitLock) {
- sockets.remove(socket); //从池子里删除
- workerCount.decrementAndGet(); //当前连接数减一
- waitLock.notify(); //通知work方法 可以继续接受请求了
- }
- }
- /**创建一个工作者线程类,处理连入的socket**/
- class Worker extends Thread {
- HashMap < Integer,
- SocketHandler > handlers; //针对每种行为标识做的消息处理器。
- NiuxzSocket socket;
- Worker(NiuxzSocketsocket) { //构造函数
- this.socket = socket;
- }
- void run() {
- try {
- while (true) {
- SocketPackage sp = SocketPackage.parse(socket.getIn()); //阻塞读,直到读完一个消息包未知,这样可以解决粘包或半包的问题
- SocketHandler handler = handlers.get(sp.getAction()); //根据行为标识获取响应的处理器
- handler.handle(sp, socket); //处理结果和响应信息都在handler中回写
- }
- }
- cache(Exception e) {
- LOG.error("连接异常中断", e);
- NiuxzServer.destroy(socket);
- }
- }
- }
- }
- /** 创建一个消息处理器 SocketHandler 接收所有内容后 回显 **/
- class EchoSocketHandler implements SocketHandler {
- /** 处理socket请求 **/
- void handle(SocketPackage sp, NiuxzSocket socket) {
- sp.setAction(10); // 比如协议中的行为标识10是响应成功的意思
- socket.write(sp.toBytes()); // 直接回写
- }
- }
至此两端的工作代码已经初步完成。socket可以按照相互制定的通讯方式进行通讯了。
3、心跳机制:
心跳机制socket长链接通讯中不可或缺的一个机制。主动端可以检测socket是否存活,被动端可以检测对方是否还在线。因为有时候网络并不一定那么完美,会出现链路上的异常,此时应用层可能并不能发现问题,等下次再用这个连接的时候就会抛出异常了,如果是被动端,还会白白占用着一个线程,不如在那之前就发现一部分异常,并销毁连接,下次通讯时出错的概率就降低了很多,被动端也会释放线程,释放资源。
- @Scheduled(fixedDelay = 30 * 1000) //延时30秒执行一次
- void HeartBeat() {
- for (NiuxzSocket socket: socketPool.getAllSocket()) {
- if (System.curTime() - socket.getLastUse() > 30 * 1000) { //如果系统时间减上次使用时间大于30秒
- //开启线程,从连接池中取出这个连接remove(socket)移除成功再继续操作,保证不会有其他线程同时使用这个socket。发送一个SocketPackage,socketClient.sendHeartBeat()
- if (socketPool.remove(socket)) {
- socketClient.snedHeartBeat(socket); //socketClient.snedHeartBeat这个方法实现:行为标识设置为心跳包,比如规定1就是心跳包。完事回收这个链接socketPool.recycle(socket),但当中间反生异常,则代表这个连接不可用了,就销毁socketPool.destroy(socket)。
- }
- }
- }
- }
以上便是我用同步socket实现第一版分布式文件系统时总结的经验,有些问题其实在NIO中变得不是问题了。NIO和AIO更适合会持有大量连接的服务器端。
来源: http://www.cnblogs.com/niuxiaozu/p/7942804.html