一、应用场景

有几个 cloudfront distribution 启用了 WAF,经常遇到扫描和攻击,请求数太多了。为了不产生天价账单,做了一个 Telegram 告警,有警报可以实时处理。

二、涉及服务

  1. AWS Cloudfront: CDN服务进行全球加速
  2. AWS WAF: 启用应用防护,过滤异常请求
  3. AWS Cloudwatch: 收集 Cloudfront/WAF 指标,创建告警触发 SNS
  4. AWS SNS: 发送通知,并触发 Lambda 执行
  5. AWS Lambda: 获取 CDN 的指标,并发送 Telegram 通知

三、准备工作

  1. WAF:设置启用防火墙的过滤规则,设置频率限制等,拦截异常请求
  2. Cloudwatch: 根据 WAF 规则创建警报,例如5分钟内如果有3000个异常请求就报警,并创建一个 SNS Topic
  3. AWS SNS: 无需特别设置,保持默认即可,也可以继续添加多个subscription订阅,实现多种告警方式
  4. AWS Lambbda: 编写代码从 Cloudwatch 中获取 Cloudfront 性能统计指标,并发送给 Telegram 机器人
  5. Telegram: 申请一个新的机器人并拿到 token , 难道需要接受告警的 user id(有专门的机器人可以获得)

四、部署告警服务

4.1 创建 Lambda 函数

先创建一个 lambda function,Runtime 选择 python 3.10.其他使用默认设置即可。
代码如下:

import os
import json
import urllib3
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    # 从环境变量中获取 Telegram Bot Token 和聊天 ID

    bot_token = os.environ['BOT_TOKEN']
    user_will_chat_id = os.environ['CHAT_ID']

    cloudfront_client = boto3.client('cloudfront')
    cloudwatch_client = boto3.client('cloudwatch')

    # 获取所有 CloudFront distributions
    distributions = cloudfront_client.list_distributions()["DistributionList"]["Items"]

    # 获取每个 Distribution 的请求数和错误率
    distribution_metrics = []
    for distribution in distributions:
        distribution_id = distribution["Id"]
        distribution_description = distribution['Comment']
        
        requests = get_metrics(cloudwatch_client, distribution_id, 'Requests', ['Sum'])
        error_rate = get_metrics(cloudwatch_client, distribution_id, 'TotalErrorRate', ['Average'])

        # 将结果添加到列表中
        distribution_metrics.append({
            'DistributionName': distribution_description,
            'TotalRequests': requests,
            'TotalErrorRate': error_rate
        })
    #按TotalRequests和TotalErrorRate进行排序
    sorted_request_metrics = sorted(distribution_metrics, key=lambda x: x['TotalRequests'] if x['TotalRequests'] is not None else 0, reverse=True)
    sorted_rate_metrics = sorted(distribution_metrics, key=lambda x: x['TotalErrorRate'] if x['TotalErrorRate'] is not None else 0, reverse=True)

    #获取请求数前3和错误率前3
    top3_request_metrics = sorted_request_metrics[:3]
    top3_rate_metrics = sorted_rate_metrics[:3]
    
    # 发送telegram bot的消息
    msg = f"按请求数(1h)排序前三:\n"
    for i, metric in enumerate(top3_request_metrics):
        msg += f"{i+1}. {metric['DistributionName']} 请求数:{metric['TotalRequests']} 错误率:{round(metric['TotalErrorRate'], 2)}  \n"
    msg += "\n按错误率(1h)排序前三:\n"
    for i, metric in enumerate(top3_rate_metrics):
        msg += f"{i+1}. {metric['DistributionName']}  错误率:{round(metric['TotalErrorRate'], 2)}  请求数:{metric['TotalRequests']} \n"

    # 发送警报通知
    send_telegram_message(bot_token,user_will_chat_id, "请注意CDN有攻击!!!")
    send_telegram_message(bot_token,user_will_chat_id, msg)

def send_telegram_message(bot_token, chat_id, message):
    bot_api_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    http = urllib3.PoolManager()

    # 准备请求数据
    data = {
        'chat_id': chat_id,
        'text': message
    }

    # 发送 POST 请求到 Telegram 机器人 API
    msg_response = http.request('POST', bot_api_url, fields=data)
    
    # 根据响应内容判断是否成功发送消息
    if json.loads(msg_response.data.decode('utf-8'))['ok']:
        print(f"Message sent to chat {chat_id}: {message}")
    else:
        print(f"Failed to send message to chat {chat_id}")

        
def get_metrics(cloudwatch_client, distribution_id, metric_name, statistics):
    #获取当前时间和一小时前时间
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)

    #获取指定的metric
    response = cloudwatch_client.get_metric_statistics(
        Namespace='AWS/CloudFront',
        MetricName=metric_name,
        Dimensions=[
            {'Name': 'Region', 'Value': 'Global'},
            {'Name': 'DistributionId', 'Value': distribution_id}
        ],
        StartTime=start_time,
        EndTime=end_time,
        Period=3600,
        Statistics=statistics
    )

    # 提取并打印请求数统计数据
    datapoints = response['Datapoints']
    for datapoint in datapoints:
        value = datapoint[statistics[0]]
        return value

以上代码会读取所有 Cloudfront distribution , 并从 Cloudwatch 中获取请求数和总的错误率,并取前三,发送给 Telegram 机器人。

4.2 设置 Lambda 函数

4.2.1 设置 Lambda 函数的权限

找到默认的 Lambda role:

Lambda function --> Configuration --> Permissions -->Execution role --> Role name

在 IAM 中打开 Role name,选择 add permission ,添加 CloudFrontReadOnlyAccess 和 CloudWatchReadOnlyAccess 两个权限,保存即可

4.2.2 设置 Lambda 函数的Timeout

由于 Cloudfront 的 distribution 比较多,获取指标比较慢,Lambda 默认 3s 的超时时间比较短,需要增加到至少1分钟.

Lambda function --> Configuration --> General configuration --> Timeout

4.2.3 设置 Lambda 函数的环境变量

需要在环境变量中设置 Telegram 机器人的 token 和用户 id :

Lambda function --> Configuration -->Environment variables 

添加两个环境变量:BOT_TOKEN 和 CHAT_ID

五、 测试告警

设置好以上后,在 SNS 点击对应的 Topic ,然后选择 Publish message ,在 Message body 中随便输入点内容,点击 Publish message 即可,Telegram 就会收到通知了。

telegram-alert.png