过滤 Linux 系统登录日志 / var/log/secure
登陆成功
- Jan 6 17:11:47 localhost sshd[3324]: Received disconnect from 172.16.0.13: 11: disconnected by user
- Jan 6 17:11:47 localhost sshd[3324]: pam_unix(sshd:session): session closed for user root
- Jan 6 17:11:48 localhost sshd[3358]: Address 172.16.0.13 maps to localhost, but this does not map back to the address - POSSIBLE BREAK-IN ATTEMPT!
- Jan 6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
- Jan 6 17:11:51 localhost sshd[3358]: pam_unix(sshd:session): session opened for user root by (uid=0)
登陆失败
- Jan 6 17:13:10 localhost sshd[3380]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=SSH ruser= rhost=172.16.0.39 user=root
- Jan 6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2
以上信息中我们只用判断登录成功或失败
Jan 6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
或者
- Jan 6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2
- ---------------------
logstash 配置
- input {
- file {
- path => "/var/log/secure"
- }
- }
- filter {
- grok {
- match => {
- "message" => ".* sshd\[\d+\]: (?<status>\S+) .* (?<ClientIP>(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?) .*"
- }
- overwrite => ["message"]
- }
- }
- output {
- if [ClientIP] =~ /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/ and ([status] == "Accepted" or [status] == "Failed") {
- Elasticsearch {
- hosts => "172.16.11.199"
- index => "logstash-%{ YYYY.MM.dd}"
- }
- }
- }
配置解释:
input 插件使用 file 读取日志文件
filter 插件使用 grok 来匹配相应的日志行
message 中定义了两个 Fields, 分别匹配登录来源 IP, 和登录状态
overwrite 表示重写 message 行
output 插件指定将过滤出来的信息输出到哪个地方, 这里输出到 Elasticsearch
一个条件判断, 判断 filter 中定义的两个 fields 是否匹配, 如果匹配则输出到 Elasticsearch 中, 如果不匹配则不操作
正则解释
Jan 6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
.* 匹配 Jan 6 17:11:51 localhost
sshd[\d+]: 匹配 sshd[3358]: 段,\d + 匹配多个数字
(?<status>\S+):
1.(?<xxx > 正则表达式): 定义一个 xxx 字段匹配后面正则表达式, 类似{xxx: 匹配的结果}, 在上面 output 中的条件判断即可使用该字段来使用匹配到的结果
2.\S + 表示多个字符串, 也就是匹配 Accepted 或 Failed
(?<ClientIP>(?:\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})?)
1. 先定义一个 ClientIP 字段
2.(?:...)? 表示匹配一个 ip 但不保存供以后引用, 如果 (...) 则以后可以使用 $1 来调用匹配到的值, 最后一个? 表示非贪婪匹配, 尽可能少的匹配
最终输出结果:
- {
- "message" => "Mar 22 10:16:51 k8s-n2 sshd[27997]: Failed password for root from 10.201.1.10 port 39302 ssh2",
- "@version" => "1",
- "@timestamp" => "2019-03-22T02:16:51.813Z",
- "path" => "/var/log/secure",
- "host" => "k8s-n2",
- "status" => "Failed",
- "ClientIP" => "10.201.1.10"
- }
来源: http://www.bubuko.com/infodetail-2995845.html