websocket
WebSocket 协议是基于 TCP 的一种新的网络协议. 它实现了浏览器与服务器全双工 (full-duplex) 通信 -- 允许服务器主动发送信息给客户端.
WebSocket 通信协议于 2011 年被 https://baike.baidu.com/item/IETF 定为标准 RFC 6455, 并被 RFC7936 所补充规范.
WebSocket 协议支持 (在受控环境中运行不受信任的代码的) 客户端与 (选择加入该代码的通信的) 远程主机之间进行全双工通信. 用于此的安全模型是 Web 浏览器常用的基于原始的安全模式. 协议包括一个开放的握手以及随后的 TCP 层上的消息帧. 该技术的目标是为基于浏览器的, 需要和服务器进行双向通信的 (服务器不能依赖于打开多个 HTTP 连接(例如, 使用 XMLHttpRequest 或 < iframe > 和长轮询)) 应用程序提供一种通信机制.
这个协议目前仍是草案, 只有最新的一些浏览器可以支持它. 但是, 它的好处是显而易见的, 随着支持它的浏览器越来越多, 我们将看到它越来越流行.(和以往的 Web 开发一样, 必须谨慎地坚持依赖可用的新功能并能在必要时回滚到旧技术的务实策略.)
Django 用法
在 1.9 版本之后, Django 实现了对 Channels 的支持, 他所使用的是 WebSocket 通信, 解决了实时通信的问题, 而且在使用 WebSocket 进行通信的同时依旧能够支持 HTTP 通信.
1.1 目录结构
在此结构中必须有硬性要求, 具体如下:
新的目录如下:
|-- channels_example
| |--channels_example
| |-- __init__.py
| |-- settings.py
| |-- urls.py
| |-- wsgi.py
| |-- routing.py #必须
| |-- consumer.py #必须
| |-- asgi.py
| |-- manage.py
1.2 配置 settings.py 文件
1.2.1 将其添加到 APP 列表里
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- 'channels',
- ]
1.2.2 然后, 添加新的参数 CHANNEL_LAYERS, 如下:
- CHANNEL_LAYERS = {
- "default": {
- "BACKEND": "asgiref.inmemory.ChannelLayer",
- "ROUTING": "channels_example.routing.channel_routing",
- },
- }
需要注意的是 ROUTING 参数, 他是用来指定 WebSocket 表单的位置, 当有 WebSocket 请求访问时, 就会根据这个路径找到相应表单, 调用相应的函数进行处理.
channels_example.routing 就是我们刚才建好的 routing,py 文件, 里面的 channel_routing 我们下面会进行填充.
1.3 填写路由映射地址
- from channels.routing import route
- import consumers
- channel_routing = [
- route('websocket.connect', consumers.ws_connect),
- route('websocket.disconnect', consumers.ws_disconnect),
- # route('websocket.receive', consumers.ws_message),
- route('websocket.receive', consumers.ws_message_uuid),
- ]
1.4 路由映射到相对应的函数
- from django.http import HttpResponse
- from channels.handler import AsgiHandler
- #message.reply_channel 一个客户端通道的对象
- #message.reply_channel.send(chunk) 用来唯一返回这个客户端
- # 一个管道大概会持续 30s
- def ws_connect(message):
- auth = True
- if not auth:
- reply = json.dumps({'error': error})
- message.reply_channel.send({'text': reply, 'close': True})
- else:
- reply = "{}"
- message.reply_channel.send({'text': reply})
- print(">>> %s connected" % str(message))
- def ws_disconnect(message):
- print("<<<%s disconnected" % str(message))
- # with message_queue.mutex:
- # message_queue.queue.clear()
- while not message_queue.empty():
- try:
- message_queue.get(False)
- except Empty:
- continue
- message_queue.task_done()
- def ws_message_uuid(message):
- task = Task.create(message)
- if task:
- message_queue.put(task)
tornado 用法
1.1Tornado 的 WebSocket 模块
Tornado 在 websocket 模块中提供了一个 WebSocketHandler 类. 这个类提供了和已连接的客户端通信的 WebSocket 事件和方法的钩子. 当一个新的 WebSocket 连接打开时, open 方法被调用, 而 on_message 和 on_close 方法分别在连接接收到新的消息和客户端关闭时被调用.
此外, WebSocketHandler 类还提供了 write_message 方法用于向客户端发送消息, close 方法用于关闭连接.
- class EchoHandler(tornado.websocket.WebSocketHandler):
- def open(self):
- self.write_message('connected!')
- def on_message(self, message):
- self.write_message(message)
正如你在我们的 EchoHandler 实现中所看到的, open 方法只是使用 WebSocketHandler 基类提供的 write_message 方法向客户端发送字符串 "connected!". 每次处理程序从客户端接收到一个新的消息时调用 on_message 方法, 我们的实现中将客户端提供的消息原样返回给客户端. 这就是全部! 让我们通过一个完整的例子看看实现这个协议是如何简单的吧.
WebSocketHandler.open()
当一个 WebSocket 连接建立后被调用.
WebSocketHandler.on_message(message)
当客户端发送消息 message 过来时被调用, 注意此方法必须被重写.
WebSocketHandler.on_close()
当 WebSocket 连接关闭后被调用.
WebSocketHandler.write_message(message, binary=False)
向客户端发送消息 messagea,message 可以是字符串或字典 (字典会被转为 json 字符串). 若 binary 为 False, 则 message 以 utf8 编码发送; 二进制模式(binary=True) 时, 可发送任何字节码.
WebSocketHandler.close()
关闭 WebSocket 连接.
WebSocketHandler.check_origin(origin)
判断源 origin, 对于符合条件 (返回判断结果为 True) 的请求源 origin 允许其连接, 否则返回 403. 可以重写此方法来解决 WebSocket 的跨域请求(如始终 return True).
1.2 实例 -- 工作 websocket 实际应用
- #coding=utf-8
- import uuid
- import os
- from works.actions import work
- import hashlib
- import json
- import Queue
- from threading import Thread
- import numpy as np
- import cv2
- import base64
- import jwt
- import tornado.gen
- from handlers.base_handler import BaseWebSocket
- from config import MEDIA_ROOT
- import time
- message_queue = Queue.PriorityQueue()
- def work_loop():
- while True:
- task = message_queue.get()
- iuuid = task.uuid
- offset_top = task.offset_top
- image_data = task.image_data
- channel = task.channel
- zoom = task.zoom
- rType = task.rType
- responseType = task.responseType
- print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top))
- filename = str(uuid.uuid1()) + '.jpg'
- filepath = os.path.join(MEDIA_ROOT, filename)
- with open(filepath, 'wb') as f:
- f.write(image_data.decode("base64"))
- if zoom != 1.0:
- im = cv2.imread(filepath)
- if im is None:
- continue
- osize = im.shape[1], im.shape[0]
- size = int(im.shape[1] * zoom), int(im.shape[0] * zoom)
- im = cv2.resize(im, size)
- cv2.imwrite(filepath, im)
- try:
- reply = work(filepath, use_crop=False, result=rType,responseType=responseType)
- except Exception as e:
- print("!!!!!! %s -> %s caused error" % (iuuid, filename))
- print(e)
- cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename))
- os.system(cmd.encode('utf-8'))
- continue
- if responseType == 'url':
- # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename
- rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename
- reply = {'url': rtn_url, 'uuid': iuuid}
- reply['uuid'] = iuuid
- channel.write_message({'text': json.dumps(reply)})
- print '%s end time:' % channel, time.time()
- class BrowserWebSocket(BaseWebSocket):
- '''浏览器 websocket 服务器'''
- def open(self):
- '''新的 WebSocket 连接打开时被调用'''
- # message = {}
- # remote_ip = self.request.remote_ip
- # message['query_string']=self.get_argument('query_string')
- # message['remote_ip']=remote_ip
- # auth, error = verify_auth_token(message)
- auth = True
- error = 'error'
- if not auth:
- reply = json.dumps({'error': error})
- self.write_message({'text': reply, 'close': True})
- else:
- reply = "{}"
- self.write_message({'text': reply})
- print(">>> %s connected" % self.request.remote_ip)
- def on_message(self, message):
- '''连接收到新消息时被调用'''
- print '%s start time:'%self,time.time()
- task = Task.create(message,self)
- if task:
- message_queue.put(task)
- @tornado.gen.coroutine
- def on_messages(self, message):
- '''连接收到新消息时被调用'''
- task = Task.create(message,self)
- if task:
- message_queue.put(task)
- def on_close(self):
- '''客户端关闭时被调用'''
- print("<<<%s disconnected" % str(self.request.remote_ip))
- # with message_queue.mutex:
- # message_queue.queue.clear()
- while not message_queue.empty():
- try:
- message_queue.get(False)
- except Queue.Empty:
- continue
- message_queue.task_done()
- def check_origin(self, origin):
- '''允许 WebSocket 的跨域请求'''
- return True
- class Task(object):
- def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args):
- self.uuid = uuid
- self.offset_top = int(float(offset_top))
- self.image_data = image_data
- self.channel = channel
- self.zoom = zoom
- self.rType = rType
- self.responseType = responseType
- @classmethod
- def create(clz, message,sel):
- # data = message.get('text')
- data = message
- try:
- params = json.loads(data[:150])
- image_data = data[150:]
- image_data = image_data.replace("","+")
- params['image_data'] = image_data
- params['channel'] = sel
- # add Type
- if params.get('responseType') is None:
- params['responseType'] = 'url'
- # request type
- if params.get('rType') is None:
- params['rType'] = 'rl'
- task = Task(**params)
- except ValueError as e:
- task = None
- print(">>>message data error!")
- print(e)
- return task
- def __cmp__(self, other):
- return cmp(self.offset_top, other.offset_top)
- def verify_auth_token(message):
- '''token 验证'''
- token = message.get('query_string')
- secret_key = 'aoiakai'
- try:
- payload = jwt.decode(token, secret_key, algorithms=['HS256'])
- if payload.get('ip') != message.get('remote_ip'):
- return False, 'ip mismatch'
- except jwt.ExpiredSignatureError as e:
- print(e)
- return False, 'token expired'
- except Exception as e:
- print(e)
- return False, 'enter correct token'
- return True, ''
- work_thread = Thread(target=work_loop)
- work_thread.daemon = True
- work_thread.start()
来源: https://www.cnblogs.com/aylin/p/8831135.html