What this post covers
- How to build a log collection setup with filebeat, Kafka, and ES that is easy to use and easy to manage
- Dropping Logstash in favor of Elasticsearch ingest pipelines
- The gunicorn log format and the matching filebeat/ES configuration
- The Flask log format, exception log collection, and the matching filebeat/ES configuration
- All of the configuration for the above
Overview
An HTTP request in my system takes the following path:
Gateway (Kong) --> web container (gunicorn) --> web app (Flask)
I plan to route my logs along the following flow:

```
file --> filebeat --> kafka topic --> filebeat --> elastic pipeline --> Elasticsearch
                          |
                          +---------> HBase
```
Why this design
Where did Logstash go?
Logstash is heavy. That alone is not a problem; it just means a few more machines and a bit more money, and getting the job done is what counts.
The real issue is that Logstash is not elegant. It does give you centrally managed configuration, but one Logstash never seems to be enough. You can split the configuration across files, yet you never know how to divide it: which rules belong together in one file and which should be kept apart.
Delete a configuration? Not a chance. How would I ever know which parts are safe to remove?
Use Logstash and you end up as the "poor Ops guys having to understand and keep up with all the crazy input possibilities". ^_^
Filebeat's pain points
Have a look at that issue: countless users have pleaded for grok support in filebeat, but it is simply not going to happen. The maintainers point to two alternatives instead: write your logs as JSON, or use an ingest pipeline.
Filebeat also used to lack a decent Kafka input, so you had to write your own Kafka-to-ES forwarding tool.
Keep it simple
What I want from log collection is plain simplicity, or call it microservice-style cohesion: one log collection line should never be entangled with any other business. The ideal state looks like this:

one file -> filebeat config -> kafka topic -> filebeat config -> elastic pipeline -> es index
Gunicorn logs
The gunicorn access log captures the following information:
- time (%(t)s)
- client IP (%(h)s)
- HTTP method (%(m)s)
- HTTP protocol (%(H)s)
- URL path (%(U)s)
- URL query string (%(q)s)
- response status code (%(s)s)
- user agent (%(a)s)
- response time (%(D)s, in microseconds)
- trace ID (the Kong-Request-ID request header)
- remote IPs (the X-Forwarded-For request header)
Log format

```
%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] [%(D)s] [%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]
```
Example log line

```
[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] [/API/v1/_instance/json_schema/Team/list] [a=1] [200] [Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36] [936] [9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]
```
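For completeness, this is roughly how the format above gets wired into gunicorn. A minimal sketch of a gunicorn config file; the log path matches the filebeat input further down, everything else is standard gunicorn settings:

```python
# gunicorn.conf.py -- minimal sketch
# Note: %(t)s already renders as [15/Nov/2019:10:23:37 +0000],
# so it needs no extra brackets in the format string.
accesslog = "/yourpath/gunicorn-access.log"
access_log_format = (
    "%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] "
    "[%(D)s] [%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]"
)
```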
ES ingest pipeline parsing
Ingest pipelines have been built into Elasticsearch since 5.0; in effect, ES now ships with its own little Logstash. A pipeline chains multiple processors for handling complex logs, and you can parse with either grok or dissect; dissect is faster in some cases.
After the trip through Kafka, once the second filebeat ships the event to ES, the surplus metadata has to be stripped. The pipeline below works in two layers: the first remove drops the consumer-side filebeat metadata, json unpacks the original producer event (the Kafka input delivers it as a JSON string in message) into the document root, the second remove drops the producer-side metadata, and dissect finally parses the raw log line:
```
PUT _ingest/pipeline/gunicorn
{
  "description" : "devops gunicorn pipeline",
  "processors" : [
    {
      "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
      "json": {
        "field": "message",
        "add_to_root": true
      }
    },
    {
      "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{client_ip}] [%{method}] [%{scheme}] [%{path}] [%{query_string}] [%{status}] [%{client}] [%{rt_millo}] [%{trace_id}] [%{remote_ips}]"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
```
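You can dry-run the dissect pattern before creating the pipeline with the _simulate API. A minimal sketch that exercises only the dissect step against the example line from above (user agent shortened here so the line stays readable):

```
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "[%{@timestamp}] [%{client_ip}] [%{method}] [%{scheme}] [%{path}] [%{query_string}] [%{status}] [%{client}] [%{rt_millo}] [%{trace_id}] [%{remote_ips}]"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] [/API/v1/_instance/json_schema/Team/list] [a=1] [200] [Mozilla/5.0 (Macintosh) Chrome/78] [936] [9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]"
      }
    }
  ]
}
```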
ES mapping
The key part here is the date format definition for @timestamp. Fields that are worth tokenizing for full-text search get text; everything else gets keyword, which makes aggregating and querying the log data much more convenient. _source stays enabled so we can still run some statistics over the raw documents.
```
PUT _template/gunicorn
{
  "index_patterns": ["*gunicorn*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "dd/LLL/yyyy:HH:mm:ss Z"
      },
      "client_ip": {
        "type": "ip"
      },
      "method": {
        "type": "keyword"
      },
      "scheme": {
        "type": "keyword"
      },
      "path": {
        "type": "text"
      },
      "query_string": {
        "type": "text"
      },
      "status": {
        "type": "integer"
      },
      "client": {
        "type": "text"
      },
      "rt_millo": {
        "type": "long"
      },
      "trace_id": {
        "type": "keyword"
      },
      "remote_ips": {
        "type": "text"
      }
    }
  }
}
```
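Once documents are flowing in, the keyword and integer mappings pay off immediately. A sketch of a typical aggregation, counting requests per status code over the last hour (the index name follows the filebeat output config below):

```
GET gunicorn-*/_search
{
  "size": 0,
  "query": {
    "range": { "@timestamp": { "gte": "now-1h" } }
  },
  "aggs": {
    "by_status": {
      "terms": { "field": "status" }
    }
  }
}
```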
filebeat config for shipping to Kafka
```yaml
filebeat.inputs:
- type: log
  paths:
    - /yourpath/gunicorn-access.log
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after
  tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'gunicorn_access'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
filebeat config for consuming from Kafka
```yaml
filebeat.inputs:
- type: kafka
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topics: ["gunicorn_access"]
  group_id: "filebeat_gunicorn"

output.elasticsearch:
  hosts: ["es-url"]
  pipeline: "gunicorn"
  index: "gunicorn-%{+yyyy.MM.dd}"

setup.template.name: "gunicorn"
setup.template.pattern: "gunicorn-*"
setup.ilm.enabled: false
setup.template.enabled: false
```
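The producer and the consumer are two separate filebeat processes, each with its own config file. Running them looks roughly like this; the config file names are placeholders of mine:

```sh
# on the app host: tail the access log and publish to Kafka
filebeat -e -c filebeat-gunicorn-to-kafka.yml

# on the consumer host(s): read the topic and index into ES
filebeat -e -c filebeat-kafka-to-es.yml
```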
Flask logs
Flask logs are what the application itself prints, and they are for inspecting exceptions and errors. Early after a release it is fine to let DEBUG messages into the info log, which makes troubleshooting easier; once things are stable the accepted level should be raised again. The info log is not suitable for statistics. Its job is to let us locate a problem quickly when one occurs, which is why exceptions should also be written to the info log.
For the INFO log I suggest the format below. The fields we care about:
- time
- levelname: the log level
- host, process, thread: pin a line to a specific thread of a specific process on a specific machine (needed for some complex bugs, or when asynchronous workers are involved)
- name, funcName, filename, lineno: locate the code that emitted the line
- message: the log content
Log format

```python
{
    "format": "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] [%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s".format(HOST),
    "datefmt": "%Y-%m-%d %H:%M:%S"
}
```
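For context, here is a minimal sketch of how that formatter dict might be wired up with logging.config.dictConfig. The HOST lookup and the single file handler are assumptions for illustration:

```python
import logging.config
import socket

HOST = socket.gethostname()

logging.config.dictConfig({
    "version": 1,
    "formatters": {
        "app": {
            "format": "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] "
                      "[%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s".format(HOST),
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
    },
    "handlers": {
        "file": {
            "class": "logging.FileHandler",
            "filename": "/you_path/App.log",  # same path the filebeat input below tails
            "formatter": "app",
        }
    },
    "root": {"level": "INFO", "handlers": ["file"]},
})
```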
Example log lines

```
[2019-11-18 08:47:49.424] [INFO] [cmdb-008069:5990:140482161399552] [cmdb:execute_global_worker] [standalone_scheduler.py:116] RUN_INFO: tiny_collector_ali starting at 2019-11-18 08:47:49, next run will be at approximately 2019-11-18 09:47:49
[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] [cmdb:common_handler] [error.py:48] 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
Traceback (most recent call last):
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1805, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1783, in dispatch_request
    self.raise_routing_exception(req)
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1766, in raise_routing_exception
    raise request.routing_exception
  File "/home/server/venv3/lib/python3.6/site-packages/flask/ctx.py", line 336, in match_request
    self.url_adapter.match(return_rule=True)
  File "/home/server/venv3/lib/python3.6/site-packages/werkzeug/routing.py", line 1799, in match
    raise NotFound()
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
```
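A traceback like the one above ends up in the log because the error handler logs with the exception attached. A minimal sketch of such a handler, mirroring the common_handler seen in the example; the wiring is illustrative, not the original post's code:

```python
import logging

from flask import Flask, jsonify
from werkzeug.exceptions import HTTPException

app = Flask(__name__)
logger = logging.getLogger("cmdb")

@app.errorhandler(Exception)
def common_handler(exc):
    # logger.exception appends the current traceback after the message;
    # the multiline filebeat pattern below folds it back into one event.
    logger.exception(exc)
    code = exc.code if isinstance(exc, HTTPException) else 500
    return jsonify(error=str(exc)), code
```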
ES ingest pipeline parsing
Same as on the gunicorn side: after the trip through Kafka, the second filebeat ships the event to ES, and the surplus metadata has to be stripped before the line is dissected:
```
PUT _ingest/pipeline/info
{
  "description" : "devops info pipeline",
  "processors" : [
    {
      "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
      "json": {
        "field": "message",
        "add_to_root": true
      }
    },
    {
      "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{level}] [%{host}:%{process_id}:%{thread_id}] [%{name}:%{func_name}] [%{file}:%{line_no}] %{content}"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}
```
ES mapping
thread_id needs a long field: as the example above shows (140482161399552), Python thread ids can far exceed the 32-bit integer range.
```
PUT _template/info
{
  "index_patterns": ["*info*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS"
      },
      "level": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "process_id": {
        "type": "integer"
      },
      "thread_id": {
        "type": "long"
      },
      "name": {
        "type": "keyword"
      },
      "func_name": {
        "type": "keyword"
      },
      "file": {
        "type": "keyword"
      },
      "line_no": {
        "type": "integer"
      },
      "content": {
        "type": "text"
      }
    }
  }
}
```
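With level mapped as keyword, pulling the most recent errors is a cheap filter rather than a full-text search. A sketch, using the index name from the consumer config below:

```
GET app-info-*/_search
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "level": "ERROR" } },
        { "range": { "@timestamp": { "gte": "now-24h" } } }
      ]
    }
  },
  "sort": [{ "@timestamp": "desc" }]
}
```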
filebeat config for shipping to Kafka
Here ^\[20\d{2} identifies the first line of an event, so a traceback's continuation lines are folded into the preceding log entry:
```yaml
filebeat.inputs:
- type: log
  paths:
    - /you_path/App.log
  multiline.pattern: '^\[20\d{2}'
  multiline.negate: true
  multiline.match: after
  tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'devops_app'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
filebeat config for consuming from Kafka
```yaml
filebeat.inputs:
- type: kafka
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topics: ["devops_app"]
  group_id: "filebeat_app"

output.elasticsearch:
  hosts: ["es_url"]
  pipeline: "info"
  index: "app-info-%{+yyyy.MM.dd}"

setup.template.name: "info"
setup.template.pattern: "app-info-*"
setup.ilm.enabled: false
setup.template.enabled: false
```
Source: https://www.cnblogs.com/ohbonsai/p/12092835.html