一、背景

RDS 产生的慢日志需要推送至 ELK 平台统一管理。参考文档:https://github.com/jrbeilke/logstash-lambda

二、配置rds慢日志功能

参考文档:https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/UserGuide/USER_LogAccess.MySQL.Generallog.html

备注:同时要在控制台配置数据导入中勾选慢日志

三、配置订阅筛选器

四、配置lambda函数

运行环境:Python 2.7

from __future__ import print_function

import base64
import json
import urllib
import boto3
import socket
import ssl
import re
import zlib
import gzip

# Parameters
# `host` is the Logstash endpoint the handler opens its TCP connection to.
# The handler refuses to run while it is empty or still the
# "<your_logstash_hostname>" placeholder.
host = "blog.g6p.cn"
# Static fields merged into every entry before it is sent. The handler adds
# an "aws" sub-dict (function name, version, ARN, memory limit) on each
# invocation.
metadata = {
    "your_metafields": {
        "backend": "python"
    },
    "some_field": "change_me"
}

# Constants
# TCP port used when `enable_security` is False.
raw_port = 30464

# SSL security
# When `enable_security` is True the socket is wrapped with SSL and
# `ssl_port` is used instead of `raw_port`.
enable_security = False
ssl_port = 10515


def _send_error_best_effort(s, message):
    """Try to forward *message* to Logstash without ever raising.

    This lives in its own function so that a failure while reporting cannot
    replace the exception the caller is about to re-raise (on Python 2 the
    "current exception" is tracked per function frame).
    """
    try:
        send_entry(s, message)
    except Exception as send_error:
        # The socket is most likely the thing that broke; keep the original
        # error as the one that surfaces and only note the secondary failure.
        print('Could not forward the error to Logstash:', send_error)


def lambda_handler(event, context):
    """AWS Lambda entry point: forward the logs carried by *event* to Logstash.

    The event is routed to the S3 or CloudWatch Logs parser, every resulting
    entry is enriched with the module-level ``metadata`` plus details taken
    from *context*, and sent as one JSON line over a TCP (optionally SSL)
    connection to ``host``.

    Args:
        event: The Lambda invocation payload (S3 notification or
            CloudWatch Logs subscription event).
        context: The Lambda context object; its ``function_name``,
            ``function_version``, ``invoked_function_arn`` and
            ``memory_limit_in_mb`` attributes are read.

    Raises:
        Exception: If ``host`` is not configured, the event type is not
            supported, or parsing/sending fails. Parsing and sending errors
            are also reported to Logstash on a best-effort basis before being
            re-raised.
    """
    # Check prerequisites
    if host == "<your_logstash_hostname>" or host == "":
        raise Exception(
                "You must configure your Logstash hostname before starting this lambda function (see #Parameters section)")

    # Attach Logstash TCP Socket
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        port = raw_port
        if enable_security:
            # NOTE(review): ssl.wrap_socket performs no certificate
            # verification and was removed in Python 3.12; switch to an
            # ssl.SSLContext when moving off the Python 2.7 runtime.
            s = ssl.wrap_socket(s)
            port = ssl_port

        # Connecting inside the try block guarantees the socket is closed
        # even when the Logstash endpoint is unreachable.
        s.connect((host, port))

        # Add the context to meta
        metadata["aws"] = {
            "function_name": context.function_name,
            "function_version": context.function_version,
            "invoked_function_arn": context.invoked_function_arn,
            "memory_limit_in_mb": context.memory_limit_in_mb,
        }

        try:
            # Route to the corresponding parser
            event_type = parse_event_type(event)

            if event_type == "s3":
                logs = s3_handler(s, event)
            elif event_type == "awslogs":
                logs = awslogs_handler(s, event)
            else:
                # Defensive: keeps `logs` from being unbound if a new event
                # type is ever returned without a matching parser here.
                raise Exception("Event type not supported (see #Event supported section)")

            for log in logs:
                print('type:>>>', type(log))
                send_entry(s, log)

        except Exception as e:
            # Logs through the socket the error
            err_message = 'Error parsing the object. Exception: {}'.format(str(e))
            _send_error_best_effort(s, err_message)
            # Bare raise keeps the original traceback.
            raise
    finally:
        s.close()


# Utility functions

def parse_event_type(event):
    """Classify an incoming Lambda event.

    Returns:
        "s3" when the event has a non-empty "Records" list whose first record
        contains an "s3" key, or "awslogs" when the event has no (or an empty)
        "Records" list and contains an "awslogs" key.

    Raises:
        Exception: For every other event shape.
    """
    unsupported = Exception("Event type not supported (see #Event supported section)")

    has_records = "Records" in event and len(event["Records"]) > 0
    if has_records:
        # A non-empty Records list is only accepted when it comes from S3;
        # the "awslogs" key is deliberately not consulted in that case.
        first_record = event["Records"][0]
        if "s3" in first_record:
            return "s3"
        raise unsupported

    if "awslogs" in event:
        return "awslogs"

    raise unsupported


# Handle S3 events
def s3_handler(s, event):
    """Fetch the S3 object named by the first event record and build log entries.

    Objects whose key ends in ``.gz`` are gunzipped. When the key looks like a
    CloudTrail delivery (see ``is_cloudtrail``) the object is parsed as JSON
    and each element of its "Records" list becomes one entry; otherwise each
    line of the object becomes one entry with the text under "message".
    Every entry carries the source bucket and key under ``aws.s3``.

    Args:
        s: The Logstash socket (unused here; kept for a uniform handler
            signature).
        event: The S3 notification event. Only ``event['Records'][0]`` is
            processed.

    Returns:
        A list of dicts ready to be passed to ``send_entry``.
    """
    # Python 2 exposes unquote_plus on urllib; Python 3 moved it to urllib.parse.
    try:
        from urllib import unquote_plus
    except ImportError:
        from urllib.parse import unquote_plus

    s3 = boto3.client('s3')

    # Get the object from the event
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    raw_key = s3_record['object']['key']

    # On Python 2 a unicode key must be turned into UTF-8 bytes before
    # unquoting, otherwise percent-encoded non-ASCII characters are decoded
    # as latin-1 and the subsequent decode fails.
    if str is bytes and not isinstance(raw_key, str):
        raw_key = raw_key.encode('utf8')
    key = unquote_plus(raw_key)
    if isinstance(key, bytes):
        key = key.decode('utf8')

    # Extract the S3 object
    response = s3.get_object(Bucket=bucket, Key=key)
    data = response['Body'].read()

    # If the name has a .gz extension, then decompress the data
    if key.endswith('.gz'):
        # 16 + MAX_WBITS tells zlib to expect a gzip header.
        data = zlib.decompress(data, 16 + zlib.MAX_WBITS)

    structured_logs = []

    # The key is passed as text (no str() coercion) so non-ASCII keys do not
    # raise UnicodeEncodeError on Python 2.
    if is_cloudtrail(key):
        cloud_trail = json.loads(data.decode('utf8'))
        for trail_record in cloud_trail['Records']:
            # Create structured object
            structured_logs.append(
                merge_dicts(trail_record, {"aws": {"s3": {"bucket": bucket, "key": key}}}))
    else:
        # Split on the raw bytes first, then decode each line, so the line
        # boundaries stay exactly those of bytes.splitlines().
        for line in data.splitlines():
            # Create structured object; undecodable bytes are replaced rather
            # than aborting the whole batch at JSON serialization time.
            structured_logs.append({
                "aws": {"s3": {"bucket": bucket, "key": key}},
                "message": line.decode('utf8', 'replace'),
            })

    return structured_logs


# Handle CloudWatch events and logs
def awslogs_handler(s, event):
    """Decode a CloudWatch Logs subscription event into structured entries.

    ``event["awslogs"]["data"]`` is base64-decoded, gunzipped and parsed as
    JSON. Each element of the payload's "logEvents" list is returned with the
    payload's "logGroup", "logStream" and "owner" attached under
    ``aws.awslogs``.

    Args:
        s: The Logstash socket (unused here; kept for a uniform handler
            signature).
        event: The Lambda event containing ``awslogs.data``.

    Returns:
        A list of dicts ready to be passed to ``send_entry``.
    """
    # Get logs (16 + MAX_WBITS tells zlib to expect a gzip header)
    compressed = base64.b64decode(event["awslogs"]["data"])
    payload = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
    # Decode explicitly: str(payload) would yield the "b'...'" repr on
    # Python 3, which is not valid JSON.
    logs = json.loads(payload.decode("utf-8"))

    structured_logs = []

    for log in logs["logEvents"]:
        # Create structured object; a fresh dict is built per entry because
        # merge_dicts stores the nested dicts it is given by reference.
        structured_line = merge_dicts(log, {
            "aws": {
                "awslogs": {
                    "logGroup": logs["logGroup"],
                    "logStream": logs["logStream"],
                    "owner": logs["owner"]
                }
            }
        })
        structured_logs.append(structured_line)

    print('structured_logs:>>>', structured_logs)

    return structured_logs


def send_entry(s, log_entry):
    """Serialize one entry as a JSON line and write it to the Logstash socket.

    Args:
        s: A connected socket-like object providing ``sendall``.
        log_entry: Either a string (wrapped as ``{"message": log_entry}``) or
            a dict. The module-level ``metadata`` is merged into it in place.

    Raises:
        Exception: If *log_entry* is neither a string nor a dict, or if it
            conflicts with ``metadata`` (raised by ``merge_dicts``).
    """
    # The log_entry can only be a string or a dict
    if isinstance(log_entry, str):
        log_entry = {"message": log_entry}
    elif not isinstance(log_entry, dict):
        raise Exception(
                "Cannot send the entry as it must be either a string or a dict. Provided entry: " + str(log_entry))

    # Merge with metadata
    log_entry = merge_dicts(log_entry, metadata)

    # Send to Logstash. sendall keeps writing until the whole line is out;
    # plain send() may transmit only part of a large entry and silently
    # truncate it.
    str_entry = json.dumps(log_entry)
    s.sendall((str_entry + "\n").encode("UTF-8"))


def merge_dicts(a, b, path=None):
    """Recursively merge *b* into *a* in place and return *a*.

    Keys missing from *a* are copied over by reference. When both sides hold
    a dict under the same key the merge recurses; equal leaf values are left
    alone; anything else is a conflict.

    Args:
        a: Destination dict (mutated).
        b: Source dict.
        path: Keys already traversed, used only to build the error message.

    Raises:
        Exception: When the same key holds different, non-mergeable values;
            the message contains the dotted path of the conflicting key.
    """
    trail = [] if path is None else path

    for key in b:
        if key not in a:
            a[key] = b[key]
            continue

        current = a[key]
        incoming = b[key]

        if isinstance(current, dict) and isinstance(incoming, dict):
            merge_dicts(current, incoming, trail + [str(key)])
            continue

        if current == incoming:
            # same leaf value
            continue

        raise Exception(
                'Conflict while merging metadatas and the log entry at %s' % '.'.join(trail + [str(key)]))

    return a


# Compiled once at import time. Raw string so the regex escapes are not
# treated as (invalid) Python string escapes. Compared with a literal
# "xx-name-[12]" region this also accepts any single-digit region suffix
# (e.g. eu-west-3) and the "-gov" partition infix, and requires literal dots
# in the ".json.gz" extension.
_CLOUDTRAIL_KEY_RE = re.compile(
    r'\d+_CloudTrail_\w{2}(?:-gov)?-\w{4,9}-\d_\d{8}T\d{4}Z.+\.json\.gz$', re.I)


def is_cloudtrail(key):
    """Return True when *key* looks like a CloudTrail log object name.

    The match is case-insensitive and anchored at the end of the key, so any
    prefix (folders) in front of the file name is allowed.

    Args:
        key: The S3 object key as text.

    Returns:
        bool: Whether the key matches the CloudTrail naming pattern
        ``<digits>_CloudTrail_<region>_<YYYYMMDD>T<HHMM>Z<suffix>.json.gz``.
    """
    return _CLOUDTRAIL_KEY_RE.search(key) is not None

备注:host为logstash的主机地址,logstash输入插件配置如下:

五、验证

六、写在最后

  • aws的很多组件都可以通过lambda函数实现自定义,优点是接入功能强大,缺点则是需要运维或者开发去熟悉其events事件格式。对比阿里云少了一站式服务。
  • 目前现有环境分自建mysql和aws rds两类,考虑到统一性所以将慢日志均推送至elk中(至于要使用elk平台则是考虑到减少手工操作,平台化标准化及未来自动化)。
  • 近期在研究elk相关知识点,有兴趣的朋友可以交流下心得。
最后修改日期: 2023年12月13日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。