一、背景

测试阿里云ECS实例接口调用:通过 DescribeInstances 接口查询实例到期时间,并将即将到期的实例通过钉钉机器人发送告警。参考文档:https://api.aliyun.com/document/Ecs/2014-05-26/overview

二、Python脚本

import aliyunsdkcore.client
import aliyunsdkecs.request.v20140526.DescribeInstancesRequest
import json,requests
from datetime import datetime

# Alibaba Cloud AccessKey ID and AccessKey Secret (placeholder values).
# NOTE(review): credentials are hard-coded in the script; consider loading
# them from the environment or a secrets store instead — confirm with owner.
access_key_id = 'xxxxxxxxxxxxxxxx'
access_secret = 'xxxxxxxxxxxxxxxx'

# Region ID the instances live in.
region_id = 'cn-hangzhou'
# Alert on instances that have fewer than this many days left before expiry.
expire_day = 10
# DingTalk robot webhook the alert is posted to.
# NOTE(review): the access token is embedded in this URL; treat it as a secret.
url = 'https://oapi.dingtalk.com/robot/send?access_token=0a2a20bebf254aa1871a3w0e54eeef635208e607e62419122341dd58dc5cd526'

def getData(text_content, isAtAll=False):
    """Build the DingTalk robot payload for an ECS expiry alert.

    Args:
        text_content: Markdown text (for example a table of instances) placed
            below the alert heading.
        isAtAll: Value stored in the payload's ``at.isAtAll`` field.

    Returns:
        A dict with ``msgtype``, ``markdown`` and ``at`` keys, ready to be
        JSON-encoded and posted to the robot webhook.
    """
    alert_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # ``expire_day`` is the module-level alert threshold (in days).
    # A single f-string is used so only one formatting mechanism is in play.
    markdown_text = (
        f"# ECS到期时间小于{expire_day}天告警通知\n\n"
        f"### alert时间:{alert_time}\n\n"
        f"告警详情如下:\n\n{text_content}"
    )
    return {
        'msgtype': 'markdown',
        'markdown': {
            "title": "ECS实例到期检测",
            "text": markdown_text,
        },
        "at": {
            "isAtAll": isAtAll,
        },
    }

def sendDing(url, data, timeout=10):
    """Post a message to a DingTalk robot webhook.

    Args:
        url: Full webhook URL, including the access token.
        data: JSON-serialisable message payload.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        1 when the message was accepted, 2 when sending failed (network
        error, non-200 HTTP status, or a non-zero ``errcode`` in the reply).
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    failure = None
    try:
        # Without a timeout a stalled connection would hang the script forever.
        res = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=timeout,
        )
    except requests.RequestException as exc:
        failure = 'request error: %s' % exc
    else:
        if res.status_code != 200:
            failure = 'HTTP %s' % res.status_code
        else:
            # The robot API reports rejections (bad token, keyword/sign
            # mismatch, rate limit) as HTTP 200 with a non-zero errcode.
            try:
                body = res.json()
            except ValueError:
                body = None  # non-JSON 200 reply: keep treating it as success
            if isinstance(body, dict) and body.get('errcode', 0) != 0:
                failure = 'errcode=%s errmsg=%s' % (
                    body.get('errcode'), body.get('errmsg'))

    if failure is not None:
        time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('%s 发送消息失败!' % time_str, failure)
        return 2
    return 1

def _summarize_instance(instance):
    """Pick the report fields out of one DescribeInstances ``Instance`` item."""
    vpc_attributes = instance.get("VpcAttributes", {})
    return {
        # InstanceId and ExpiredTime are required by the expiry check, so a
        # missing value raises KeyError instead of being defaulted.
        "InstanceId": instance["InstanceId"],
        "InstanceName": instance.get("InstanceName", ""),
        "ExpiredTime": instance["ExpiredTime"],
        "PublicIpAddress": instance.get("PublicIpAddress", {}).get("IpAddress", []),
        "PrivateIpAddress": vpc_attributes.get("PrivateIpAddress", {}).get("IpAddress", []),
        "InstanceType": instance.get("InstanceType", ""),
        "CreationTime": instance.get("CreationTime", ""),
    }


def get_ecs_instances(access_key_id, access_secret, region_id):
    """Fetch a summary of every ECS instance in a region.

    Args:
        access_key_id: Alibaba Cloud AccessKey ID.
        access_secret: Alibaba Cloud AccessKey Secret.
        region_id: Region to query, e.g. ``cn-hangzhou``.

    Returns:
        A list of dicts with the keys ``InstanceId``, ``InstanceName``,
        ``ExpiredTime``, ``PublicIpAddress``, ``PrivateIpAddress``,
        ``InstanceType`` and ``CreationTime``.

    Raises:
        Whatever ``do_action_with_exception`` raises when an API call fails.
    """
    # Create the ECS API client.
    client = aliyunsdkcore.client.AcsClient(access_key_id, access_secret, region_id)

    # DescribeInstances is paginated (default page size 10, maximum 100), so
    # every page has to be requested or later instances are never checked.
    page_size = 100
    page_number = 1
    instances = []
    while True:
        request = aliyunsdkecs.request.v20140526.DescribeInstancesRequest.DescribeInstancesRequest()
        request.set_PageNumber(page_number)
        request.set_PageSize(page_size)
        response = client.do_action_with_exception(request)

        # Parse the JSON response body.
        result = json.loads(response.decode('utf-8'))
        page = result.get("Instances", {}).get("Instance", [])

        for instance in page:
            # Handle errors per instance so one malformed entry does not drop
            # all the instances that follow it.
            try:
                instances.append(_summarize_instance(instance))
            except (KeyError, TypeError, AttributeError) as e:
                print('skipping malformed instance entry: %r' % (e,))

        if not page or page_number * page_size >= result.get("TotalCount", 0):
            break
        page_number += 1
    return instances

def get_ecs_expire_date(instances, days):
    """Select the instances that expire in fewer than ``days`` days.

    Args:
        instances: Iterable of instance dicts as produced by
            ``get_ecs_instances``; each must carry an ``ExpiredTime`` string
            in UTC such as ``2024-01-31T16:00Z``.
        days: Threshold; instances with strictly fewer whole days remaining
            are returned.

    Returns:
        A list of dicts holding the original instance fields plus
        ``AvailableDays`` (whole days until expiry, negative once expired).

    Raises:
        KeyError: If an instance lacks one of the expected fields.
        ValueError: If an ``ExpiredTime`` value matches no known format.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    # ExpiredTime is expressed in UTC (trailing "Z"), so "now" must be UTC as
    # well; comparing against local time skews the result by the UTC offset.
    now_utc = datetime.now(timezone.utc)
    time_formats = ("%Y-%m-%dT%H:%MZ", "%Y-%m-%dT%H:%M:%SZ")

    expire_day_less_instance = []
    for instance in instances:
        expire_time = instance["ExpiredTime"]
        for time_format in time_formats:
            try:
                expires_at = datetime.strptime(expire_time, time_format)
            except ValueError:
                continue
            break
        else:
            raise ValueError("unrecognised ExpiredTime format: %r" % (expire_time,))

        remaining = expires_at.replace(tzinfo=timezone.utc) - now_utc
        available_days = remaining.days

        if available_days < days:
            expire_day_less_instance.append({
                "InstanceId": instance["InstanceId"],
                "InstanceName": instance["InstanceName"],
                "ExpiredTime": instance["ExpiredTime"],
                "PublicIpAddress": instance["PublicIpAddress"],
                "PrivateIpAddress": instance["PrivateIpAddress"],
                "InstanceType": instance["InstanceType"],
                "CreationTime": instance["CreationTime"],
                "AvailableDays": available_days,
            })
    return expire_day_less_instance


if __name__ == '__main__':
    # Script entry point: list the region's instances, keep the ones close to
    # expiry, and push a markdown table of them to the DingTalk robot.
    instances = get_ecs_instances(access_key_id, access_secret, region_id)
    print(instances)
    # Find the instances that expire within ``expire_day`` days.
    instances_list = get_ecs_expire_date(instances, expire_day)

    if instances_list:
        # Build the markdown table: header first, then one row per instance.
        header = "| <center><font color='#00A600'>ECS实例Id</font></center> | <center><font color='#00A600'>ECS名称</font></center> | <center><font color='#00A600'>ECS到期时间</font></center> |\n| ---- | --- | ----- |\n"
        rows = [
            f"| {d['InstanceId']} | {d['InstanceName']} | {d['ExpiredTime']} |\n"
            for d in instances_list
        ]
        table = header + "".join(rows)

        data = getData(table, isAtAll=False)
        sendDing(url, data)
    else:
        # Nothing is close to expiry: do not push an alert with an empty table.
        print('no instance expires within %s days; no alert sent' % expire_day)

最后修改日期: 2024年1月6日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。