分类 Cloud 下的文章

探索 Harbor API 的一些玩法

最近有个需求是通过 Harbor API 获取 image 的最新 tag,来发送提醒进行更新,于是简单研究了一下 Harbor 的API 用法。
Harbor 是一个功能丰富、易用且安全的开源 Docker Registry 项目,专为企业级容器镜像管理而设计。

主要特性包括:

  • 安全与合规性:Harbor 提供安全扫描和漏洞管理功能,自动扫描镜像中的漏洞并生成详细报告。
  • 多租户支持:为每个项目提供独立的命名空间,支持自定义角色,实现多租户镜像管理。
  • 多镜像源:支持镜像在多个 Registry 之间的复制、备份和迁移。
  • 直观界面:提供易用的图形界面,支持镜像的推送、拉取、删除、日志查询和配置管理。
  • 垃圾回收:定期清理不再使用的镜像,释放存储空间。
  • 可扩展性:架构设计支持功能扩展和插件添加。

Harbor REST-API 的认证和用法

Harbor的 REST-API 提供了灵活的接口,下面是API认证及简单用法的介绍:

API认证

Harbor API 采用基于 HTTP 的认证,可使用 curl 命令直接提供用户名和密码进行请求,示例:

curl -X GET -u 'username:password' "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/projects"

可以通过 base64 算法加密用户名和密码,然后添加 Authorization 头进行认证:

curl -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=" "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/projects"

API 的使用方法

查看 Harbor 状态

可以通过/health API查看 Harbor 组件状态,示例:

curl -H 'Accept: application/json' "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/health"

列出全部 projects

curl -H "Accept: application/json" -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=" "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/projects"

列出某个 project 下的全部 repositories

curl -H "Accept: application/json" -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=" "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/projects/{PROJECT_NAME}/repositories"

列出某个 repo 下的 artifacts

curl -H "Accept: application/json" -H "Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=" "https://{HarborURL.OPSWILL.COM}:{PORT}/api/v2.0/projects/{PROJECT_NAME}/repositories/{REPO_NAME}/artifacts"

通过上述命令可获取镜像的详细信息,包括标签、漏洞情况、命令、环境变量、暴露端口等。

全部 Harbor REST-API

可通过 Swagger 查看和使用所有可用 API ,直接访问链接:
Harbor REST-API Swagger

总结

Harbor API 使用起来还是不难的,配合脚本使用很方便。闲着没事又水了一篇文章 :) 。

Amazon EKS集群升级流程

AWS 会对每个 EKS 版本提供14个月支持,同时 AWS 会强制更新太旧的 EKS 集群,因此可能导致生产服务中断。要升级 EKS 集群,首先要注意以下事项:

  • 验证与应用程序关联的容器映像版本是否可用,主要是排除应用异常导致的干扰。
  • 检查集群所在子网有足够的可用 IP,可用IP不足也可能会导致 EKS 升级失败。
  • 检查 node group 配置中的最大不可用性节点数量,这个数量表示每次更新期间节点组中可以替换的节点数。在大规模集群中,设置为百分比更有利于提高更新的速度。

一、升级EKS的检查工作

EKS 不允许跨版本升级,在升级 EKS 集群之前需要做兼容性的检查:

1.1 与 Kubernetes 相关的变更

每一个版本的 EKS 都有一些 API 已被弃用。 因此需要检查特定版本更改的详细信息,需要参考 Kubernetes 的变更日志和 EKS 版本更新文档。

1.2 使用 kubent 检查 API 变更

国际版的 EKS web console 中提供了 Upgrade insights 工具,可以检查 API 变动。但是中国区没有这个工具,可以使用第三方工具 kube-no-trouble 进行检查。

  1. 下载 https://github.com/doitintl/kube-no-trouble/releases 并然后解压。
  2. 确保 kubectl 当前的 context 是要升级的 EKS 集群,使用下面的cmd来检查:

    kubectl config current-context
  3. 执行./kubent开始检查。 确保 >>> Deprecated APIs removed in 1.XX <<< 中没有任何内容。
    注意:要保证 aws cli 的版本应大于2.0,尽量使用新版本的 aws cli 。

如果API版本发生变化,需要参考 https://docs.amazonaws.cn/en_us/eks/latest/userguide/update-cluster.html 进行更新。

1.3 检查 addons 版本是否需要更新

所有的 addon 都在 kube-system namesapce中,首先需要确定所有 addon 需要升级的目标版本号,供后续更新使用。

1.3.1 检查兼容的版本

使用以下命令可以列出与 EKS 1.22 版本兼容的所有插件版本:

  1. 查看 kube-proxy 全部兼容的版本

    aws eks describe-addon-versions --kubernetes-version 1.22 --addon-name kube-proxy | grep addonVersion
  2. 查看 coredns 兼容的版本

    aws eks describe-addon-versions --kubernetes-version 1.22 --addon-name coredns | grep addonVersion
  3. 查看 vpc-cni 兼容的版本

    aws eks describe-addon-versions --kubernetes-version 1.22 --addon-name vpc-cni | grep addonVersion

    另外,建议结合 AWS 的文档确定最终要的 addon 版本:

https://docs.aws.amazon.com/eks/latest/userguide/managing-kube-proxy.html
https://docs.aws.amazon.com/eks/latest/userguide/managing-coredns.html
https://docs.aws.amazon.com/eks/latest/userguide/managing-vpc-cni.html

1.3.2 检查当前 addons 版本

您可以使用下面的 AWS CLI 命令来查询 addon的 image 版本:

#kube-proxy
kubectl get daemonset kube-proxy -n kube-system -o yaml | grep image

# coredns
kubectl get deployment coredns -n kube-system -o yaml | grep image

# vpc-cni
kubectl get daemonset aws-node -n kube-system -o yaml | grep image

对比一下当前的 addon 版本是否则在兼容列表中,如果不在就需要升级 addon 版本。如果使用了其他插件,也需要查看相关文档以确定兼容性。

二、更新 EKS 版本

EKS 不允许跨版本升级,所以在升级过程中,需要先升级到1.22版本,然后升级到1.23,最后升级到1.24,以此类推。
按照 EKS Control Plane -> EKS addons -> EKS node group 的顺序进行升级。

2.1 升级 EKS Control Plane

升级的第一步是先升级 EKS Control Plane,这一步很简单,只需要在 eks web console 中点击 “update now” 更新即可。一旦升级成功就无法降级到以前的版本,单独升级 EKS Control Plane 不会重新调度集群中的 Pod,只有升级 node group 时 Pod 才会被重新调度。

2.2 更新插件到兼容的版本

通常需要更新包括 kube-proxy、coredns、vpc-cni 等 addon 。如果 addon 是通过 eks web console 安装的,也可以在 web console 中直接进行升级。

如果插件是 self-managed 的,那么必须找到正确的版本,然后使用 kubectl set image 进行更新。

2.2.1 更新 kube-proxy

以中国区为例,首先打开 https://docs.amazonaws.cn/eks/latest/userguide/managing-kube-proxy.html , 需要在文档中找到 kube-proxy 在 ecr 中的镜像地址和 tag 。例如中国北京区的地址是:918309763551.dkr.ecr.cn-north-1.amazonaws.com.cn/eks/kube-proxy tag 是: v1.25.16-minimal-eksbuild.1

查看当前使用的 kube-proxy image :

kubectl get daemonset kube-proxy --namespace kube-system -o=jsonpath='{$.spec.template.spec.containers[:1].image}'

更新 image :

kubectl set image daemonset.apps/kube-proxy -n kube-system kube-proxy=918309763551.dkr.ecr.cn-north-1.amazonaws.com.cn/eks/kube-proxy:v1.25.16-minimal-eksbuild.1

命令执行后 kube-proxy 的 pod 就会使用新 image 重建,需要检查 pods 是否成功重建。

kubectl get pods -n kube-system

2.2.2 更新 coredns

同 kube-proxy 类似,访问 https://docs.amazonaws.cn/eks/latest/userguide/managing-coredns.html 找到 ecr 中coredns 的地址和 tag 。

查看当前的 image :

kubectl get deployment coredns --namespace kube-system -o=jsonpath='{$.spec.template.spec.containers[:1].image}'

更新 coredns image:

kubectl set image --namespace kube-system deployment.apps/coredns coredns=918309763551.dkr.ecr.region-code.amazonaws.com.cn/eks/coredns:v1.8.7-eksbuild.3

2.2.3 更新 vpc-cni

vpc-cni的更新不能使用上面的方式,首先访问 https://github.com/aws/amazon-vpc-cni-k8s/releases ,找到要升级的版本,下载对应的 yaml 文件,以v1.16.3 为例:
aws 中国区:https://raw.githubusercontent.com/aws/amazon-vpc-cni-k8s/v1.16.3/config/master/aws-k8s-cni-cn.yaml
aws 国际区:https://raw.githubusercontent.com/aws/amazon-vpc-cni-k8s/v1.16.3/config/master/aws-k8s-cni.yaml

需要确认 yaml 文件中: image 链接的 account numbmer/region code/release tag是否正确,如果下载了对应区域的 yaml 文件通常不需要修改。如果需要修改为防止出错,建议使用 sed 命令全部一次性修改:

sed -i.bak -e 's|cn-northwest-1|cn-north-1|' aws-k8s-cni-cn.yaml

更新 vpc-cni 插件:

kubectl apply -f aws-k8s-cni-cn.yaml

2.3 更新节点组

通常使用的是 aws managed 的 node group ,在升级完 Control Plane 后,在 node group 设置页直接点击 “Update now” 即可。可以选择 rolling update 或者 force update ,rolling update 会先调度 pods 到新节点,因此不会中断业务,但是会更耗时。对 self-managed node group 需要创建新节点组,将应用程序/Pod 迁移到新节点组,然后删除旧节点组。

3. EKS 升级后的检查

需要检查:

  1. 验证Control Plane 和节点组版本是否已更新。
  2. 检查插件的状态。
  3. 检查节点状态和版本。
  4. 检查应用是否正常。

至此 AWS EKS 的版本升级就结束了。总体来说对于简单的环境不是特别复杂,升级的风险总体可控,风险不是不大。

通过Boto3 API从WAF中获取攻击IP

之前写了一篇 借助 CloudWatch 和 WAF 缓解 DDOS 攻击,经过实践发现了一些问题:

  1. 在受到攻击时 Cloudwatch 产生存储和查询的费用都高
  2. 获取详细攻击信息使用了较多的 log 查询,查询次数多、时间长
  3. 通过 Cloudwatch 中日志统计攻击 IP ,获取 IP 比较慢,且不够准确

发生攻击时产生的请求已经非常多了,将请求转换成日志在分析,不是经济和有效率的做法。查了一下 boto3 的文档,发现 Waf 已经提供了一个获取被频率限制 ip 的 api : get_rate_based_statement_managed_keys
而且通过分析 sampled request : get_sampled_requests 可以获取攻击的域名。既然可以直接获取攻击 IP 和域名,Cloudwatch 的日志存储和分析的费用就可以省下了。

更新了一下 Lambda 代码,如下:

import boto3
from collections import Counter
from datetime import datetime, timedelta

def lambda_handler(event, context):

    acl_name = 'webapi'
    acl_id = 'ada04a19-0fc9-5abd-c497-6646b2369d8e'
    arn = 'arn:aws:wafv2:us-east-1:849873481391:global/webacl/webapi/eda04a18-9fc8-4abc-b497-5646b2369d8g'
    rule_names = ['WAF-RateLimit-URI','WAF-RateLimit-All']  
    ip_set_name = 'Global_blacklist-auto'
    
    # 检查 CloudWatch Alarm 状态
    print(f"Getting CloudWatch Alarm state...")
    cloudwatch_client = boto3.client('cloudwatch')
    alarm_name = 'WAF-ALL-BLOCKED-Req-5mins'
    
    response = cloudwatch_client.describe_alarms(
        AlarmNames=[alarm_name]
    )
    
    alarm_state = response['MetricAlarms'][0]['StateValue']
    print(f"CloudWatch Alarm state: {alarm_state}")
    
    if alarm_state == 'OK':
        print("CloudWatch Alarm is in OK state. Exiting Lambda function.")
        return
    
    merged_ip_addresses = []
    wafv2_client = boto3.client('wafv2')

    # 从sampled requests获取被攻击域名,并告警
    host_counter = Counter()
    print("Step 1: Getting all rate limited rule logs from WAF...")
    for rule in rule_names:
        response = wafv2_client.get_sampled_requests(
            WebAclArn=arn,
            RuleMetricName=rule,
            Scope='CLOUDFRONT',
            TimeWindow={
                'StartTime': (datetime.utcnow() - timedelta(hours=3)).strftime("%Y-%m-%dT%H:%MZ") ,
                'EndTime': datetime.utcnow().strftime("%Y-%m-%dT%H:%MZ")
            },
            MaxItems=500  
        )
        
        for request in response['SampledRequests']:
            headers = {header['Name']: header['Value'] for header in request['Request']['Headers']}
            if 'Host' in headers:
                host_counter[headers['Host']] += 1
    
    most_common_host = host_counter.most_common(1)
    print(f'Most common host: {most_common_host[0][0]} with {most_common_host[0][1]} requests')

    print("Step 2: Getting all rate limited IPs from WAF...")
    for rule_name in rule_names:
        response = wafv2_client.get_rate_based_statement_managed_keys(
            Scope='CLOUDFRONT',
            WebACLName=acl_name,
            WebACLId=acl_id,
            RuleName=rule_name
        )
        
        managed_keys_ipv4 = response['ManagedKeysIPV4']['Addresses']
        print(f"Rule: {rule_name} has:  {len(managed_keys_ipv4)} blocked IPs")
        print(managed_keys_ipv4)
        merged_ip_addresses.extend(managed_keys_ipv4)
    
    print(f"Total Rate limited ip:  {len(merged_ip_addresses)}")
    print(merged_ip_addresses)
    
    print("Step 3: Getting existing IPs from ip set...")
    response = wafv2_client.list_ip_sets(
        Scope='CLOUDFRONT'
    )
    ip_sets = response['IPSets']
    
    ip_set_id = None
    lock_token = None

    for ip_set in ip_sets:
        if ip_set['Name'] == ip_set_name:
            ip_set_id = ip_set['Id']
            lock_token = ip_set['LockToken']
            break
    
    # 获取现有 IP 集的 IP 地址
    response = wafv2_client.get_ip_set(
        Name=ip_set_name,
        Id=ip_set_id,
        Scope='CLOUDFRONT'
    )
    existing_ip_addresses = response['IPSet']['Addresses']
    
    print(f"Total existing IPs: {len(existing_ip_addresses)}")
    print(existing_ip_addresses)
    
    # 将 merged_ip_addresses 合并到现有 IP 地址中
    merged_ip_addresses = list(set(existing_ip_addresses + merged_ip_addresses))
    print(f"Merged IP addresses: {merged_ip_addresses}")
    
    cidr_addresses = []
    for ip_address in merged_ip_addresses:
        ip, _ = ip_address.split('/')
        cidr_address = '.'.join(ip.split('.')[:-1]) + '.0/24'
        cidr_addresses.append(cidr_address)
    # 对CIDR地址进行去重
    unique_cidr_addresses = list(set(cidr_addresses))

    print(f"Block /24 ip range : {len(unique_cidr_addresses)}")
    print(unique_cidr_addresses)
    print(f"Updating IP Set {ip_set_name} in CloudFront WAF...")
    response = wafv2_client.update_ip_set(
        Name=ip_set_name,
        Scope='CLOUDFRONT',
        Id=ip_set_id,
        LockToken=lock_token,
        Description='IP blacklist for Global WAF add by Cron',
        Addresses=unique_cidr_addresses
    )
    print(f"Total blocked IP ranges: {len(unique_cidr_addresses)}")

其他的配置同上一篇文章,主要是 IAM role /超时时间/ EventBridge 定时执行 这三点设置。这个 Lambda 实现了以下功能:

  1. 检测 Cloudwatch 中的 Alarm 的状态,In-Alarm 才会执行拉黑
  2. 统计被攻击的域名,可以进行自定义告警,代码中只是打印到日志
  3. Cloudfront 只开启了 IPv4, 所以只获取 WAF 中被拦截的 IPv4 地址
  4. 获取 IP 地址后去重,并拦截整个C段 /24,减少 WAF IP Set 中的规则数量
  5. 每次执行先获取已经存在的 IP Set ,在加入新的攻击 IP 段

这里发现了 boto3 waf api 的一个坑,文档里面说 rate limit 规则可以在 rule group 中,查询时只需要带上 RuleGroupRuleName 参数即可,但是实际上是查询会抛异常 WAFNonexistentItemException ,所以不要将 rate limit 相关的规则放入 rule group 中。

借助 CloudWatch 和 WAF 缓解 DDOS 攻击

先来几张图说明一下做这个的原因:

aws-ddos-attack-waf.png
aws-ddos-attack-waf-traffic.png
aws-ddos-attack-bill.png

一、前提条件

需要使用以下几个服务:

  1. AWS WAF 启用全局频率限制、全局黑名单、启用 logging 输出日志
  2. AWS CloudWatch 存储日志,攻击检测,发送告警到SNS
  3. AWS SNS 触发Lambda相关功能
  4. AWS Lambda 发送通知,更新 AWS WAF 全局黑名单
  5. Amazon EventBridge 定期执行 Lambda函数,更新 WAF 黑名单

1.1 AWS WAF 配置

需要在 WAF ACL 中设置以下几项:

  1. 创建一个 IP Sets 作为全局黑名单
  2. 创建一个全局黑名单规则,如果 IP 在全局黑名单中则 block,并设置优先级最高
  3. 根据业务需求创建一个全局频率限制规则,优先级仅次于黑名单规则
  4. 启用 WAF 的 logging,设置 Logging destination 为 CloudWatch,设置一个 filter : rule action on request 是 block 就 keep in logs

二、攻击检测及告警配置

2.1 Cloudwatch 配置

Cloudwatch 的告警基于 WAF ACL 中频率限制的规则。在 Cloudwatch 中设置一个告警,告警 Metric 基于 AWS/WAFV2 中 五分钟内的 BlockedRequests 总数,如果5分钟内被限制的请求大于 3000 则是 In alarm 状态,则触发 SNS Alert 告警通知;否则是 OK 状态,触发 SNS OK 通知 。

2.2 Amazon SNS 配置

SNS 需要配置两个 Standard 的 topic,一个是WAF-Ddos-Alert,一个是WAF-Ddos-Ok,用于触发 Lambda 发送攻击告警和攻击结束通知。

2.3 编写 Lambda 实现攻击检测和告警

新建一个 Lambda python function,代码放在下面,设置以下参数:

  1. trigger 设置为 SNS topic : WAF-Ddos-Alert
  2. Lambda 的 Execution role 需添加权限 : CloudWatchReadOnlyAccess CloudWatchLogsReadOnlyAccess
  3. 设置 telegram 相关的环境变量 : BOT_TOKEN 和 USER_WILL
  4. 设置超时时间为5分钟

代码如下:

import os
import json
import urllib3
import boto3
from datetime import datetime, timedelta
import time

def lambda_handler(event, context):
    
    # 从环境变量中获取 Telegram Bot Token 和聊天 ID
    bot_token = os.environ['BOT_TOKEN']
    user_will_chat_id = os.environ['USER_WILL']
    
    send_telegram_message(bot_token,user_will_chat_id, "请注意有DDos攻击!!!")
    
    #按实际情况修改
    log_group_name = 'aws-waf-logs-All-Blocked-Logs'
    
    # 获取当前时间和五分钟前的时间
    current_time = datetime.now()
    period = current_time - timedelta(minutes=15)
    
    # 创建 CloudWatch Logs Insights 客户端
    logs_client = boto3.client('logs')
    queries = [
        {
            'name': 'Top5_Host',
            'query': "fields @timestamp, @message | parse @message '{\"name\":\"Host\",\"value\":\"*\"}' as host | stats count(*) as requestCount by host | sort requestCount desc | limit 5"
        },
        {
            'name': 'Top5_IP',
            'query': "fields httpRequest.clientIp | stats count(*) as requestCount by httpRequest.clientIp | sort requestCount desc | limit 5"
        },
        {
            'name': 'Top5_Country',
            'query': "fields httpRequest.country | stats count(*) as requestCount by httpRequest.country | sort requestCount desc | limit 5"
        },
        {
            'name': 'Top5_Rule',
            'query': "fields terminatingRuleId | stats count(*) as requestCount by terminatingRuleId | sort requestCount desc | limit 5"
        }
    ]

    msg = '\n'
    for query in queries:
        response = logs_client.start_query(
            logGroupName=log_group_name,
            startTime=int(period.timestamp()),
            endTime=int(current_time.timestamp()),
            queryString=query['query']
        )
        query_id = response['queryId']
        print(f"Started query {query['name']}. Query ID: {query_id}")
    
        # 等待查询完成
        retries = 0
        max_retries = 5
        while retries < max_retries:
            query_response = logs_client.get_query_results(queryId=query_id)
            if len(query_response['results']) > 0 and 'status' in query_response:
                status = query_response['status']
                print(f"Query Status: {status}")
                print(query_response)
                
                if status == 'Complete':
                    break
            retries += 1
            if retries == max_retries:
                raise Exception("Query did not complete within the allowed time")
            else:
                print(f"Retrying {retries}...")
                time.sleep(10) 
                
        msg += f"{query['name']}: \n"
        for result in query_response['results']:
            request_item = [item['value'] for item in result ][0]
            request_count = [item['value'] for item in result if item['field'] == 'requestCount'][0]
            
            msg += f"{request_item} --> {request_count}\n"


    print(msg)
    
    send_telegram_message(bot_token,user_will_chat_id, msg)
    
def send_telegram_message(bot_token, chat_id, message):
    bot_api_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    http = urllib3.PoolManager()

    # 准备请求数据
    data = {
        'chat_id': chat_id,
        'text': message
    }

    # 发送 POST 请求到 Telegram 机器人 API
    msg_response = http.request('POST', bot_api_url, fields=data)
    
    # 根据响应内容判断是否成功发送消息
    if json.loads(msg_response.data.decode('utf-8'))['ok']:
        print(f"Message sent to chat {chat_id}: {message}")
    else:
        print(f"Failed to send message to chat {chat_id}")

该 Lambda 函数会在收到SNS通知时,先发送攻击通知,然后从 Cloudwatch 中所有被 block 请求中查询访问次数最多的5个域名/5个IP/5个国家/5个匹配到的waf规则。在发送 Cloudwatch 查询到的攻击信息。

三、攻击缓解

有了以上的准备工作,缓解攻击只需要统计一下 、攻击的 IP,然后更新 WAF 的黑名单即可,为了快速检测攻击 IP,可以在Amazon EventBridge 中设置一个定时器,每五分钟触发一下 Lambda 更新 WAF的黑名单。

3.1 EventBridge 设置 Scheduler

在 EventBridge -> Scheduler 中新建一个定时器,类型为:Recurring schedule,Schedule type是 Rate-based schedule 时间设置成5分钟,Target 设置为 Lambda 下面的函数。

3.2 使用 Lambda 更新 WAF 黑名单

在创建一个 Lambda函数,配置参数如下:

  1. trigger 可以不设置,如果需要手动拉黑,在可以设置为一个新的SNS topic
  2. Lambda 的 Execution role 需添加权限 : CloudWatchReadOnlyAccess、CloudWatchLogsReadOnlyAccess、AWSWAFFullAccess
  3. 设置超时时间为5分钟

Lambda 代码如下:

import boto3
from datetime import datetime, timedelta
import time

def lambda_handler(event, context):
    log_group_name = 'aws-waf-logs-All-Blocked-Logs'
    ip_set_name = 'Global_blacklist-auto'
    
    # 获取当前时间和统计攻击的时间窗口
    current_time = datetime.now()
    period = current_time - timedelta(minutes=10)
    
    # 创建 CloudWatch Logs Insights 客户端
    logs_client = boto3.client('logs')
    
    print("Step 1: Executing query to filter blocked IP addresses...")
    
    # 执行查询语句,获取统计时间窗口内被 block 次数大于100次的 IP 地址列表
    query = "fields httpRequest.clientIp | stats count(*) as requestCount by httpRequest.clientIp | filter requestCount > 100 | sort requestCount desc"

    response = logs_client.start_query(
        logGroupName=log_group_name,
        startTime=int(period.timestamp()),
        endTime=int(current_time.timestamp()),
        queryString=query
    )
    query_id = response['queryId']
    print(f"Started with Query ID: {query_id}")
    
    print("Step 2: Waiting for query to complete...")
    
    # 等待查询完成
    retries = 0
    max_retries = 10
    while retries < max_retries:
        query_status = logs_client.get_query_results(queryId=query_id)
        if len(query_status['results']) > 0 and 'status' in query_status:
            status = query_status['status']
            print(f"Query Status: {status}")
            print(query_status)
            if status == 'Complete':
                break
        retries += 1
        if retries == max_retries:
            raise Exception("Query did not complete within the allowed time")
        else:
            print(f"Retrying {retries}...")
            time.sleep(10) 
            
    print("Step 3: Getting all blocked ips from cloudwatch...")
    # 提取 IP 地址并整理为 WAF 可以使用的格式
    ips = [result[0]['value'] for result in query_status['results']]
    
    # 去重 IP 地址
    ips = list(set(ips))
    print(f"taotal ip: {len(ips)}")
    print(ips)
    
    print("Step 4: Getting IP Addresses from ip_set in CloudFront WAF...")
    wafv2_client = boto3.client('wafv2')
    response = wafv2_client.list_ip_sets(
        Scope='CLOUDFRONT'
    )
    ip_sets = response['IPSets']
    
    ip_set_id = None
    lock_token = None

    for ip_set in ip_sets:
        if ip_set['Name'] == ip_set_name:
            ip_set_id = ip_set['Id']
            lock_token = ip_set['LockToken']
            break
    
    # 获取现有 IP 集的 IP 地址
    response = wafv2_client.get_ip_set(
        Name=ip_set_name,
        Id=ip_set_id,
        Scope='CLOUDFRONT'
    )
    existing_ip_addresses = response['IPSet']['Addresses']
    print(f"tatal existing ips: {len(existing_ip_addresses)}")
    print(existing_ip_addresses)
    
    print(f"Step 5: Updating IP Set {ip_set_name} in CloudFront WAF...")
    
    # 将 cidr_addresses 合并到现有 IP 地址中
    cidr_addresses = [ip + '/32' for ip in ips]
    merged_ip_addresses = list(set(existing_ip_addresses + cidr_addresses))
    print(merged_ip_addresses)
    
    response = wafv2_client.update_ip_set(
        Name=ip_set_name,
        Scope='CLOUDFRONT',
        Id=ip_set_id,
        LockToken=lock_token,
        Description='IP blacklist for Global WAF',
        Addresses=merged_ip_addresses
    )
    print(f"total blocked ips: {len(merged_ip_addresses)} ")

3.3 其他设置

其他措施主要是完善 WAF 规则,增强 lambda 脚本的功能 及 DNS 分流等,可以根据实际情况做很多配置。

四、告警通知和效果

攻击时需要注意一下 WAF/Cloudfront/Cloudwatch的费用,心里要有预期。
两个 Lambda 函数都有非常详细的日志打印到 Cloudwatch 的 log group 中,可以从 Cloudwatch中查看。
告警通知如下:
aws-ddos-attack-tg-msg.png

借助 Cloudflare Workers 实现查询 ip 信息的API

在使用Cloudflare CDN时,CDN传给后端服务器中只传递了有限的 http request hearder
其中只包含了非常简陋的 ip 信息,例如只有 CF-IPCountry ,不足以实现一个查询ip信息的api,要想实现查询ip详细信息需要借助 Cloudflare Workers 来实现。

Cloudflare Workers 传入的 HTTP 请求都被称为 fetch 事件,fetch 事件中都包含一个 Request 接口实例,在这个 Request 实例中就包括了访问 ip 的详细信息

通过访问 Request.cf 可以获得 Cloudflare 提供的请求信息,包含了 ip 的详细信息;访问 Request.headers 则可以获得访客的 HTTP headers,写一个简单的 Workers 处理一下 Request.cf 、Request.headers 中的信息,并返回 Json 格式即可。

访问该 Cloudflare Workers 后返回包含访客 IP、Continent、Country、Region、ASN 等 ip 信息及请求的Header信息,同时也会把 Cloudflare 的 https 连接信息及 bot 识别的相关信息一起返回。

代码如下:

export default {
  async fetch(request) {
    const data = {
      Method: request.method,
      Url: request.url,
      IP: {
        IP: request.headers.get('CF-Connecting-IP'),
        Continent: request.cf.continent,
        Country: request.cf.country,
        IsEU: request.cf.isEUCountry,
        Region: request.cf.region,
        RegionCode: request.cf.regionCode,
        City: request.cf.city,
        Latitude: request.cf.latitude,
        Longitude: request.cf.longitude,
        PostalCode: request.cf.postalCode,
        MetroCode: request.cf.metroCode,
        Colo: request.cf.colo,
        ASN: request.cf.asn,
        ASOrganization: request.cf.asOrganization,
        Timezone: request.cf.timezone
      },
      Headers: {},
      Security: {}
    };

 // 遍历并存储每个 HTTP 头,排除以 cf- 开头的 HTTP 头
 request.headers.forEach((value, name) => {
  if (!name.toLowerCase().startsWith('cf-')) {
    data.Headers[name] = value;
  }
});

    // 遍历 request.cf 并存储所需对象的属性到 Security 中
    for (const key in request.cf) {
      if (
          key == 'clientTcpRtt' 
          || key == 'tlsCipher'
          || key == 'tlsVersion'
          || key == 'httpProtocol'
          || key == 'clientHandshake'
          || key == 'clientFinished'
          || key == 'serverHandshake'
          || key == 'serverFinished'
          || key == 'corporateProxy' 
          || key == 'verifiedBot' 
          || key == 'score'
          
          ) {
      if (typeof request.cf[key] === 'object') {
        for (const innerKey in request.cf[key]) {
          data.Security[innerKey] = request.cf[key][innerKey];
        }
      } else {
        data.Security[key] = request.cf[key];
      }
    }
    }

    var dataJson = JSON.stringify(data, null, 4);
    console.log(dataJson);

    return new Response(dataJson, {
      headers: {
        "Content-Type": "application/json;charset=UTF-8"
      }
    })
  }
};

需要注意 Cloudflare Workers 的域名被屏蔽了,建议绑定一个域名来访问。请求该 Workers 后返回如下 JSON :

{
    "Method": "GET",
    "Url": "https://ip.ipip.dev/",
    "IP": {
        "IP": "149.234.194.181",
        "Continent": "NA",
        "Country": "US",
        "Region": "Missouri",
        "RegionCode": "MO",
        "Latitude": "38.57740",
        "Longitude": "-90.67090",
        "Colo": "IAD",
        "ASN": 23167,
        "ASOrganization": "Bayer-arch",
        "Timezone": "America/Chicago"
    },
    "Headers": {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-encoding": "gzip",
        "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "connection": "Keep-Alive",
        "host": "ip.ipip.dev",
        "sec-ch-ua": "\"Not/A)Brand\";v=\"99\", \"Google Chrome\";v=\"115\", \"Chromium\";v=\"115\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        "x-forwarded-proto": "https",
        "x-real-ip": "149.234.194.181"
    },
    "Security": {
        "clientTcpRtt": 1,
        "tlsCipher": "AEAD-AES128-GCM-SHA256",
        "tlsVersion": "TLSv1.3",
        "httpProtocol": "HTTP/2"
    }
}

使用 AWS Lambda发送 Telegram 警报

一、应用场景

有几个 cloudfront distribution 启用了 WAF,经常遇到扫描和攻击,请求数太多了。为了不产生天价账单,做了一个 Telegram 告警,有警报可以实时处理。

二、涉及服务

  1. AWS Cloudfront: CDN服务进行全球加速
  2. AWS WAF: 启用应用防护,过滤异常请求
  3. AWS Cloudwatch: 收集 Cloudfront/WAF 指标,创建告警触发 SNS
  4. AWS SNS: 发送通知,并触发 Lambda 执行
  5. AWS Lambda: 获取 CDN 的指标,并发送 Telegram 通知

三、准备工作

  1. WAF:设置启用防火墙的过滤规则,设置频率限制等,拦截异常请求
  2. Cloudwatch: 根据 WAF 规则创建警报,例如5分钟内如果有3000个异常请求就报警,并创建一个 SNS Topic
  3. AWS SNS: 无需特别设置,保持默认即可,也可以继续添加多个subscription订阅,实现多种告警方式
  4. AWS Lambbda: 编写代码从 Cloudwatch 中获取 Cloudfront 性能统计指标,并发送给 Telegram 机器人
  5. Telegram: 申请一个新的机器人并拿到 token , 难道需要接受告警的 user id(有专门的机器人可以获得)

四、部署告警服务

4.1 创建 Lambda 函数

先创建一个 lambda function,Runtime 选择 python 3.10.其他使用默认设置即可。
代码如下:

import os
import json
import urllib3
import boto3
from datetime import datetime, timedelta

def lambda_handler(event, context):
    # 从环境变量中获取 Telegram Bot Token 和聊天 ID

    bot_token = os.environ['BOT_TOKEN']
    user_will_chat_id = os.environ['CHAT_ID']

    cloudfront_client = boto3.client('cloudfront')
    cloudwatch_client = boto3.client('cloudwatch')

    # 获取所有 CloudFront distributions
    distributions = cloudfront_client.list_distributions()["DistributionList"]["Items"]

    # 获取每个 Distribution 的请求数和错误率
    distribution_metrics = []
    for distribution in distributions:
        distribution_id = distribution["Id"]
        distribution_description = distribution['Comment']
        
        requests = get_metrics(cloudwatch_client, distribution_id, 'Requests', ['Sum'])
        error_rate = get_metrics(cloudwatch_client, distribution_id, 'TotalErrorRate', ['Average'])

        # 将结果添加到列表中
        distribution_metrics.append({
            'DistributionName': distribution_description,
            'TotalRequests': requests,
            'TotalErrorRate': error_rate
        })
    #按TotalRequests和TotalErrorRate进行排序
    sorted_request_metrics = sorted(distribution_metrics, key=lambda x: x['TotalRequests'] if x['TotalRequests'] is not None else 0, reverse=True)
    sorted_rate_metrics = sorted(distribution_metrics, key=lambda x: x['TotalErrorRate'] if x['TotalErrorRate'] is not None else 0, reverse=True)

    #获取请求数前3和错误率前3
    top3_request_metrics = sorted_request_metrics[:3]
    top3_rate_metrics = sorted_rate_metrics[:3]
    
    # 发送telegram bot的消息
    msg = f"按请求数(1h)排序前三:\n"
    for i, metric in enumerate(top3_request_metrics):
        msg += f"{i+1}. {metric['DistributionName']} 请求数:{metric['TotalRequests']} 错误率:{round(metric['TotalErrorRate'], 2)}  \n"
    msg += "\n按错误率(1h)排序前三:\n"
    for i, metric in enumerate(top3_rate_metrics):
        msg += f"{i+1}. {metric['DistributionName']}  错误率:{round(metric['TotalErrorRate'], 2)}  请求数:{metric['TotalRequests']} \n"

    # 发送警报通知
    send_telegram_message(bot_token,user_will_chat_id, "请注意CDN有攻击!!!")
    send_telegram_message(bot_token,user_will_chat_id, msg)

def send_telegram_message(bot_token, chat_id, message):
    bot_api_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    http = urllib3.PoolManager()

    # 准备请求数据
    data = {
        'chat_id': chat_id,
        'text': message
    }

    # 发送 POST 请求到 Telegram 机器人 API
    msg_response = http.request('POST', bot_api_url, fields=data)
    
    # 根据响应内容判断是否成功发送消息
    if json.loads(msg_response.data.decode('utf-8'))['ok']:
        print(f"Message sent to chat {chat_id}: {message}")
    else:
        print(f"Failed to send message to chat {chat_id}")

        
def get_metrics(cloudwatch_client, distribution_id, metric_name, statistics):
    #获取当前时间和一小时前时间
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)

    #获取指定的metric
    response = cloudwatch_client.get_metric_statistics(
        Namespace='AWS/CloudFront',
        MetricName=metric_name,
        Dimensions=[
            {'Name': 'Region', 'Value': 'Global'},
            {'Name': 'DistributionId', 'Value': distribution_id}
        ],
        StartTime=start_time,
        EndTime=end_time,
        Period=3600,
        Statistics=statistics
    )

    # 提取并打印请求数统计数据
    datapoints = response['Datapoints']
    for datapoint in datapoints:
        value = datapoint[statistics[0]]
        return value

以上代码会读取所有 Cloudfront distribution , 并从 Cloudwatch 中获取请求数和总的错误率,并取前三,发送给 Telegram 机器人。

4.2 设置 Lambda 函数

4.2.1 设置 Lambda 函数的权限

找到默认的 Lambda role:

Lambda function --> Configuration --> Permissions -->Execution role --> Role name

在 IAM 中打开 Role name,选择 add permission ,添加 CloudFrontReadOnlyAccess 和 CloudWatchReadOnlyAccess 两个权限,保存即可

4.2.2 设置 Lambda 函数的Timeout

由于 Cloudfront 的 distribution 比较多,获取指标比较慢,Lambda 默认 3s 的超时时间比较短,需要增加到至少1分钟.

Lambda function --> Configuration --> General configuration --> Timeout

4.2.3 设置 Lambda 函数的环境变量

需要在环境变量中设置 Telegram 机器人的 token 和用户 id :

Lambda function --> Configuration -->Environment variables 

添加两个环境变量:BOT_TOKEN 和 CHAT_ID

五、 测试告警

设置好以上后,在 SNS 点击对应的 Topic ,然后选择 Publish message ,在 Message body 中随便输入点内容,点击 Publish message 即可,Telegram 就会收到通知了。

telegram-alert.png

最新文章

最近回复

分类

归档

统计

  • 文章总数:167篇
  • 分类总数:5个
  • 评论总数:103条
  • 页面总数:171个
  • 本站运行:4865天

其它