1. Background
Slow query logs generated by RDS need to be shipped to the ELK platform for centralized management. Reference: https://github.com/jrbeilke/logstash-lambda
2. Enable the RDS slow query log
Reference: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/UserGuide/USER_LogAccess.MySQL.Generallog.html
Note: you also need to check the slow query log option under log exports in the RDS console, so the log is published to CloudWatch Logs.
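For reference, the same switches can be flipped with boto3 instead of clicking through the console. A minimal sketch; the parameter group name mysql-slowlog-params and the instance identifier mydb are placeholders for your own resources:

import boto3

rds = boto3.client('rds')

# Enable the slow query log in the custom parameter group attached to the instance
rds.modify_db_parameter_group(
    DBParameterGroupName='mysql-slowlog-params',   # placeholder
    Parameters=[
        {'ParameterName': 'slow_query_log', 'ParameterValue': '1', 'ApplyMethod': 'immediate'},
        {'ParameterName': 'long_query_time', 'ParameterValue': '2', 'ApplyMethod': 'immediate'},
        {'ParameterName': 'log_output', 'ParameterValue': 'FILE', 'ApplyMethod': 'immediate'},
    ]
)

# Publish the slow query log to CloudWatch Logs (the console checkbox mentioned above)
rds.modify_db_instance(
    DBInstanceIdentifier='mydb',                   # placeholder
    CloudwatchLogsExportConfiguration={'EnableLogTypes': ['slowquery']}
)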
3. Configure the subscription filter
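Create a Lambda subscription filter on the RDS slow query log group (these follow the /aws/rds/instance/<instance-id>/slowquery naming) and point it at the function from section 4. This can be done in the CloudWatch console or scripted; a hedged boto3 sketch, where the region, account ID, instance name, and function name are all placeholders:

import boto3

logs_client = boto3.client('logs')
lambda_client = boto3.client('lambda')

log_group = '/aws/rds/instance/mydb/slowquery'  # placeholder instance name
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:logstash-lambda'  # placeholder

# Allow CloudWatch Logs to invoke the function
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='cwlogs-rds-slowquery',
    Action='lambda:InvokeFunction',
    Principal='logs.amazonaws.com',
    SourceArn='arn:aws:logs:us-east-1:123456789012:log-group:%s:*' % log_group
)

# An empty filterPattern forwards every log event to the function
logs_client.put_subscription_filter(
    logGroupName=log_group,
    filterName='rds-slowquery-to-logstash',
    filterPattern='',
    destinationArn=function_arn
)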
4. Configure the Lambda function
Runtime: Python 2.7
from __future__ import print_function
import base64
import json
import urllib
import boto3
import socket
import ssl
import re
import zlib
# Parameters
host = "blog.g6p.cn"
metadata = {
    "your_metafields": {
        "backend": "python"
    },
    "some_field": "change_me"
}
# Constants
raw_port = 30464

# SSL security: set enable_security to True (the port then switches to
# ssl_port) if TLS was enabled while creating the Lambda function
enable_security = False
ssl_port = 10515
def lambda_handler(event, context):
    # Check prerequisites
    if host == "<your_logstash_hostname>" or host == "":
        raise Exception(
            "You must configure your Logstash hostname before starting this lambda function (see #Parameters section)")

    # Attach Logstash TCP Socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    port = raw_port
    if enable_security:
        s = ssl.wrap_socket(s)
        port = ssl_port
    s.connect((host, port))

    # Add the context to meta
    metadata["aws"] = {}
    metadata["aws"]["function_name"] = context.function_name
    metadata["aws"]["function_version"] = context.function_version
    metadata["aws"]["invoked_function_arn"] = context.invoked_function_arn
    metadata["aws"]["memory_limit_in_mb"] = context.memory_limit_in_mb

    try:
        # Route to the corresponding parser
        event_type = parse_event_type(event)
        if event_type == "s3":
            logs = s3_handler(s, event)
        elif event_type == "awslogs":
            logs = awslogs_handler(s, event)
        for log in logs:
            send_entry(s, log)
    except Exception as e:
        # Ship the error through the socket as well
        err_message = 'Error parsing the object. Exception: {}'.format(str(e))
        send_entry(s, err_message)
        raise e
    finally:
        s.close()
# Utility functions
def parse_event_type(event):
    if "Records" in event and len(event["Records"]) > 0:
        if "s3" in event["Records"][0]:
            return "s3"
    elif "awslogs" in event:
        return "awslogs"
    raise Exception("Event type not supported (see #Event supported section)")
# Handle S3 events
def s3_handler(s, event):
    s3 = boto3.client('s3')
    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
    # Extract the S3 object
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response['Body']
    data = body.read()
    structured_logs = []
    # If the name has a .gz extension, then decompress the data
    if key[-3:] == '.gz':
        data = zlib.decompress(data, 16 + zlib.MAX_WBITS)
    if is_cloudtrail(str(key)) is True:
        cloud_trail = json.loads(data)
        for event in cloud_trail['Records']:
            # Create structured object
            structured_line = merge_dicts(event, {"aws": {"s3": {"bucket": bucket, "key": key}}})
            structured_logs.append(structured_line)
    else:
        # Send lines to Logstash
        for line in data.splitlines():
            # Create structured object
            structured_line = {"aws": {"s3": {"bucket": bucket, "key": key}}, "message": line}
            structured_logs.append(structured_line)
    return structured_logs
# Handle CloudWatch events and logs
def awslogs_handler(s, event):
    # CloudWatch Logs delivers the payload base64-encoded and gzip-compressed
    data = zlib.decompress(base64.b64decode(event["awslogs"]["data"]), 16 + zlib.MAX_WBITS)
    logs = json.loads(data)
    structured_logs = []
    # Send lines to Logstash
    for log in logs["logEvents"]:
        # Create structured object and send it
        structured_line = merge_dicts(log, {
            "aws": {
                "awslogs": {
                    "logGroup": logs["logGroup"],
                    "logStream": logs["logStream"],
                    "owner": logs["owner"]
                }
            }
        })
        structured_logs.append(structured_line)
    return structured_logs
def send_entry(s, log_entry):
    # The log_entry can only be a string or a dict
    if isinstance(log_entry, str):
        log_entry = {"message": log_entry}
    elif not isinstance(log_entry, dict):
        raise Exception(
            "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry))
    # Merge with metadata
    log_entry = merge_dicts(log_entry, metadata)
    # Send to Logstash, one JSON document per line
    str_entry = json.dumps(log_entry)
    s.send((str_entry + "\n").encode("UTF-8"))
def merge_dicts(a, b, path=None):
    if path is None:
        path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge_dicts(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass  # same leaf value
            else:
                raise Exception(
                    'Conflict while merging metadatas and the log entry at %s' % '.'.join(path + [str(key)]))
        else:
            a[key] = b[key]
    return a
def is_cloudtrail(key):
    regex = re.compile(r'\d+_CloudTrail_\w{2}-\w{4,9}-[12]_\d{8}T\d{4}Z.+\.json\.gz$', re.I)
    match = regex.search(key)
    return bool(match)
Note: host is the address of the Logstash server; the Logstash input plugin is configured as follows.
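A minimal version, assuming the stock tcp input with the json_lines codec (newline-delimited JSON is exactly what send_entry emits):

input {
  tcp {
    port => 30464          # must match raw_port in the Lambda function
    codec => json_lines    # one JSON document per line
  }
}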
5. Verification
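The simplest end-to-end check is to run a query slower than long_query_time (for example SELECT SLEEP(5) against a long_query_time of 2) and search for it in Kibana. The handler can also be smoke-tested locally by hand-building a CloudWatch Logs event; a sketch, assuming the code above is saved as lambda_function.py and Logstash is already reachable on blog.g6p.cn:30464:

import base64
import gzip
import json
import StringIO

from lambda_function import lambda_handler  # the code from section 4

class FakeContext(object):
    # Only the attributes the handler actually reads
    function_name = 'logstash-lambda'
    function_version = '$LATEST'
    invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:logstash-lambda'
    memory_limit_in_mb = 128

payload = {
    "logGroup": "/aws/rds/instance/mydb/slowquery",
    "logStream": "mydb",
    "owner": "123456789012",
    "logEvents": [{"id": "1", "timestamp": 0,
                   "message": "# Query_time: 5.0  Lock_time: 0.0"}]
}

# CloudWatch Logs delivers the payload gzip-compressed and base64-encoded
buf = StringIO.StringIO()
gz = gzip.GzipFile(fileobj=buf, mode='wb')
gz.write(json.dumps(payload))
gz.close()
event = {"awslogs": {"data": base64.b64encode(buf.getvalue())}}

lambda_handler(event, FakeContext())

If everything is wired correctly, the structured entry shows up in Elasticsearch with the aws.awslogs.logGroup field set.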
6. Closing thoughts
- Many AWS components can be customized through Lambda functions. The upside is powerful integration; the downside is that ops or development staff have to learn the event format of each source. Compared with Alibaba Cloud, there is no one-stop service for this.
- Our current environment has both self-hosted MySQL and AWS RDS. For consistency, slow logs from both are pushed into ELK (we chose the ELK platform to cut down on manual work and to standardize on a platform, with future automation in mind).
- I have been digging into ELK recently; anyone interested is welcome to get in touch and compare notes.