方案价值与应用场景
防御性场景 – DDoS 攻击流量过载保护
当你的 CDN 被恶意攻击,触发流量封顶限制。程序会将 DNS 指向 Cloudflare,保证站点访问的同时利用 Cloudflare 的防火墙功能抵御攻击。
当你做好 IP 屏蔽,重新启用 CDN 时,程序会自动恢复原本的国内外分流状态。
运维性场景 – 基础设施维护无缝切换
当你需要关闭 CDN/切换 CDN 时,程序会将 DNS 指向 Cloudflare 保证站点访问。
当你重新启用 CDN 时,程序会自动恢复原本的国内外分流状态。
前提条件
已部署 DNS 国内外分流
DNS 国内外分流 简易设置教程
- 目标域名使用国内 DNS(如阿里 DNS)
- 目标域名配置好境内 CDN
- 目标域名借助第二域名配置好 Cloudflare for SaaS
- DNS 设置国内线路 cname 解析值设定为 CDN 提供的域名
- DNS 设置海外线路 cname 解析值设定为 Cloudflare for SaaS 提供的域名
配置好后的架构图如下所示
图像生成源码
@startuml
left to right direction
actor 访客 as visitor
package "互联网" {
note "域名" as site_address #orange
node "DNS服务器" as dns {
component "境内 cname 解析" as cname_cn
component "境外 cname 解析" as cname_intl
}
}
node "境内CDN" as cdn #lightblue {
component "CDN 提供的域名" as cdn_address #orange
}
node "Cloudflare" as cf #skyblue {
component "Cloudflare for SaaS(自定义主机名)提供的域名" as cf_address #orange
}
visitor --> site_address : 访问站点
site_address --> dns : DNS查询
dns --> cname_cn : 境内解析分流
dns --> cname_intl : 境外解析分流
cname_cn --> cdn_address
cname_intl --> cf_address
@enduml
程序架构设计
代码流程图
title: CDN状态切换与DNS配置流程图
—
%%{init: { “theme”: “light” } }%%
flowchart TD
A([开始]) –> B[程序初始化]
B –> C(进入CDN停用监测)
C –> D{CDN是否停用?}
D –>|是| E[执行操作\n关闭 DNS 分流]
E –> F[暂停国内默认CNAME记录\n海外CNAME记录的线路改为默认]
F –> G(进入CDN启用监测)
D –>|否| H[等待后重新监测]
H –> D
G –> I{CDN是否启用?}
I –>|是| J[执行操作\n启用 DNS 分流]
J –> K[国内线路改为海外\n启用原国内解析]
K –> C
I –>|否| L[等待后重新监测]
L –> I
classDef action fill:#f9f,stroke:#333;
classDef condition fill:#ccf,stroke:#333;
class E,F,J,K action;
class D,I condition;
title: CDN状态切换与DNS配置流程图
—
%%{init: {
“theme”: “dark”,
“themeVariables”: {
“primaryColor”: “#1a1a1a”,
“edgeLabelBackground”:”#2D2D2D”
}
}}%%
flowchart TD
A([开始]) –> B[程序初始化]
B –> C(进入CDN停用监测)
C –> D{CDN是否停用?}
D –>|是| E[执行操作\n关闭 DNS 分流]
E –> F[暂停国内默认CNAME记录\n海外CNAME记录的线路改为默认]
F –> G(进入CDN启用监测)
D –>|否| H[等待后重新监测]
H –> D
G –> I{CDN是否启用?}
I –>|是| J[执行操作\n启用 DNS 分流]
J –> K[国内线路改为海外\n启用原国内解析]
K –> C
I –>|否| L[等待后重新监测]
L –> I
classDef action fill:#6366f1,stroke:#374151,stroke-width:2px,color:#fff;
classDef condition fill:#14b8a6,stroke:#374151,stroke-width:2px,color:#fff;
class E,F,J,K action;
class D,I condition;
程序监测的情况
触发停用监测的情况
- 攻击者消耗 CDN 流量达到上限,触发设置的流量封顶限制,自动停用。
- 手动停用。
触发启用监测的情况
- 手动启用。
以下情况不会触发监测
- 修改 CDN 配置,CDN 会标注为
部署中
状态,此时不会触发监测。
代码分享及部署教程
由于我使用的是多吉云 CDN + 阿里 DNS,此处脚本使用的是多吉云 CDN 和阿里云提供的 API。
如您有需要适配其他 CDN 和 DNS 的需求,请在此评论区留言,或前往关于页寻找电子邮箱地址留言。适配后我将第一时间通过邮件的方式通知您。
使用方法:
-
安装 Python(3.12 或更高版本)
-
安装所需的包
pip install alibabacloud_alidns20150109==3.5.7
- 镜像加速下载:
pip install alibabacloud_alidns20150109==3.5.7 -i
-
创建
env.toml
复制粘贴以下内容
dogecloud_access_key = 'xxxxxxxxxxxxxxxx'
dogecloud_secret_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
aliyun_accesskey_id = 'XXXXXXXXXXXXXXXXXXXXXXXX'
aliyun_accesskey_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
domain = 'example.com'
rr="@"
-
创建并获取多吉云 AccessKey
-
创建并获取阿里云 AccessKey
-
填写配置文件
dogecloud_access_key
填写多吉云 AccessKeydogecloud_secret_key
填写多吉云 SecretKeyaliyun_accesskey_id
填写阿里云 AccessKey IDaliyun_accesskey_secret
填写阿里云 AccessKey Secretdomain
填写你的主域名rr
填写你域名的 RR 字段- 举例:
- example.com -> domain 填
example.com
, rr 填@
- blog.example.com -> domain 填
example.com
, rr 填blog
- example.com -> domain 填
-
将以下代码复制到
main.py
,执行即可。
点我展开折叠代码
警告:此代码 dogecloud_api
为多吉云官方提供的 SDK 代码,create_client
、disable_default_enable_oversea
、enable_default_convert_to_oversea
是由阿里云自动生成得来。笔者花费了 27 分钟将这些 SDK 粘在一起。代码中有无法解释的注释/风格混乱是意料之中的情况。
import hmac
import json
import sys
import time
import tomllib
import urllib
from hashlib import sha1
from typing import List
import requests
from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
with open("env.toml", "rb") as f:
config: dict = tomllib.load(f)
dogecloud_access_key = config["dogecloud_access_key"]
dogecloud_secret_key = config["dogecloud_secret_key"]
aliyun_accesskey_id = config["aliyun_accesskey_id"]
aliyun_accesskey_secret = config["aliyun_accesskey_secret"]
domain = config["domain"]
rr = config["rr"]
if rr == "@":
full_domain = domain
else:
full_domain = rr + "." + domain
def dogecloud_api(api_path, data={}, json_mode=False):
"""
调用多吉云API
:param api_path: 调用的 API 接口地址,包含 URL 请求参数 QueryString,例如:/console/vfetch/add.json?url=xxx&a=1&b=2
:param data: POST 的数据,字典,例如 {'a': 1, 'b': 2},传递此参数表示不是 GET 请求而是 POST 请求
:param json_mode: 数据 data 是否以 JSON 格式请求,默认为 false 则使用表单形式(a=1&b=2)
:type api_path: string
:type data: dict
:type json_mode bool
:return dict: 返回的数据
"""
with open("env.toml", "rb") as f:
config = tomllib.load(f)
access_key = config["dogecloud_access_key"]
secret_key = config["dogecloud_secret_key"]
body = ""
mime = ""
if json_mode:
body = json.dumps(data)
mime = "application/json"
else:
body = urllib.parse.urlencode(data)
mime = "application/x-www-form-urlencoded"
sign_str = api_path + "\n" + body
signed_data = hmac.new(secret_key.encode("utf-8"), sign_str.encode("utf-8"), sha1)
sign = signed_data.digest().hex()
authorization = "TOKEN " + access_key + ":" + sign
response = requests.post(
" + api_path,
data=body,
headers={"Authorization": authorization, "Content-Type": mime},
)
return response.json()
def create_client() -> Alidns20150109Client:
"""
使用AK&SK初始化账号Client
@return: Client
@throws Exception
"""
with open("env.toml", "rb") as f:
config = tomllib.load(f)
config = open_api_models.Config(
access_key_id=config["aliyun_accesskey_id"],
access_key_secret=config["aliyun_accesskey_secret"],
)
# Endpoint 请参考
config.endpoint = f"alidns.cn-huhehaote.aliyuncs.com"
return Alidns20150109Client(config)
def disable_default_enable_oversea(args: List[str]) -> None | int:
"""
禁用默认CNAME记录,将海外记录设为默认线路
"""
client = create_client()
add_custom_line_request = alidns_20150109_models.DescribeDomainRecordsRequest(
lang="zh", domain_name=domain, rrkey_word=rr
)
try:
response = client.describe_domain_records(add_custom_line_request)
if response.status_code != 200:
return 500
records = response.body.domain_records.record
default_cname_record_id = None
oversea_record_id = None
oversea_record_value = None
for record in records:
if (
record.domain_name == domain
and record.line == "default"
and record.rr == rr
and record.type == "CNAME"
):
default_cname_record_id = record.record_id
if record.line == "oversea":
oversea_record_id = record.record_id
oversea_record_value = record.value
except Exception as error:
# # 错误 message
# print(error.message)
# # 诊断地址
# print(error.data.get("Recommend"))
# from alibabacloud_tea_util.client import Client as UtilClient
# UtilClient.assert_as_string(error.message)
return 500
set_domain_record_status_request = (
alidns_20150109_models.SetDomainRecordStatusRequest(
record_id=default_cname_record_id, status="Disable"
)
)
try:
response = client.set_domain_record_status_with_options(
set_domain_record_status_request, util_models.RuntimeOptions()
)
if response.status_code != 200:
return 500
except Exception as error:
return 500
update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
record_id=oversea_record_id,
rr=rr,
type="CNAME",
line="default",
value=oversea_record_value,
)
try:
response = client.update_domain_record_with_options(
update_domain_record_request, util_models.RuntimeOptions()
)
if response.status_code != 200:
return 500
except Exception as error:
return 500
def enable_default_convert_to_oversea(args: List[str]) -> None | int:
"""
启用默认CNAME记录,将当前默认记录转为海外线路
"""
client = create_client()
add_custom_line_request = alidns_20150109_models.DescribeDomainRecordsRequest(
lang="zh", domain_name=domain, rrkey_word=rr
)
try:
response = client.describe_domain_records(add_custom_line_request)
if response.status_code != 200:
return 500
records = response.body.domain_records.record
default_cname_record_id = None
oversea_record_id = None
oversea_record_value = None
for record in records:
# print(record.status)
if (
record.domain_name == domain
and record.line == "default"
and record.rr == rr
and record.type == "CNAME"
and record.status == "DISABLE"
):
default_cname_record_id = record.record_id
if (
record.domain_name == domain
and record.line == "default"
and record.rr == rr
and record.type == "CNAME"
and record.status == "ENABLE"
):
oversea_record_id = record.record_id
oversea_record_value = record.value
except Exception as error:
return 500
set_domain_record_status_request = (
alidns_20150109_models.SetDomainRecordStatusRequest(
record_id=default_cname_record_id, status="Enable"
)
)
try:
response = client.set_domain_record_status_with_options(
set_domain_record_status_request, util_models.RuntimeOptions()
)
if response.status_code != 200:
return 500
except Exception as error:
return 500
update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
record_id=oversea_record_id,
rr=rr,
type="CNAME",
line="oversea",
value=oversea_record_value,
)
try:
response = client.update_domain_record_with_options(
update_domain_record_request, util_models.RuntimeOptions()
)
if response.status_code != 200:
return 500
except Exception as error:
return 500
def cdn_offline_monitor(domain_info) -> str:
if domain_info["status"] not in ("offline"):
print("CDN 状态: 在线")
return "offline"
else:
print("检测到 CDN 已离线")
if disable_default_enable_oversea(sys.argv[1:]) is not None:
print("DNS 修改失败,暂停 1s 后重试")
time.sleep(1)
print("开始监控域名上线状态...")
return "online"
def cdn_online_monitor(domain_info) -> str:
if domain_info["status"] not in ("online"):
print("CDN 状态: 离线")
return "online"
else:
print("检测到 CDN 已上线")
if enable_default_convert_to_oversea(sys.argv[1:]) is not None:
print("DNS 修改失败,暂停 1s 后重试")
time.sleep(1)
print("开始监控域名离线状态...")
return "offline"
def domain_info_handler(domain_info, monitoring_mode) -> str:
# 检测离线模式
if monitoring_mode == "offline":
return cdn_offline_monitor(domain_info)
# 检测上线模式
elif monitoring_mode == "online":
return cdn_online_monitor(domain_info)
if __name__ == "__main__":
# 定义当前监控模式:'offline' 监控是否离线,'online' 监控是否上线
monitoring_mode = "offline"
while True:
for domain_info in dogecloud_api("/cdn/domain/list.json")["data"]["domains"]:
if domain_info["name"] != full_domain:
continue
monitoring_mode = domain_info_handler(domain_info, monitoring_mode)
time.sleep(3) # 多吉云 API 限制 30000 次 / 日
附录
相关文章:混合云架构下的动态 IPv6 智能加速实践:从家庭网络到全球分发的低成本部署指南