通过Boto3 API从WAF中获取攻击IP
之前写了一篇 借助 CloudWatch 和 WAF 缓解 DDOS 攻击,经过实践发现了一些问题:
- 在受到攻击时 Cloudwatch 产生存储和查询的费用都高
- 获取详细攻击信息使用了较多的 log 查询,查询次数多、时间长
- 通过 Cloudwatch 中日志统计攻击 IP ,获取 IP 比较慢,且不够准确
发生攻击时产生的请求已经非常多了,将请求转换成日志在分析,不是经济和有效率的做法。查了一下 boto3 的文档,发现 Waf 已经提供了一个获取被频率限制 ip 的 api : get_rate_based_statement_managed_keys
而且通过分析 sampled request : get_sampled_requests 可以获取攻击的域名。既然可以直接获取攻击 IP 和域名,Cloudwatch 的日志存储和分析的费用就可以省下了。
更新了一下 Lambda 代码,如下:
import boto3
from collections import Counter
from datetime import datetime, timedelta
def lambda_handler(event, context):
acl_name = 'webapi'
acl_id = 'ada04a19-0fc9-5abd-c497-6646b2369d8e'
arn = 'arn:aws:wafv2:us-east-1:849873481391:global/webacl/webapi/eda04a18-9fc8-4abc-b497-5646b2369d8g'
rule_names = ['WAF-RateLimit-URI','WAF-RateLimit-All']
ip_set_name = 'Global_blacklist-auto'
# 检查 CloudWatch Alarm 状态
print(f"Getting CloudWatch Alarm state...")
cloudwatch_client = boto3.client('cloudwatch')
alarm_name = 'WAF-ALL-BLOCKED-Req-5mins'
response = cloudwatch_client.describe_alarms(
AlarmNames=[alarm_name]
)
alarm_state = response['MetricAlarms'][0]['StateValue']
print(f"CloudWatch Alarm state: {alarm_state}")
if alarm_state == 'OK':
print("CloudWatch Alarm is in OK state. Exiting Lambda function.")
return
merged_ip_addresses = []
wafv2_client = boto3.client('wafv2')
# 从sampled requests获取被攻击域名,并告警
host_counter = Counter()
print("Step 1: Getting all rate limited rule logs from WAF...")
for rule in rule_names:
response = wafv2_client.get_sampled_requests(
WebAclArn=arn,
RuleMetricName=rule,
Scope='CLOUDFRONT',
TimeWindow={
'StartTime': (datetime.utcnow() - timedelta(hours=3)).strftime("%Y-%m-%dT%H:%MZ") ,
'EndTime': datetime.utcnow().strftime("%Y-%m-%dT%H:%MZ")
},
MaxItems=500
)
for request in response['SampledRequests']:
headers = {header['Name']: header['Value'] for header in request['Request']['Headers']}
if 'Host' in headers:
host_counter[headers['Host']] += 1
most_common_host = host_counter.most_common(1)
print(f'Most common host: {most_common_host[0][0]} with {most_common_host[0][1]} requests')
print("Step 2: Getting all rate limited IPs from WAF...")
for rule_name in rule_names:
response = wafv2_client.get_rate_based_statement_managed_keys(
Scope='CLOUDFRONT',
WebACLName=acl_name,
WebACLId=acl_id,
RuleName=rule_name
)
managed_keys_ipv4 = response['ManagedKeysIPV4']['Addresses']
print(f"Rule: {rule_name} has: {len(managed_keys_ipv4)} blocked IPs")
print(managed_keys_ipv4)
merged_ip_addresses.extend(managed_keys_ipv4)
print(f"Total Rate limited ip: {len(merged_ip_addresses)}")
print(merged_ip_addresses)
print("Step 3: Getting existing IPs from ip set...")
response = wafv2_client.list_ip_sets(
Scope='CLOUDFRONT'
)
ip_sets = response['IPSets']
ip_set_id = None
lock_token = None
for ip_set in ip_sets:
if ip_set['Name'] == ip_set_name:
ip_set_id = ip_set['Id']
lock_token = ip_set['LockToken']
break
# 获取现有 IP 集的 IP 地址
response = wafv2_client.get_ip_set(
Name=ip_set_name,
Id=ip_set_id,
Scope='CLOUDFRONT'
)
existing_ip_addresses = response['IPSet']['Addresses']
print(f"Total existing IPs: {len(existing_ip_addresses)}")
print(existing_ip_addresses)
# 将 merged_ip_addresses 合并到现有 IP 地址中
merged_ip_addresses = list(set(existing_ip_addresses + merged_ip_addresses))
print(f"Merged IP addresses: {merged_ip_addresses}")
cidr_addresses = []
for ip_address in merged_ip_addresses:
ip, _ = ip_address.split('/')
cidr_address = '.'.join(ip.split('.')[:-1]) + '.0/24'
cidr_addresses.append(cidr_address)
# 对CIDR地址进行去重
unique_cidr_addresses = list(set(cidr_addresses))
print(f"Block /24 ip range : {len(unique_cidr_addresses)}")
print(unique_cidr_addresses)
print(f"Updating IP Set {ip_set_name} in CloudFront WAF...")
response = wafv2_client.update_ip_set(
Name=ip_set_name,
Scope='CLOUDFRONT',
Id=ip_set_id,
LockToken=lock_token,
Description='IP blacklist for Global WAF add by Cron',
Addresses=unique_cidr_addresses
)
print(f"Total blocked IP ranges: {len(unique_cidr_addresses)}")
其他的配置同上一篇文章,主要是 IAM role /超时时间/ EventBridge 定时执行 这三点设置。这个 Lambda 实现了以下功能:
- 检测 Cloudwatch 中的 Alarm 的状态,In-Alarm 才会执行拉黑
- 统计被攻击的域名,可以进行自定义告警,代码中只是打印到日志
- Cloudfront 只开启了 IPv4, 所以只获取 WAF 中被拦截的 IPv4 地址
- 获取 IP 地址后去重,并拦截整个C段 /24,减少 WAF IP Set 中的规则数量
- 每次执行先获取已经存在的 IP Set ,在加入新的攻击 IP 段
这里发现了 boto3 waf api 的一个坑,文档里面说 rate limit 规则可以在 rule group 中,查询时只需要带上 RuleGroupRuleName 参数即可,但是实际上是查询会抛异常 WAFNonexistentItemException ,所以不要将 rate limit 相关的规则放入 rule group 中。