I previously wrote a post, "Mitigating DDoS Attacks with CloudWatch and WAF". Running it in practice exposed a few problems:

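  1. During an attack, CloudWatch storage and query costs are both high
  2. Getting detailed attack information takes many log queries, and each query runs for a long time
  3. Deriving the attacking IPs from CloudWatch log statistics is slow and not accurate enough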

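An attack already generates a huge number of requests, so converting those requests into logs and then analyzing them is neither economical nor efficient. Checking the boto3 documentation, I found that WAF already provides an API that returns the IPs currently being rate limited: get_rate_based_statement_managed_keys
In addition, analyzing sampled requests via get_sampled_requests reveals which domain is under attack. Since the attacking IPs and the targeted domain can both be fetched directly, the cost of storing and analyzing logs in CloudWatch can be saved.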

I updated the Lambda code as follows:

import boto3
from collections import Counter
from datetime import datetime, timedelta

def lambda_handler(event, context):

    acl_name = 'webapi'
    acl_id = 'ada04a19-0fc9-5abd-c497-6646b2369d8e'
    arn = 'arn:aws:wafv2:us-east-1:849873481391:global/webacl/webapi/eda04a18-9fc8-4abc-b497-5646b2369d8g'
    rule_names = ['WAF-RateLimit-URI','WAF-RateLimit-All']  
    ip_set_name = 'Global_blacklist-auto'
    
    # Check the CloudWatch Alarm state
    print("Getting CloudWatch Alarm state...")
    cloudwatch_client = boto3.client('cloudwatch')
    alarm_name = 'WAF-ALL-BLOCKED-Req-5mins'
    
    response = cloudwatch_client.describe_alarms(
        AlarmNames=[alarm_name]
    )
    
    alarm_state = response['MetricAlarms'][0]['StateValue']
    print(f"CloudWatch Alarm state: {alarm_state}")
    
    # Only proceed when the alarm is actually firing; OK and INSUFFICIENT_DATA both exit
    if alarm_state != 'ALARM':
        print("CloudWatch Alarm is not in ALARM state. Exiting Lambda function.")
        return
    
    merged_ip_addresses = []
    wafv2_client = boto3.client('wafv2')

    # Identify the attacked domain from sampled requests so it can be alerted on
    host_counter = Counter()
    print("Step 1: Getting all rate limited rule logs from WAF...")
    for rule in rule_names:
        response = wafv2_client.get_sampled_requests(
            WebAclArn=arn,
            RuleMetricName=rule,
            Scope='CLOUDFRONT',
            TimeWindow={
                'StartTime': (datetime.utcnow() - timedelta(hours=3)).strftime("%Y-%m-%dT%H:%MZ") ,
                'EndTime': datetime.utcnow().strftime("%Y-%m-%dT%H:%MZ")
            },
            MaxItems=500  
        )
        
        for request in response['SampledRequests']:
            headers = {header['Name']: header['Value'] for header in request['Request']['Headers']}
            if 'Host' in headers:
                host_counter[headers['Host']] += 1
    
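    most_common_host = host_counter.most_common(1)
    # most_common(1) returns an empty list when no sampled request carried a Host header
    if most_common_host:
        print(f'Most common host: {most_common_host[0][0]} with {most_common_host[0][1]} requests')
    else:
        print('No Host header found in sampled requests')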

    print("Step 2: Getting all rate limited IPs from WAF...")
    for rule_name in rule_names:
        response = wafv2_client.get_rate_based_statement_managed_keys(
            Scope='CLOUDFRONT',
            WebACLName=acl_name,
            WebACLId=acl_id,
            RuleName=rule_name
        )
        
        managed_keys_ipv4 = response['ManagedKeysIPV4']['Addresses']
        print(f"Rule: {rule_name} has:  {len(managed_keys_ipv4)} blocked IPs")
        print(managed_keys_ipv4)
        merged_ip_addresses.extend(managed_keys_ipv4)
    
    print(f"Total Rate limited ip:  {len(merged_ip_addresses)}")
    print(merged_ip_addresses)
    
    print("Step 3: Getting existing IPs from ip set...")
    response = wafv2_client.list_ip_sets(
        Scope='CLOUDFRONT'
    )
    ip_sets = response['IPSets']
    
    ip_set_id = None
    lock_token = None

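    for ip_set in ip_sets:
        if ip_set['Name'] == ip_set_name:
            ip_set_id = ip_set['Id']
            lock_token = ip_set['LockToken']
            break

    # Stop early if the IP set was not found; get_ip_set cannot be called without an Id
    if ip_set_id is None:
        print(f"IP set {ip_set_name} not found. Exiting Lambda function.")
        return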
    
    # Get the IP addresses currently in the IP set
    response = wafv2_client.get_ip_set(
        Name=ip_set_name,
        Id=ip_set_id,
        Scope='CLOUDFRONT'
    )
    existing_ip_addresses = response['IPSet']['Addresses']
    
    print(f"Total existing IPs: {len(existing_ip_addresses)}")
    print(existing_ip_addresses)
    
    # Merge the rate-limited IPs into the existing IP addresses
    merged_ip_addresses = list(set(existing_ip_addresses + merged_ip_addresses))
    print(f"Merged IP addresses: {merged_ip_addresses}")
    
    cidr_addresses = []
    for ip_address in merged_ip_addresses:
        ip, _ = ip_address.split('/')
        cidr_address = '.'.join(ip.split('.')[:-1]) + '.0/24'
        cidr_addresses.append(cidr_address)
    # Deduplicate the CIDR addresses
    unique_cidr_addresses = list(set(cidr_addresses))

    print(f"Block /24 ip range : {len(unique_cidr_addresses)}")
    print(unique_cidr_addresses)
    print(f"Updating IP Set {ip_set_name} in CloudFront WAF...")
    response = wafv2_client.update_ip_set(
        Name=ip_set_name,
        Scope='CLOUDFRONT',
        Id=ip_set_id,
        LockToken=lock_token,
        Description='IP blacklist for Global WAF add by Cron',
        Addresses=unique_cidr_addresses
    )
    print(f"Total blocked IP ranges: {len(unique_cidr_addresses)}")
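
The /24 conversion in the listing works by splitting strings, which assumes every entry is an IPv4 address with a prefix of /24 or longer. As an alternative, here is a minimal sketch using the standard library ipaddress module. The helper name collapse_to_slash24 is hypothetical and not part of the original Lambda. It keeps ranges wider than /24 unchanged instead of shrinking them, and it skips any IPv6 entry.

```python
import ipaddress


def collapse_to_slash24(addresses):
    """Map IPv4 addresses or CIDRs to their enclosing /24, deduplicated and sorted.

    Hypothetical helper for illustration; not part of the original Lambda.
    """
    networks = set()
    for address in addresses:
        # strict=False accepts host addresses such as "1.2.3.4/32" or "1.2.3.4"
        network = ipaddress.ip_network(address, strict=False)
        if network.version != 4:
            continue  # this sketch only handles IPv4
        if network.prefixlen > 24:
            # Widen anything narrower than a /24 to its enclosing /24
            network = network.supernet(new_prefix=24)
        networks.add(network)
    return [str(network) for network in sorted(networks)]


if __name__ == "__main__":
    print(collapse_to_slash24(["1.2.3.4/32", "1.2.3.200/32", "5.6.7.8/32"]))
    # ['1.2.3.0/24', '5.6.7.0/24']
```

The returned list could be passed as the Addresses argument of update_ip_set in place of unique_cidr_addresses.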

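The rest of the configuration is the same as in the previous post, mainly three settings: the IAM role, the timeout, and the EventBridge schedule. This Lambda does the following: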

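  1. Checks the state of the CloudWatch Alarm and blocks IPs only when it is In-Alarm
  2. Counts requests per attacked domain, which can feed a custom alert; the code only prints the result to the log
  3. CloudFront has only IPv4 enabled, so only the IPv4 addresses blocked by WAF are fetched
  4. Deduplicates the fetched IP addresses and blocks the whole /24 (class C) range, reducing the number of entries in the WAF IP Set
  5. On each run, fetches the existing IP Set first, then adds the new attacking IP ranges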
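
For item 2, the host counting can be pulled into a small function that is easy to test and to hook an alert onto. The sketch below is an illustration, not the original code: the names count_hosts and top_host are hypothetical, and the input is assumed to have the same shape as the SampledRequests list returned by get_sampled_requests. Header names are matched case-insensitively, since HTTP header names are case-insensitive and a request may carry "host" rather than "Host".

```python
from collections import Counter


def count_hosts(sampled_requests):
    """Count Host header values across sampled requests (hypothetical helper)."""
    counter = Counter()
    for sample in sampled_requests:
        headers = sample.get("Request", {}).get("Headers", [])
        for header in headers:
            # Match the header name case-insensitively
            if header.get("Name", "").lower() == "host":
                counter[header.get("Value", "")] += 1
                break  # count each sampled request at most once
    return counter


def top_host(sampled_requests):
    """Return (host, count) for the most requested host, or None if there is none."""
    most_common = count_hosts(sampled_requests).most_common(1)
    return most_common[0] if most_common else None


if __name__ == "__main__":
    samples = [
        {"Request": {"Headers": [{"Name": "host", "Value": "api.example.com"}]}},
        {"Request": {"Headers": [{"Name": "Host", "Value": "api.example.com"}]}},
        {"Request": {"Headers": [{"Name": "Host", "Value": "www.example.com"}]}},
    ]
    print(top_host(samples))  # ('api.example.com', 2)
```

A custom alert could then be triggered on the tuple returned by top_host instead of only printing it.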

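I ran into a pitfall in the boto3 WAF API here. The documentation says a rate limit rule can live inside a rule group, and that the query then only needs the additional RuleGroupRuleName parameter. In practice the query throws WAFNonexistentItemException, so do not put rate limit rules inside a rule group.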
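
To make the failing case concrete, here is a hedged sketch of how the documented call is shaped. The wrapper fetch_rate_limited_ipv4 is a hypothetical name, and the client argument is assumed to be a boto3 wafv2 client. It adds RuleGroupRuleName only when one is supplied, and returns None when WAF reports WAFNonexistentItemException, so the caller can log the failure instead of crashing.

```python
def fetch_rate_limited_ipv4(client, acl_name, acl_id, rule_name,
                            rule_group_rule_name=None, scope="CLOUDFRONT"):
    """Return rate-limited IPv4 addresses, or None if WAF cannot find the rule.

    Hypothetical wrapper; `client` is assumed to be boto3.client('wafv2').
    """
    params = {
        "Scope": scope,
        "WebACLName": acl_name,
        "WebACLId": acl_id,
        "RuleName": rule_name,
    }
    if rule_group_rule_name:
        # Documented for rate-based rules nested in a rule group;
        # this is the variant that failed in the author's tests
        params["RuleGroupRuleName"] = rule_group_rule_name
    try:
        response = client.get_rate_based_statement_managed_keys(**params)
    except client.exceptions.WAFNonexistentItemException:
        return None
    return response.get("ManagedKeysIPV4", {}).get("Addresses", [])
```

With the rate limit rules defined directly in the web ACL, the call needs no RuleGroupRuleName and matches what the Lambda above does.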