这是 Django Channels 系列文章的第二篇, 以 web 端实现 tailf 的案例讲解 Channels 的具体使用以及跟 Celery 的结合
通过上一篇《Django 使用 Channels 实现 WebSocket-- 上篇》 https://mp.weixin.qq.com/s/hqaPrPS7w3D-9SeegQAB2Q 的学习应该对 Channels 的各种概念有了清晰的认知, 可以顺利的将 Channels 框架集成到自己的 Django 项目中实现 WebSocket 了, 本篇文章将以一个 Channels+Celery 实现 Web 端 tailf 功能的例子更加深入的介绍 Channels
先说下我们要实现的目标: 所有登录的用户可以查看 tailf 日志页面, 在页面上能够选择日志文件进行监听, 多个页面终端同时监听任何日志都互不影响, 页面同时提供终止监听的按钮能够终止前端的输出以及后台对日志文件的读取
最终实现的结果见下图
接着我们来看下具体的实现过程
技术实现
所有代码均基于以下软件版本:
- python==3.6.3
- django==2.2
- channels==2.1.7
- celery==4.3.0
celery4 在 Windows 下支持不完善, 所以请在 Linux 下运行测试
日志数据定义
我们只希望用户能够查询固定的几个日志文件, 就不是用数据库仅借助 settings.py 文件里写全局变量来实现数据存储
在 settings.py 里添加一个叫 TAILF 的变量, 类型为字典, key 标识文件的编号, value 标识文件的路径
- TAILF = {
- 1: '/ops/coffee/error.log',
- 2: '/ops/coffee/access.log',
- }
基础 Web 页面搭建
假设你已经创建好了一个叫 tailf 的 App, 并添加到了 settings.py 的 INSTALLED_APPS 中, App 的目录结构大概如下
- tailf
- - migrations
- - __init__.py
- - __init__.py
- - admin.py
- - apps.py
- - models.py
- - tests.py
- - views.py
依然先构建一个标准的 Django 页面, 相关代码如下
- url:
- from django.urls import path
- from django.contrib.auth.views import LoginView,LogoutView
- from tailf.views import tailf
- urlpatterns = [
- path('tailf', tailf, name='tailf-url'),
- path('login', LoginView.as_view(template_name='login.html'), name='login-url'),
- path('logout', LogoutView.as_view(template_name='login.html'), name='logout-url'),
- ]
因为我们规定只有通过登录的用户才能查看日志, 所以引入 Django 自带的 LoginView,logoutView 帮助我们快速构建 Login,Logout 功能
指定了登录模板使用 login.HTML, 它就是一个标准的登录页面, post 传入 username 和 password 两个参数即可, 不贴代码了
- view:
- from django.conf import settings
- from django.shortcuts import render
- from django.contrib.auth.decorators import login_required
- # Create your views here.
- @login_required(login_url='/login')
- def tailf(request):
- logDict = settings.TAILF
- return render(request, 'tailf/index.html', {"logDict": logDict})
引入了 login_required 装饰器, 来判断用户是否登录, 未登录就给跳到 / login 登录页面
logDict 去 setting 里取我们定义好的 TAILF 字典赋值, 并传递给前端
- template:
- {% extends "base.html" %}
- {% block content %}
- <div class="col-sm-8">
- <select class="form-control" id="file">
- <option value=""> 选择要监听的日志 </option>
- {% for k,v in logDict.items %}
- <option value="{{ k }}">{{ v }}</option>
- {% endfor %}
- </select>
- </div>
- <div class="col-sm-2">
- <input class="btn btn-success btn-block" type="button" onclick="connect()" value="开始监听"/><br/>
- </div>
- <div class="col-sm-2">
- <input class="btn btn-warning btn-block" type="button" onclick="goclose()" value="终止监听"/><br/>
- </div>
- <div class="col-sm-12">
- <textarea class="form-control" id="chat-log" disabled rows="20"></textarea>
- </div>
- {% endblock %}
前端拿到 TAILF 后通过循环的方式填充到 select 选择框下, 因为数据是字典格式, 使用 logDict.items 的方式可以循环出字典的 key 和 value
这样一个日志监听页面就完成了, 但还无法实现日志的监听, 继续往下
集成 Channels 实现 WebSocket
日志监听功能主要的设计思路就是页面跟后端服务器建立 websocket 长连接, 后端通过 celery 异步执行 while 循环不断的读取日志文件然后发送到 websocket 的 channel 里, 实现页面上的实时显示
接着我们来集成 channels
先添加 routing 路由, 直接修改 webapp/routing.py
- from channels.auth import AuthMiddlewareStack
- from channels.routing import ProtocolTypeRouter, URLRouter
- from django.urls import path, re_path
- from chat.consumers import ChatConsumer
- from tailf.consumers import TailfConsumer
- application = ProtocolTypeRouter({
- 'websocket': AuthMiddlewareStack(
- URLRouter([
- path('ws/chat/', ChatConsumer),
- re_path(r'^ws/tailf/(?P<id>\d+)/$', TailfConsumer),
- ])
- )
- })
直接将路由信息写入到了 URLRouter 里, 注意路由信息的外层多了一个 list, 区别于上一篇中介绍的写路由文件路径的方式
页面需要将监听的日志文件传递给后端, 我们使用 routing 正则 P<id>\d + 传文件 ID 给后端程序, 后端程序拿到 ID 之后根据 settings 中指定的 TAILF 解析出日志路径
routing 的写法跟 Django 中的 url 写法完全一致, 使用 re_path 匹配正则 routing 路由
添加 consumer 在 tailf/consumers.py 文件中
- import JSON
- from channels.generic.websocket import WebsocketConsumer
- from tailf.tasks import tailf
- class TailfConsumer(WebsocketConsumer):
- def connect(self):
- self.file_id = self.scope["url_route"]["kwargs"]["id"]
- self.result = tailf.delay(self.file_id, self.channel_name)
- print('connect:', self.channel_name, self.result.id)
- self.accept()
- def disconnect(self, close_code):
- # 中止执行中的 Task
- self.result.revoke(terminate=True)
- print('disconnect:', self.file_id, self.channel_name)
- def send_message(self, event):
- self.send(text_data=JSON.dumps({
- "message": event["message"]
- }))
这里使用 Channels 的单通道模式, 每一个新连接都会启用一个新的 channel, 彼此互不影响, 可以随意终止任何一个监听日志的请求
connect
我们知道 self.scope 类似于 Django 中的 request, 记录了丰富的请求信息, 通过 self.scope["url_route"]["kwargs"]["id"] 取出 routing 中正则匹配的日志 ID
然后将 id 和 channel_name 传递给 celery 的任务函数 tailf,tailf 根据 id 取到日志文件的路径, 然后循环文件, 将新内容根据 channel_name 写入对应 channel
disconnect
当 websocket 连接断开的时候我们需要终止 Celery 的 Task 执行, 以清除 celery 的资源占用
终止 Celery 任务使用到 revoke 指令, 采用如下代码来实现
self.result.revoke(terminate=True)
注意 self.result 是一个 result 对象, 而非 id
参数 terminate=True 的意思是是否立即终止 Task, 为 True 时无论 Task 是否正在执行都立即终止, 为 False(默认) 时需要等待 Task 运行结束之后才会终止, 我们使用了 While 循环不设置为 True 就永远不会终止了
终止 Celery 任务的另外一种方法是:
- from webapp.celery import App
- App.control.revoke(result.id, terminate=True)
- send_message
方便我们通过 Django 的 view 或者 Celery 的 task 调用给 channel 发送消息, 官方也比较推荐这种方式
使用 Celery 异步循环读取日志
上边已经集成了 Channels 实现了 WebSocket, 但 connect 函数中的 celery 任务 tailf 还没有实现, 下边来实现它
关于 Celery 的详细内容可以看这篇文章:《Django 配置 Celery 执行异步任务和定时任务》 https://mp.weixin.qq.com/s/lXrp3igYo9W2UuE5Gauysg , 本文就不介绍集成使用以及细节原理, 只讲一下任务 task
task 实现代码如下:
- from __future__ import absolute_import
- from celery import shared_task
- import time
- from channels.layers import get_channel_layer
- from asgiref.sync import async_to_sync
- from django.conf import settings
- @shared_task
- def tailf(id, channel_name):
- channel_layer = get_channel_layer()
- filename = settings.TAILF[int(id)]
- try:
- with open(filename) as f:
- f.seek(0, 2)
- while True:
- line = f.readline()
- if line:
- print(channel_name, line)
- async_to_sync(channel_layer.send)(
- channel_name,
- {
- "type": "send.message",
- "message": "微信公众号 [运维咖啡吧] 原创 版权所有" + str(line)
- }
- )
- else:
- time.sleep(0.5)
- except Exception as e:
- print(e)
这里边主要涉及到 Channels 中另一个非常重要的点: 从 Channels 的外部发送消息给 Channel
其实上篇文章 https://mp.weixin.qq.com/s/hqaPrPS7w3D-9SeegQAB2Q 中检查通道层是否能够正常工作的时候使用的方法就是从外部给 Channel 通道发消息的示例, 本文的具体代码如下
- async_to_sync(channel_layer.send)(
- channel_name,
- {
- "type": "send.message",
- "message": "微信公众号 [运维咖啡吧] 原创 版权所有" + str(line)
- }
- )
channel_name 对应于传递给这个任务的 channel_name, 发送消息给这个名字的 channel
type 对应于我们 Channels 的 TailfConsumer 类中的 send_message 方法, 将方法中的_换成. 即可
message 就是要发送给这个 channel 的具体信息
上边是发送给单 Channel 的情况, 如果是需要发送到 Group 的话需要使用如下代码
- async_to_sync(channel_layer.group_send)(
- group_name,
- {
- 'type': 'chat.message',
- 'message': '欢迎关注公众号 [运维咖啡吧]'
- }
- )
只需要将发送单 channel 的 send 改为 group_send,channel_name 改为 group_name 即可
需要特别注意的是: 使用了 channel layer 之后一定要通过 async_to_sync 来异步执行
页面添加 WebSocket 支持
后端功能都已经完成, 我们最后需要添加前端页面支持 WebSocket
- function connect() {
- if ( $('#file').val() ) {
- Windows.chatSocket = new WebSocket(
- 'ws://' + Windows.location.host + '/ws/tailf/' + $('#file').val() + '/');
- chatSocket.onmessage = function(e) {
- var data = JSON.parse(e.data);
- var message = data['message'];
- document.querySelector('#chat-log').value += (message);
- // 跳转到页面底部
- $('#chat-log').scrollTop($('#chat-log')[0].scrollHeight);
- };
- chatSocket.onerror = function(e) {
- toastr.error('服务端连接异常!')
- };
- chatSocket.onclose = function(e) {
- toastr.error('websocket 已关闭!')
- };
- } else {
- toastr.warning('请选择要监听的日志文件')
- }
- }
上一篇文章 https://mp.weixin.qq.com/s/hqaPrPS7w3D-9SeegQAB2Q 中有详细介绍过 websocket 的消息类型, 这里不多介绍了
至此我们一个日志监听页面完成了, 包含了完整的监听功能, 但还无法终止, 接着看下面的内容
Web 页面主动断开 WebSocket
Web 页面上 "终止监听" 按钮的主要逻辑就是触发 WebSocket 的 onclose 方法, 从而可以触发 Channels 后端 consumer 的 disconnect 方法, 进而终止 Celery 的循环读取日志任务
前端页面通过. close() 可以直接触发 WebSocket 关闭, 当然你如果直接关掉页面的话也会触发 WebSocket 的 onclose 消息, 所以不用担心 Celery 任务无法结束的问题
- function goclose() {
- console.log(Windows.chatSocket);
- Windows.chatSocket.close();
- Windows.chatSocket.onclose = function(e) {
- toastr.success('已终止日志监听!')
- };
- }
至此我们包含完善功能的 Tailf 日志监听, 终止页面就全部完成了
写在最后
两篇文章结束不知道你是否对 Channels 有了更深一步的了解, 能够操刀上手将 Channels 用在自己的项目中, 实现理想的功能. 个人觉得 Channels 的重点和难点在于对 channel layer 的理解和运用, 真正的理解了并能熟练运用, 相信你一定能够举一反三完美实现更多需求. 最后如果对本文的 demo 源码感兴趣可以关注微信公众号 [运维咖啡吧] 后台回复小二加我微信向我索取, 一定有求必应
来源: https://www.cnblogs.com/37Y37/p/11274102.html