脚本分享:CDN 流量触顶后自动切换至 Cloudflare



方案价值与应用场景

防御性场景 – DDoS 攻击流量过载保护

当你的 CDN 被恶意攻击,触发流量封顶限制。程序会将 DNS 指向 Cloudflare,保证站点访问的同时利用 Cloudflare 的防火墙功能抵御攻击。
当你做好 IP 屏蔽,重新启用 CDN 时,程序会自动恢复原本的国内外分流状态。

运维性场景 – 基础设施维护无缝切换

当你需要关闭 CDN/切换 CDN 时,程序会将 DNS 指向 Cloudflare 保证站点访问。
当你重新启用 CDN 时,程序会自动恢复原本的国内外分流状态。

前提条件

已部署 DNS 国内外分流

DNS 国内外分流 简易设置教程
  1. 目标域名使用国内 DNS(如阿里 DNS)
  2. 目标域名配置好境内 CDN
  3. 目标域名借助第二域名配置好 Cloudflare for SaaS
  4. DNS 设置国内线路 cname 解析值设定为 CDN 提供的域名
  5. DNS 设置海外线路 cname 解析值设定为 Cloudflare for SaaS 提供的域名

配置好后的架构图如下所示

geodns-routing-arch

图像生成源码
@startuml
left to right direction

actor 访客 as visitor

package "互联网" {
  note "域名" as site_address #orange

  node "DNS服务器" as dns {
    component "境内 cname 解析" as cname_cn
    component "境外 cname 解析" as cname_intl
  }
}

node "境内CDN" as cdn #lightblue {
  component "CDN 提供的域名" as cdn_address #orange
}


node "Cloudflare" as cf #skyblue {
  component "Cloudflare for SaaS(自定义主机名)提供的域名" as cf_address #orange
}

visitor --> site_address : 访问站点
site_address --> dns : DNS查询

dns --> cname_cn : 境内解析分流
dns --> cname_intl : 境外解析分流

cname_cn --> cdn_address
cname_intl --> cf_address
@enduml

程序架构设计

代码流程图


title: CDN状态切换与DNS配置流程图

%%{init: { “theme”: “light” } }%%
flowchart TD
A([开始]) –> B[程序初始化]
B –> C(进入CDN停用监测)
C –> D{CDN是否停用?}
D –>|是| E[执行操作\n关闭 DNS 分流]
E –> F[暂停国内默认CNAME记录\n海外CNAME记录的线路改为默认]
F –> G(进入CDN启用监测)
D –>|否| H[等待后重新监测]
H –> D

G –> I{CDN是否启用?}
I –>|是| J[执行操作\n启用 DNS 分流]
J –> K[国内线路改为海外\n启用原国内解析]
K –> C
I –>|否| L[等待后重新监测]
L –> I

classDef action fill:#f9f,stroke:#333;
classDef condition fill:#ccf,stroke:#333;
class E,F,J,K action;
class D,I condition;


title: CDN状态切换与DNS配置流程图

%%{init: {
“theme”: “dark”,
“themeVariables”: {
“primaryColor”: “#1a1a1a”,
“edgeLabelBackground”:”#2D2D2D”
}
}}%%
flowchart TD
A([开始]) –> B[程序初始化]
B –> C(进入CDN停用监测)
C –> D{CDN是否停用?}
D –>|是| E[执行操作\n关闭 DNS 分流]
E –> F[暂停国内默认CNAME记录\n海外CNAME记录的线路改为默认]
F –> G(进入CDN启用监测)
D –>|否| H[等待后重新监测]
H –> D

G –> I{CDN是否启用?}
I –>|是| J[执行操作\n启用 DNS 分流]
J –> K[国内线路改为海外\n启用原国内解析]
K –> C
I –>|否| L[等待后重新监测]
L –> I

classDef action fill:#6366f1,stroke:#374151,stroke-width:2px,color:#fff;
classDef condition fill:#14b8a6,stroke:#374151,stroke-width:2px,color:#fff;
class E,F,J,K action;
class D,I condition;

程序监测的情况

触发停用监测的情况

  • 攻击者消耗 CDN 流量达到上限,触发设置的流量封顶限制,自动停用。
  • 手动停用。

触发启用监测的情况

  • 手动启用。

以下情况不会触发监测

  • 修改 CDN 配置,CDN 会标注为部署中状态,此时不会触发监测。

代码分享及部署教程

由于我使用的是多吉云 CDN + 阿里 DNS,此处脚本使用的是多吉云 CDN 和阿里云提供的 API。
如您有需要适配其他 CDN 和 DNS 的需求,请在此评论区留言,或前往关于页寻找电子邮箱地址留言。适配后我将第一时间通过邮件的方式通知您。

使用方法:

  1. 安装 Python(3.12 或更高版本)

  2. 安装所需的包

    • pip install alibabacloud_alidns20150109==3.5.7
    • 镜像加速下载:pip install alibabacloud_alidns20150109==3.5.7 -i
  3. 创建 env.toml 复制粘贴以下内容

dogecloud_access_key = 'xxxxxxxxxxxxxxxx'
dogecloud_secret_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
aliyun_accesskey_id = 'XXXXXXXXXXXXXXXXXXXXXXXX'
aliyun_accesskey_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
domain = 'example.com'
rr="@"
  1. 创建并获取多吉云 AccessKey

  2. 创建并获取阿里云 AccessKey

  3. 填写配置文件

    • dogecloud_access_key 填写多吉云 AccessKey
    • dogecloud_secret_key 填写多吉云 SecretKey
    • aliyun_accesskey_id 填写阿里云 AccessKey ID
    • aliyun_accesskey_secret 填写阿里云 AccessKey Secret
    • domain 填写你的主域名
    • rr 填写你域名的 RR 字段
    • 举例:
      • example.com -> domain 填 example.com, rr 填 @
      • blog.example.com -> domain 填 example.com, rr 填 blog
  4. 将以下代码复制到 main.py,执行即可。

点我展开折叠代码

警告:此代码 dogecloud_api 为多吉云官方提供的 SDK 代码,create_clientdisable_default_enable_overseaenable_default_convert_to_oversea 是由阿里云自动生成得来。笔者花费了 27 分钟将这些 SDK 粘在一起。代码中有无法解释的注释/风格混乱是意料之中的情况。

import hmac
import json
import sys
import time
import tomllib
import urllib
from hashlib import sha1
from typing import List

import requests
from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models

with open("env.toml", "rb") as f:
    config: dict = tomllib.load(f)
dogecloud_access_key = config["dogecloud_access_key"]
dogecloud_secret_key = config["dogecloud_secret_key"]
aliyun_accesskey_id = config["aliyun_accesskey_id"]
aliyun_accesskey_secret = config["aliyun_accesskey_secret"]
domain = config["domain"]
rr = config["rr"]
if rr == "@":
    full_domain = domain
else:
    full_domain = rr + "." + domain


def dogecloud_api(api_path, data={}, json_mode=False):
    """
    调用多吉云API

    :param api_path:    调用的 API 接口地址,包含 URL 请求参数 QueryString,例如:/console/vfetch/add.json?url=xxx&a=1&b=2
    :param data:        POST 的数据,字典,例如 {'a': 1, 'b': 2},传递此参数表示不是 GET 请求而是 POST 请求
    :param json_mode:   数据 data 是否以 JSON 格式请求,默认为 false 则使用表单形式(a=1&b=2)

    :type api_path: string
    :type data: dict
    :type json_mode bool

    :return dict: 返回的数据
    """

    with open("env.toml", "rb") as f:
        config = tomllib.load(f)
    access_key = config["dogecloud_access_key"]
    secret_key = config["dogecloud_secret_key"]

    body = ""
    mime = ""
    if json_mode:
        body = json.dumps(data)
        mime = "application/json"
    else:
        body = urllib.parse.urlencode(data)
        mime = "application/x-www-form-urlencoded"
    sign_str = api_path + "\n" + body
    signed_data = hmac.new(secret_key.encode("utf-8"), sign_str.encode("utf-8"), sha1)
    sign = signed_data.digest().hex()
    authorization = "TOKEN " + access_key + ":" + sign
    response = requests.post(
        " + api_path,
        data=body,
        headers={"Authorization": authorization, "Content-Type": mime},
    )
    return response.json()


def create_client() -> Alidns20150109Client:
    """
    使用AK&SK初始化账号Client
    @return: Client
    @throws Exception
    """
    with open("env.toml", "rb") as f:
        config = tomllib.load(f)
    config = open_api_models.Config(
        access_key_id=config["aliyun_accesskey_id"],
        access_key_secret=config["aliyun_accesskey_secret"],
    )
    # Endpoint 请参考 
    config.endpoint = f"alidns.cn-huhehaote.aliyuncs.com"
    return Alidns20150109Client(config)


def disable_default_enable_oversea(args: List[str]) -> None | int:
    """
    禁用默认CNAME记录,将海外记录设为默认线路
    """
    client = create_client()
    add_custom_line_request = alidns_20150109_models.DescribeDomainRecordsRequest(
        lang="zh", domain_name=domain, rrkey_word=rr
    )
    try:
        response = client.describe_domain_records(add_custom_line_request)
        if response.status_code != 200:
            return 500

        records = response.body.domain_records.record
        default_cname_record_id = None
        oversea_record_id = None
        oversea_record_value = None

        for record in records:
            if (
                record.domain_name == domain
                and record.line == "default"
                and record.rr == rr
                and record.type == "CNAME"
            ):
                default_cname_record_id = record.record_id
            if record.line == "oversea":
                oversea_record_id = record.record_id
                oversea_record_value = record.value
    except Exception as error:
        # # 错误 message
        # print(error.message)
        # # 诊断地址
        # print(error.data.get("Recommend"))
        # from alibabacloud_tea_util.client import Client as UtilClient
        # UtilClient.assert_as_string(error.message)
        return 500

    set_domain_record_status_request = (
        alidns_20150109_models.SetDomainRecordStatusRequest(
            record_id=default_cname_record_id, status="Disable"
        )
    )
    try:
        response = client.set_domain_record_status_with_options(
            set_domain_record_status_request, util_models.RuntimeOptions()
        )
        if response.status_code != 200:
            return 500
    except Exception as error:
        return 500

    update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
        record_id=oversea_record_id,
        rr=rr,
        type="CNAME",
        line="default",
        value=oversea_record_value,
    )
    try:
        response = client.update_domain_record_with_options(
            update_domain_record_request, util_models.RuntimeOptions()
        )
        if response.status_code != 200:
            return 500
    except Exception as error:
        return 500


def enable_default_convert_to_oversea(args: List[str]) -> None | int:
    """
    启用默认CNAME记录,将当前默认记录转为海外线路
    """
    client = create_client()
    add_custom_line_request = alidns_20150109_models.DescribeDomainRecordsRequest(
        lang="zh", domain_name=domain, rrkey_word=rr
    )
    try:
        response = client.describe_domain_records(add_custom_line_request)
        if response.status_code != 200:
            return 500

        records = response.body.domain_records.record
        default_cname_record_id = None
        oversea_record_id = None
        oversea_record_value = None

        for record in records:
            # print(record.status)
            if (
                record.domain_name == domain
                and record.line == "default"
                and record.rr == rr
                and record.type == "CNAME"
                and record.status == "DISABLE"
            ):
                default_cname_record_id = record.record_id
            if (
                record.domain_name == domain
                and record.line == "default"
                and record.rr == rr
                and record.type == "CNAME"
                and record.status == "ENABLE"
            ):
                oversea_record_id = record.record_id
                oversea_record_value = record.value
    except Exception as error:
        return 500
    set_domain_record_status_request = (
        alidns_20150109_models.SetDomainRecordStatusRequest(
            record_id=default_cname_record_id, status="Enable"
        )
    )
    try:
        response = client.set_domain_record_status_with_options(
            set_domain_record_status_request, util_models.RuntimeOptions()
        )
        if response.status_code != 200:
            return 500
    except Exception as error:
        return 500

    update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
        record_id=oversea_record_id,
        rr=rr,
        type="CNAME",
        line="oversea",
        value=oversea_record_value,
    )
    try:
        response = client.update_domain_record_with_options(
            update_domain_record_request, util_models.RuntimeOptions()
        )
        if response.status_code != 200:
            return 500
    except Exception as error:
        return 500


def cdn_offline_monitor(domain_info) -> str:
    if domain_info["status"] not in ("offline"):
        print("CDN 状态: 在线")
        return "offline"
    else:
        print("检测到 CDN 已离线")
        if disable_default_enable_oversea(sys.argv[1:]) is not None:
            print("DNS 修改失败,暂停 1s 后重试")
            time.sleep(1)
        print("开始监控域名上线状态...")
        return "online"


def cdn_online_monitor(domain_info) -> str:
    if domain_info["status"] not in ("online"):
        print("CDN 状态: 离线")
        return "online"
    else:
        print("检测到 CDN 已上线")
        if enable_default_convert_to_oversea(sys.argv[1:]) is not None:
            print("DNS 修改失败,暂停 1s 后重试")
            time.sleep(1)
        print("开始监控域名离线状态...")
        return "offline"


def domain_info_handler(domain_info, monitoring_mode) -> str:
    # 检测离线模式
    if monitoring_mode == "offline":
        return cdn_offline_monitor(domain_info)
    # 检测上线模式
    elif monitoring_mode == "online":
        return cdn_online_monitor(domain_info)


if __name__ == "__main__":
    # 定义当前监控模式:'offline' 监控是否离线,'online' 监控是否上线
    monitoring_mode = "offline"

    while True:
        for domain_info in dogecloud_api("/cdn/domain/list.json")["data"]["domains"]:
            if domain_info["name"] != full_domain:
                continue

            monitoring_mode = domain_info_handler(domain_info, monitoring_mode)

        time.sleep(3)  # 多吉云 API 限制 30000 次 / 日

附录

相关文章:混合云架构下的动态 IPv6 智能加速实践:从家庭网络到全球分发的低成本部署指南