写在前面
从初次了解elastic产品到正式投入使用,拖拖拉拉的也有小半年了,刚接触看到一些帖子都是安装教程,后来看到一些都是深入教程,此篇文章比较居中一点,总结了我在踩的一些坑和记录一些周边插件的使用方式、方法,便于自己后续回顾,也希望能给新用户一些引导,少走一些弯路;核心其实是想表达一下对rockybean和KennyW的爱,这期间非常感谢两位的协助,在非工作日深夜排查问题多次,正文多处采用二位给予的讲解,万分感谢。
- Module (load="imudp")
- Module (load="omkafka")
- Input (type="imudp" port="514")
- Module (load="mmsequence")
- $MaxMessageSize 4k
- local5.none /var/log/messages
- local5.none @log.58dns.org:514
- set $!newmsg = replace($msg,'\\x','\\u00')
- template(name="kafka_topic" type="string" string="%programname%")
- template(name="kafka_msg" type="string" string="%!newmsg%")
- if ($syslogfacility-text == 'local5' and $syslogseverity-text == 'info') then{
- action(type="omkafka" topic="kafka_topic" partitions.auto="on"
- dynatopic="on" dynatopic.cachesize="1000"
- confParam=["compression.codec=snappy"]
- #kafka broker addr
- broker=["10.10.10.1:9092","10.10.10.2:9092",]
- template="kafka_msg"
- errorfile="/var/log/omkafka/log_kafka_failures.log")
- action(type="omkafka" topic="kafka_topic" partitions.auto="on"
- dynatopic="on" dynatopic.cachesize="1000"
- confParam=["compression.codec=snappy"]
- #kafka broker addr
- broker=["20.20.20.1:9092","20.20.20.2:9092",]
- template="kafka_msg"
- errorfile="/var/log/omkafka/log_kafka_failures.log")
- stop
- }
- log_format json_format '{"@timestamp":"$time_iso8601",'
- '"cookie_id":"$cookie_id",' #内部cookie_id
- '"client_ip":"$remote_addr",'
- '"remote_user":"$remote_user",'
- '"request_method":"$request_method",'
- '"domain":"$host",'
- '"user_agent":"$http_user_agent",'
- '"xff":"$http_x_forwarded_for",'
- '"upstream_addr":"$upstream_addr",'
- '"upstream_response_time":"$upstream_response_time",'
- '"request_time":"$request_time",'
- '"size":"$body_bytes_sent",'
- '"idc_tag":"tjtx",'
- '"cluster":"$host_pass",'
- '"status":"$status",'
- '"upstream_status":"$upstream_status",'
- '"host":"$hostname",'
- '"via":"$http_via",'
- '"protocol":"$scheme",'
- '"request_uri":"$request_uri",'
- '"http_referer":"$http_referer"}';
- access_log syslog:local5:info:127.0.0.1:514:nginx_aggregation_log json_format;
- #nginx_aggregation_log 这是自定义的Topic
1) UDP传输虽快,但是以太网(Ethernet)数据帧的长度必须在46-1500字节之间,UDP不能像TCP重组TCP包,去除IP和UDP的数据包,最终可使用只剩1472字节。如果传输大于这个长度的消息,并不会想UDP本身一样直接丢弃,只是会损坏源数据格式,截断超过限制字节以外的数据;
2) 对于Nginx日志来说,只要不保留POST数据,基本一条消息不会超过限制字节,我在NginxSyslog介绍中没看到支持TCP,用lua脚本实现的TCP方式传输,但是看了很多帖子都不建议在Nginx中用TCP日志传输。就是因为TCP传输可靠,但像网络抖动、传输异常,可能会不停的重试多次或等待,直接影响这条请求,也直接影响到了用户;
3) 消息超过了UDP传输限制怎么办,我这目前是保留一条消息的重要字段,如上述的json_format的格式,将 request_uri、http_referer等可能会较大的字段放到最后,如果真的发现消息不完整,直接丢弃http_referer,取request_uri问号前的内容;(在logstash或hangout中filters实现,具体配置详见下文Hangout-filters)
NginxSyslog模块介绍
注:
目前我们这kakfa集群是kafka_2.10-0.8.1.1版本,但是logstash5.x对kafka有版本要求>0.10版本。后来采用hangout,更换了几个jar,解决此问题
- inputs:
- - Kafka:
- topic:
- nginx_aggregation_log: 32
- codec:
- json
- consumer_settings:
- group.id: es-nginx-58_nginx_aggregation
- zookeeper.connect: "10.10.10.1:2181,10.10.10.2:2181"
- auto.commit.interval.ms: "20000"
- socket.receive.buffer.bytes: "1048576"
- fetch.message.max.bytes: "1048576"
- num.consumer.fetchers: "1"
- filters:
- - Filters:
- if:
- - '<#if message??>true</#if>'
- #如果不是完整的JSON,会出现message,则走此逻辑
- filters:
- - Grok:
- match:
- - '(?<msg>{"@timestamp":.*"request_uri":([^\?]+)\?)'
- #正则匹配@timestamp开始到request_uri后边的第一个?截止
- - Gsub:
- fields:
- msg: ['$','"}']
- #补全符号,完整新的JSON格式
- - Json:
- field: msg
- remove_fields: ['message']
- #干掉错误的数据
- - Convert:
- fields:
- request_time:
- to: float
- remove_if_fail: true
- upstream_response_time:
- to: float
- remove_if_fail: true
- size:
- to: integer
- remove_if_fail: true
- - GeoIP2:
- source: client_ip
- database: '/opt/soft/hangout/etc/other/GeoLite2-City.mmdb'
- - Json:
- field: geoip
- - Remove:
- fields:
- - msg
- - Add:
- fields:
- request_url: '<#assign a=request_uri?split("?")>${a[0]}'
- #request_uri这个term的cardinality很高,所以?前用于聚合,原有的用于搜索
- if:
- - '<#if request_uri??>true</#if>'
- outputs:
- - Elasticsearch:
- cluster: es-nginx
- timezone: "Asia/Shanghai"
- hosts: "10.10.10.1:9300,10.10.10.2:9300"
- index: 'hangout-nginx_aggregation_log-%{ YYYY.MM.dd}'
topic: nginx_aggregation_log: 32,这个32代表需要建立多少子线程去kafka读取数据,数量最好与Partition相等,如果少于Partition,会一个线程同时去2个Partition读取消息,若大于Partition则会有不工作的进程
- cluster.name: es-nginx
- node.name: 10.10.10.1
- #为后期冷热数据使用
- node.attr.rack_id: hdd
- path.data: /data
- path.logs: /opt/logs/elasticsearch/
- network.host: 0.0.0.0
- http.port: 9200
- #设置新节点被启动时能够发现的主节点列表
- discovery.zen.ping.unicast.hosts: ["10.10.10.1","10.10.10.2","10.10.10.3"]
- #防止脑裂(n/2+1)
- discovery.zen.minimum_master_nodes: 2
- node.master: true
- node.data: false
- {
- "template": "agg-nginx-*",
- "aliases": {
- "agg-nginx": {}
- },
- "settings": {
- "number_of_shards": 4,
- "number_of_replicas": 1,
- "index.routing.allocation.include.rack_id": "ssd"
- }
通过上述配置PUT到 _template/ur_name下在分片上的定义已经成功,但是像agg-nginx- 和test-agg-test- 这样的2个索引名字,即使你创建了另一个"template": "agg-nginx-test-*"的模板依旧都匹配第一个,当然换名字最简单,但是template的order的是专门解决这个问题的
默认创建"order": "0",值越高优先级越高,所以在想要先匹配的将order值调高即可
- ES的mapping非常类似于静态语言中的数据类型:声明一个变量为int类型的变量, 以后这个变量都只能存储int类型的数据。同样的, 一个number类型的mapping字段只能存储number类型的数据。
- 同语言的数据类型相比,mapping还有一些其他的含义,mapping不仅告诉ES一个field中是什么类型的值, 它还告诉ES如何索引数据以及数据是否能被搜索到
- 下列是一个删减版的mapping
- "mappings": {
- "ngx_log": {
- "_all": {
- "enabled": false
- },
- "properties": {
- "@timestamp": {
- "type": "date"
- },
- "client_ip": {
- "type": "ip"
- },
- "domain": {
- "type": "keyword"
- },
- "geoip": {
- "properties": {
- "city_name": {
- "type": "keyword"
- },
- "country_name": {
- "type": "keyword"
- },
- "latitude": {
- "type": "float"
- },
- "location": {
- "type": "geo_point"
- },
- "longitude": {
- "type": "float"
- },
- }
- },
- "request_time": {
- "type": "float"
- },
- "request_url": {
- "type": "keyword"
- },
- "status": {
- "type": "keyword"ype ": "keyword "
- },
- }
- }
- }"
- ![图片描述][15]上边这英文有点多,其实简单理解就是不分词,你就最好别用text了,而且Text类型也会相应的多占用空间,依照上述,数据主要是日志分析,每条数据的格式已经很明确,主要用于日志分析,所以不需要分词。像一些所有引擎的业务更适合需要分词;
- .es(index = index1, timefield = @timestamp).label('Today').title(QPS).color(#1E90FF),
- .es(offset = -24h, index = index2, timefield = @timestamp).label('Yesterday').lines(fill = 1, width = 0.5).color(gray)
来源: http://www.tuicool.com/articles/qy6FFrA