一, 背景
在最近的项目中的一个需求是消息实时推送消息以及通知功能, 项目使用 django 写的所以决定采用 django-channels 来实现 websocket 进行实时通讯. 目前官方已经更新到 2.1 版本, 相对于老的 channels 1.x 版本有了很大变化, 无论是使用方式还是功能, 其中最大的变化莫过于 2.x 版本中带来的 asyncio 特性, 可使用异步处理模式. 本文内容将介绍 channels2 版本使用, 由于项目 django 是 1.11, 其中也遇到了一些坑, 比如在 channels 在处理一次请求后 hang 住然后报错, 后面修改了下 django1.11 版本的一点源码得以解决, 2.0 版本应该不会有问题.
二, channels 介绍
channels 是以 django 插件的形式存在, 它不仅能处理 http 请求, 还提供对 websocket,MQTT 等长连接支持. 不仅如此, channels 在保留了原生 django 的同步和易用的特性上还带来了异步处理方式 (channels2.X 版本), 并且将 django 自带的认证系统以及 session 集成到模块中, 扩展性非常强. 官方文档: https://channels.readthedocs.io/en/latest/index.html
三, 安装以及安装需求
channels2.0 最低 django 版本要求是 1.11+,python3.5+. 笔者的版本是 django1.11, 直接安装可能有问题, 以下是测试通过的版本.
笔者的相关版本如下:
- Django==1.11.10
- channels==2.1.4
- channels-Redis==2.3.1
- asgiref==2.1.6
- asgi-Redis==1.4.3
如果 django 版本比较高直接采用 pip 安装:
- pip3 install channels
- pip3 install channels-Redis #可选的, 官方推荐如果使用 Redis 作为 channel layer
Redis 安装可以参考博客: https://www.cnblogs.com/wdliu/p/9360286.html
四, 开始使用
一, 配置 settings.py
笔者采用的 Redis 作为 channel layer(关于其介绍请移步至 https://channels.readthedocs.io/en/latest/topics/channel_layers.html), 它是实现消息推送的核心, 在项目的 settings.py 中:
注册 channles App:
- INSTALLED_APPS = [
- 'django.contrib.admin',
- 'django.contrib.auth',
- 'django.contrib.contenttypes',
- 'django.contrib.sessions',
- 'django.contrib.messages',
- 'django.contrib.staticfiles',
- 'cmdb',
- 'channels', #注册 App
- ]
配置 channels layer:
- ASGI_APPLICATION = 'devops.routing.application'
- CHANNEL_LAYERS = {
- 'default': {
- 'BACKEND': 'channels_redis.core.RedisChannelLayer',
- 'CONFIG': {
- "hosts": [('10.1.210.33', 6379)], #需修改
- },
- },
- }
二, 路由配置
在项目 settings 文件同级目录中新增 routing.py
- #!/usr/bin/env python3
- # -*- coding:utf-8 -*-
- # Author:wd
- from channels.auth import AuthMiddlewareStack
- from channels.routing import ProtocolTypeRouter, URLRouter
- import deploy.routing
- application = ProtocolTypeRouter({
- 'websocket': AuthMiddlewareStack(
- URLRouter(
- deploy.routing.websocket_urlpatterns# 指明路由文件是 devops/routing.py
- )
- ),
- })
最后在 App 里配置路由和对应的消费者, 笔者这里是 devops 下的 routing.py:
- #!/usr/bin/env python3
- # -*- coding:utf-8 -*-
- # Author:wd
- from django.conf.urls import url
- from . import consumers
- websocket_urlpatterns = [
- url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是该路由的消费者
- ]
项目目录结构如下:
三, 编写 webscoket 消息处理方法 (消费者)
首先说明, 消费者是 Channels 代码的基本单元, 当一个新的 Socket 进入的时候, Channels 会根据路由表找到正确的消费者, 以下代码中每个方法都可以看作一个消费者, 他们消费不同的 event, 比如刚刚接受连接时候 connect 方法进行消费处理并接受连接, 关闭 websocket 时候使用 disconnect 进行消费处理.
- deploy/consumers.py:
- #!/usr/bin/env python3
- # -*- coding:utf-8 -*-
- # Author:wd
- from channels.generic.websocket import AsyncWebsocketConsumer
- import JSON
- class DeployResult(AsyncWebsocketConsumer):
- async def connect(self):
- self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
- self.chat_group_name = 'chat_%s' % self.service_uid
- # 收到连接时候处理,
- await self.channel_layer.group_add(
- self.chat_group_name,
- self.channel_name
- )
- await self.accept()
- async def disconnect(self, close_code):
- # 关闭 channel 时候处理
- await self.channel_layer.group_discard(
- self.chat_group_name,
- self.channel_name
- )
- # 收到消息
- async def receive(self, text_data):
- text_data_json = JSON.loads(text_data)
- message = text_data_json['message']
- print("收到消息 --》",message)
- # 发送消息到组
- await self.channel_layer.group_send(
- self.chat_group_name,
- {
- 'type': 'client.message',
- 'message': message
- }
- )
- # 处理客户端发来的消息
- async def client_message(self, event):
- message = event['message']
- print("发送消息...",message)
- # 发送消息到 WebSocket
- await self.send(text_data=JSON.dumps({
- 'message': message
- }))
以上代码部分说明:
1.self.scope 是单个连接传入的详细信息, 其中包含了请求的 session, 以及 django 认证系统中的用户信息等;
2.async...await 是 python3.5 之后的新异步特性, 基于 asyncio 模块;
四, 发起 webscoket 请求
利用 JS 发起 websocket 请求
- function InitWebSocket() {
- var websocket = new WebSocket(
- 'ws://' + Windows.location.host + '/ws/deploy/tasks/' );
- websocket.onmessage = function (e) {
- var data = JSON.parse(e.data);
- var message = '\n' + data['message'];
- document.querySelector('#deploy-res').innerText += (message + '\n');
- };
- }
五, 发送消息到 channel
无论是消息的推送或者消息的接受, 都是经过 channel layer 进行传输, 以下是发送消息示例,
- from channels.layers import get_channel_layer
- from asgiref.sync import async_to_sync
- channel_layer = get_channel_layer()
- def send_channel_msg(channel_name, msg):
- """
- send msg to channel
- :param channel_name:
- :param msg:
- :return:
- """
- async_to_sync(channel_layer.group_send)(channel_name,
- {"type": "deploy.run", "text": msg})
六, 生产部署
大多数 django 的应用部署方式都采用的是 nginx+uwsgi 进行部署, 当 django 集成 channels 时候, 由于 uwsgi 不能处理 websocket 请求, 所以我们需要 asgi 服务器来处理 websocket 请求, 官方推荐使用 daphne. 下一篇文章将介绍 nginx+supervisor+daphne+uwsgi 进行生产部署.
来源: https://www.cnblogs.com/wdliu/p/10028236.html