一、背景

  • 对网站应用层的探测一般有两种模式,即白盒探测、黑盒探测。白盒探测更多的场景为对应用的性能指标的输出(这里可以参考prometheus的exporter),而黑盒探测更多的为一个外部探测器对站点是否正常的探测。prometheus黑盒探测参考地址:https://github.com/prometheus/blackbox_exporter

  • 目前主要使用的探测端为阿里云站点监控。该监控能很好的自定义探测点进行站点探测。但如果在海外有多个数据中心的站点需要实时探测,这种情况下该阿里监控似乎是很难解决全方位覆盖。这里采用自定义脚本对网站进行探测。

二、脚本

#!/user/bin/env python3
# -*- coding: utf-8 -*-
# yousong.xiang
# 2020.8.31
# v1.0.1
# 检测url是否正常

import subprocess
import threading
import os
import logging
import json
import requests
import sys

check_alive_list = []
check_unreacheable_list = []
log_dir = '/tmp'
file_log = 'check_http_api.log'
file_info = 'domainInfo'

def gaojing(data):
    # 将消息提交给钉钉机器人
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    # 注意替换钉钉群的机器人webhook
    webhook = "https://oapi.dingtalk.com/robot/send?access_token=90fea408c219b11ba93ae518ad38460074077737992144dcfb86b65f08093b961"
    requests.post(url=webhook,data=json.dumps(data),headers=headers)

def get_data(text_content):
    # 返回钉钉机器人所需的文本格式
    text = {
        "msgtype": "text",
        "text": {
            "content": text_content
        },
    }
    # print(json.dumps(text))
    return text

def http_list_get(file_info):
    """读取监控信息文件,这里不做兼容处理,信息文件必须由值且按照规范格式"""
    current_filename = os.path.dirname(sys.argv[0])

    with open(os.path.join(current_filename,file_info),'r',encoding='utf-8') as f1:
        http_list = eval(f1.read())

    return http_list

def check_api(http_list,logger1,logger2):
  """ curl请求 """
  try:
    while http_list:
      http_json = http_list.pop()
      if http_json["uri"] == "/":
        http_url = '{}://{}'.format(http_json["scheme"],http_json["ip"])
      else:
        http_url = '{}://{}/{}'.format(http_json["scheme"], http_json["ip"],http_json["uri"])
      result_code = subprocess.call('/bin/curl --connect-timeout 10 -k -v -I {} -H "Host:{}"'.format(http_url,http_json["domain"]), shell=True)
      if result_code:
        check_unreacheable_list.append(http_json)
        logger2.error('{} ip:{}异常'.format(http_url,http_json["ip"]))
      else:
        check_alive_list.append(http_json)
        logger1.info('{} ip:{}正常'.format(http_url, http_json["ip"]))
  except Exception as f:
    pass

def main():
  res_log = os.path.join(log_dir, file_log)
  logging.basicConfig(level=logging.INFO,
                      format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S',
                      filename=res_log,
                      filemode='a+')
  logger1 = logging.getLogger('myapp.area1')
  logger2 = logging.getLogger('myapp.area2')

  threads = []
  http_list = http_list_get(file_info)
  for i in range(1, 10):
    thr = threading.Thread(target=check_api, args=(http_list,logger1, logger2))
    thr.start()
    threads.append(thr)
  for thr in threads:
    thr.join()

  while check_unreacheable_list:
    http_info = check_unreacheable_list.pop()
    if http_info["uri"] == "/":
      http_url = '{}://{}'.format(http_info["scheme"], http_info["domain"])
    else:
      http_url = '{}://{}/{}'.format(http_info["scheme"], http_info["domain"], http_info["uri"])
    text_content = 'alert>{} ip:{}异常 站点黑盒探测'.format(http_url,http_info["ip"])
    data = get_data(text_content)
    gaojing(data)
    exit(4)

if __name__ == '__main__':
  main()

三、脚本描述

  • 自定义字典格式的站点信息并对其进行周期性探测
  • 采用多线程方式并发探测网站
  • 对于探测结果进行日志打印及钉钉告警
  • 文件domainInfo需要预先定义好参数,参考文档:https://github.com/xiangys0134/deploy/blob/master/%E7%BD%91%E7%BB%9C%E7%AE%A1%E7%90%86/domainInfo

四、写在最后

  • 对于黑盒探测的结果效果明显,但太过于手工。而定义一个黑盒探测可以说效率比较低。
  • 黑盒探测更多发现的是个体问题。
  • 如果是大规模的应用(假设站点高可用性极强,极少宕机的可能性),应以整体应用的探测为原则进行设计监控,例如某数据中心某时段可用性及成功返回百分比等等。
  • 近期在搞prometheus监控,有兴趣的朋友可以交流下心得。
最后修改日期: 2021年9月15日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。