一、背景
测试阿里云 ECS 实例接口(DescribeInstances)调用:查询实例列表,找出即将到期的实例,并通过钉钉机器人发送告警。参考文档:https://api.aliyun.com/document/Ecs/2014-05-26/overview
二、Python 脚本
import aliyunsdkcore.client
import aliyunsdkecs.request.v20140526.DescribeInstancesRequest
import json,requests
from datetime import datetime
# Alibaba Cloud AccessKey ID and AccessKey Secret (placeholder values).
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store before real use.
access_key_id = 'xxxxxxxxxxxxxxxx'
access_secret = 'xxxxxxxxxxxxxxxx'
# Region ID passed to the ECS client.
region_id = 'cn-hangzhou'
# Alert on instances whose remaining days are fewer than this value.
expire_day = 10
# DingTalk robot webhook that the alert is posted to.
# NOTE(review): the access_token embedded in this URL acts as a credential;
# avoid committing a real token.
url = 'https://oapi.dingtalk.com/robot/send?access_token=0a2a20bebf254aa1871a3w0e54eeef635208e607e62419122341dd58dc5cd526'
def getData(text_content, isAtAll=False, threshold_days=None):
    """Build the DingTalk robot payload (markdown message) for the expiry alert.

    Args:
        text_content: Markdown text appended after the alert heading and
            timestamp (the script passes a markdown table of instances).
        isAtAll: Value stored under ``at.isAtAll`` in the payload.
        threshold_days: Day threshold shown in the heading. When omitted,
            the module-level ``expire_day`` is used, as before.

    Returns:
        A dict with ``msgtype``, ``markdown`` (``title`` and ``text``) and
        ``at`` keys, ready to be JSON-encoded.
    """
    if threshold_days is None:
        threshold_days = expire_day
    alert_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Real f-string interpolation: the original mixed an f-prefix with
    # %-formatting, which breaks as soon as a brace is added to the template.
    text = (
        f"# ECS到期时间小于{threshold_days}天告警通知\n\n"
        f"### alert时间:{alert_time}\n\n"
        f"告警详情如下:\n\n{text_content}"
    )
    return {
        'msgtype': 'markdown',
        'markdown': {
            "title": "ECS实例到期检测",
            "text": text,
        },
        "at": {
            "isAtAll": isAtAll
        }
    }
def sendDing(url, data, timeout=10):
    """Post ``data`` as JSON to a DingTalk robot webhook.

    Args:
        url: Webhook URL of the robot.
        data: JSON-serialisable payload (see ``getData``).
        timeout: Seconds to wait for the HTTP request; prevents the script
            from hanging forever on a stalled connection.

    Returns:
        1 when the message was accepted, 2 on any failure (network error,
        non-200 HTTP status, or a non-zero ``errcode`` in the JSON reply).
    """
    def _failed(reason=''):
        # Same log line as before, optionally followed by the failure reason.
        time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('%s 发送消息失败!%s' % (time_str, reason))
        return 2

    headers = {'Content-Type': 'application/json;charset=utf-8'}
    try:
        res = requests.post(url=url, data=json.dumps(data), headers=headers,
                            timeout=timeout)
    except requests.RequestException as exc:
        # Keep the 1/2 return-code contract instead of crashing the script.
        return _failed(' %s' % exc)

    if res.status_code != 200:
        return _failed()

    # The webhook reports application-level errors (bad token, keyword
    # mismatch, rate limit, ...) with HTTP 200 and a non-zero errcode.
    try:
        body = res.json()
    except ValueError:
        # Non-JSON body: fall back to the original "HTTP 200 means OK" rule.
        return 1
    if isinstance(body, dict) and body.get('errcode', 0) != 0:
        return _failed(' errcode=%s errmsg=%s'
                       % (body.get('errcode'), body.get('errmsg')))
    return 1
def get_ecs_instances(access_key_id, access_secret, region_id):
    """Return summary dicts for the ECS instances in a region.

    Walks through every page of ``DescribeInstances``; the original issued a
    single request and therefore only saw the first page of results.

    Args:
        access_key_id: Alibaba Cloud AccessKey ID.
        access_secret: Alibaba Cloud AccessKey Secret.
        region_id: Region ID passed to the ECS client.

    Returns:
        A list of dicts with the keys ``InstanceId``, ``InstanceName``,
        ``ExpiredTime``, ``PublicIpAddress``, ``PrivateIpAddress``,
        ``InstanceType`` and ``CreationTime``. Records missing an expected
        field are reported via ``print`` and skipped.

    Raises:
        Whatever ``client.do_action_with_exception`` raises on API failure.
    """
    # Create the ECS API client.
    client = aliyunsdkcore.client.AcsClient(access_key_id, access_secret, region_id)

    page_size = 100  # largest page size the API accepts (see API reference)
    page_number = 1
    instances = []
    while True:
        request = aliyunsdkecs.request.v20140526.DescribeInstancesRequest.DescribeInstancesRequest()
        request.set_PageSize(page_size)
        request.set_PageNumber(page_number)
        response = client.do_action_with_exception(request)
        result = json.loads(response.decode('utf-8'))

        page = result.get("Instances", {}).get("Instance", [])
        for instance in page:
            # Handle errors per record so one malformed entry does not
            # discard every instance that follows it.
            try:
                instances.append({
                    "InstanceId": instance["InstanceId"],
                    "InstanceName": instance["InstanceName"],
                    "ExpiredTime": instance["ExpiredTime"],
                    "PublicIpAddress": instance["PublicIpAddress"]["IpAddress"],
                    "PrivateIpAddress": instance["VpcAttributes"]["PrivateIpAddress"]["IpAddress"],
                    "InstanceType": instance["InstanceType"],
                    "CreationTime": instance["CreationTime"]
                })
            except (KeyError, TypeError) as e:
                print(e)

        # Stop on an empty page or once TotalCount has been covered.
        if not page or page_number * page_size >= result.get("TotalCount", 0):
            break
        page_number += 1
    return instances
def get_ecs_expire_date(instances, days):
    """Select the instances that expire in fewer than ``days`` days.

    ``ExpiredTime`` is parsed with the format ``%Y-%m-%dT%H:%MZ``; the
    trailing ``Z`` marks UTC, so the comparison is done against the current
    UTC time. The original compared against naive local time, which skewed
    the result by the host's UTC offset.

    Args:
        instances: Iterable of dicts as produced by ``get_ecs_instances``.
        days: Threshold; an instance is selected when its whole remaining
            days (``timedelta.days``) is strictly less than this value.

    Returns:
        A list of new dicts carrying the original fields plus
        ``AvailableDays`` (negative for already-expired instances).

    Raises:
        ValueError: if an ``ExpiredTime`` does not match the expected format.
        KeyError: if an instance dict lacks one of the copied fields.
    """
    # Local import keeps this block self-contained (the file only imports
    # ``datetime`` from the datetime module).
    from datetime import timezone

    copied_fields = (
        "InstanceId",
        "InstanceName",
        "ExpiredTime",
        "PublicIpAddress",
        "PrivateIpAddress",
        "InstanceType",
        "CreationTime",
    )
    now_utc = datetime.now(timezone.utc)

    expire_day_less_instance = []
    for instance in instances:
        expires_at = datetime.strptime(
            instance["ExpiredTime"], "%Y-%m-%dT%H:%MZ"
        ).replace(tzinfo=timezone.utc)
        available_days = (expires_at - now_utc).days
        if available_days < days:
            entry = {field: instance[field] for field in copied_fields}
            entry["AvailableDays"] = available_days
            expire_day_less_instance.append(entry)
    return expire_day_less_instance
if __name__ == '__main__':
    # Fetch the region's instances and echo them for inspection.
    instances = get_ecs_instances(access_key_id, access_secret, region_id)
    print(instances)
    # Instances whose remaining days are below the configured threshold.
    instances_list = get_ecs_expire_date(instances, expire_day)
    # Only notify when something is actually expiring; posting an "alert"
    # with an empty table is noise for the recipients.
    if instances_list:
        # Markdown table: header row plus one row per expiring instance.
        header = "| <center><font color='#00A600'>ECS实例Id</font></center> | <center><font color='#00A600'>ECS名称</font></center> | <center><font color='#00A600'>ECS到期时间</font></center> |\n| ---- | --- | ----- |\n"
        rows = [
            f"| {d['InstanceId']} | {d['InstanceName']} | {d['ExpiredTime']} |\n"
            for d in instances_list
        ]
        table = header + "".join(rows)
        data = getData(table, isAtAll=False)
        sendDing(url, data)
留言