一、安装filebeat

[root@test2-db tmp]# rpm -ivh filebeat-7.6.1-x86_64.rpm

二、配置filebeat

[root@test2-db filebeat]# vi filebeat.yml
# Filebeat input: read the MySQL slow query log and ship each entry to Redis.
filebeat.inputs:
- type: log
  enabled: true
  # NOTE(review): the Filebeat reference describes `tags` as a list (e.g. ["mysql-slow-logs"]);
  # the scalar form is kept as written here - confirm your Filebeat version accepts it.
  tags: mysql-slow-logs
  # Custom fields added to every event. `log_source` is the value the
  # output.redis `keys` rule below and the Logstash conditionals test against.
  fields:
    log_source: mysql-slow-logs
    addr: 192.168.0.134
  paths:
    - /data/mysql/log/slow.log

  # Drop events that start with "# Time".
  # NOTE(review): per the Filebeat docs, multiline joining runs before exclude_lines,
  # so this filters whole joined events, not individual raw lines - confirm.
  exclude_lines: ['^\# Time']
  # A line starting with "# Time" or "# User" begins a new event; every line that
  # does NOT match (negate: true) is appended after the preceding matching line.
  multiline.pattern: '^\# Time|^\# User'
  multiline.negate: true
  multiline.match: after

# Redis output: events are pushed to the Redis instance that Logstash reads from.
output.redis:
  hosts: ["192.168.10.105:6380"]
  db: 0
  timeout: 5
  # NOTE(review): plaintext credential in the config file; consider the Filebeat
  # keystore or an environment variable reference instead.
  password: "intel.com"
  # Fallback key used when no rule under `keys` matches.
  key: "default_list"
  # Events whose fields.log_source equals "mysql-slow-logs" go to the
  # "mysql-slow-logs" key.
  keys:
    - key: "mysql-slow-logs"
      when.equals:
        fields.log_source: "mysql-slow-logs"

三、配置logstash

调试
[root@elk-logstash tmp]# vi mysql-test.conf
# Read slow-log events from the Redis list that Filebeat writes to.
input {
  redis {
    # Consume the key as a Redis list.
    data_type => "list"
    host => "192.168.10.105"
    port => 6380
    db => 0
    # NOTE(review): plaintext credential; consider the Logstash keystore or an
    # environment variable reference instead.
    password =>"intel.com"
    timeout => 5
    # Must be the same key that Filebeat's output.redis routes these events to.
    key => "mysql-slow-logs"
  }
}

# Parse MySQL slow-log entries into structured fields and use the statement's
# own "SET timestamp=" value as the event time.
filter {

    # Only handle events labelled as MySQL slow-log entries by the shipper.
    if [fields][log_source] == "mysql-slow-logs" {
      grok {
          # Patterns are tried in the order written; with grok's default
          # break_on_match the first one that matches wins.
          #
          # "Id:\s+" (one or more whitespace characters) is used in every pattern
          # because MySQL can right-align the thread id with several spaces
          # (e.g. "Id:     3"); a single "\s" would reject such entries.
          #
          # NOTE(review): every pattern expects either "[<ip>]" with no host name
          # or "<host> []" with an empty address; a line carrying both a host name
          # and an IP does not appear to be covered - confirm against real logs.

          # Entry without a "use <db>;" line, with a thread Id.
          match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+\[(?<clientip>[0-9.]+)\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
          match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
          # Entry with a "use <db>;" line, with a thread Id.
          match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+\[(?<clientip>[0-9.]+)\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
          match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s%{NUMBER:lock_time:float}\s+Rows_sent:\s%{NUMBER:rows_sent:int}\s+Rows_examined:\s%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int};\n(?<query>.*)" ]
          # Database backup user.
          # NOTE(review): "# Query_time# Time:" looks like a garbled pattern (it
          # requires that literal text right after the Id line); left as written
          # apart from the Id whitespace fix - verify against the backup user's
          # actual log entries.
          match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\]\s+@\s+(?<clientip>.*)\[\]\s+Id:\s+%{NUMBER:id:int}\n# Query_time# Time:\s+(?<time>.*)" ]

          # Abort a single grok attempt after 10 seconds.
          timeout_millis => 10000
          # "message" is not removed here; it is dropped conditionally below so
          # that events which fail to parse keep their raw text.
          #remove_field => ["message"]
      }

      # Drop the raw message only when grok succeeded.
      if !([tags] and "_grokparsefailure" in [tags]) {
          mutate {
              remove_field => ["message"]
          }
      }

      # Use the epoch seconds from "SET timestamp=" as the event's @timestamp.
      date {
        match => ["timestamp_mysql","UNIX"]
        target => "@timestamp"
      }
    }
}

# Index parsed slow-log events into Elasticsearch and echo every event to stdout.
output {
    # Only slow-log events are sent to Elasticsearch.
    if [fields][log_source] == "mysql-slow-logs" {
        elasticsearch {
            hosts => ["192.168.10.102:9200","192.168.10.103:9200","192.168.10.104:9200"]
            timeout => 5
            # One index per day, named from the event's @timestamp.
            index => "mysql-slow-logs-%{+YYYY.MM.dd}"
        }
    }
      # Outside the conditional: every event is printed to stdout in rubydebug
      # form. NOTE(review): presumably for debugging only - consider removing it
      # when this config is moved into the production pipeline.
      stdout {
         codec => rubydebug
      }
}

[root@elk-logstash tmp]# /usr/share/logstash/bin/logstash -f mysql-test.conf

四、成功后添加到logstash的pipeline中

[root@elk-logstash conf.d]# vi mysql-slow.conf

五、检索index

总结:mysql日志抓取只算是实现基础功能,再深入的话会将mysql的处理时间打印出来并进行汇总统计。

最后修改日期: 2023年12月16日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。