一、安装filebeat
[root@test2-db tmp]# rpm -ivh filebeat-7.6.1-x86_64.rpm
二、配置filebeat
[root@test2-db filebeat]# vi filebeat.yml
filebeat.inputs:
- type: log
enabled: true
tags: mysql-slow-logs
fields:
log_source: mysql-slow-logs
addr: 192.168.0.134
paths:
- /data/mysql/log/slow.log
exclude_lines: ['^\# Time']
multiline.pattern: '^\# Time|^\# User'
multiline.negate: true
multiline.match: after
output.redis:
hosts: ["192.168.10.105:6380"]
db: 0
timeout: 5
password: "intel.com"
key: "default_list"
keys:
- key: "mysql-slow-logs"
when.equals:
fields.log_source: "mysql-slow-logs"
三、配置logstash
调试
[root@elk-logstash tmp]# vi mysql-test.conf
input {
redis {
data_type => "list"
host => "192.168.10.105"
port => 6380
db => 0
password =>"intel.com"
timeout => 5
key => "mysql-slow-logs"
}
}
filter {
if [fields][log_source] == "mysql-slow-logs" {
grok {
#无use有id
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+\[(?<clientip>[0-9.]+)\]\s+Id:\s%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
#有use有id
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+\[(?<clientip>[0-9.]+)\]\s+Id:\s%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
#数据库备份用户
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s%{NUMBER:id:int}\n# Query_time# Time:\s+(?<time>.*)" ]
timeout_millis => 10000
#remove_field => ["message"]
}
if !([tags] and "_grokparsefailure" in [tags]) {
mutate {
remove_field => ["message"]
}
}
date {
match => ["timestamp_mysql","UNIX"]
target => "@timestamp"
}
}
}
output {
if [fields][log_source] == "mysql-slow-logs" {
elasticsearch {
hosts => ["192.168.10.102:9200","192.168.10.103:9200","192.168.10.104:9200"]
timeout => 5
index => "mysql-slow-logs-%{+YYYY.MM.dd}"
}
}
stdout {
codec => rubydebug
}
}
[root@elk-logstash tmp]# /usr/share/logstash/bin/logstash -f mysql-test.conf
四、成功后添加到logstash的pipline中
[root@elk-logstash conf.d]# vi mysql-slow.conf
五、检索index
总结:mysql日志抓取只算是实现基础功能,再深入的话会将mysql的处理时间打印出来并进行汇总统计。
留言