# coding=utf-8
"""
Tencent Cloud EdgeOne API
腾讯云 EdgeOne (边缘安全速平台) API
API Documentation: https://cloud.tencent.com/document/api/1552/80731
@author: NewFuture
"""
from ddns.provider._base import join_domain
from .tencentcloud import TencentCloudProvider
class EdgeOneNoAccerProvider(TencentCloudProvider):
"""
腾讯云 EdgeOne API 提供商
Tencent Cloud EdgeOne API Provider
"""
endpoint = "https://teo.tencentcloudapi.com"
# 腾讯云 EdgeOne API 配置
service = "teo"
version_date = "2022-09-01"
def _query_zone_id(self, domain):
# type: (str) -> str | None
"""查询域名的加速域名信息获取 ZoneId https://cloud.tencent.com/document/api/1552/80713"""
# 首先尝试直接查找域名
filters = [{"Name": "zone-name", "Values": [domain], "Fuzzy": False}] # type: Any
response = self._request("DescribeZones", Filters=filters)
if response and "Zones" in response:
for zone in response.get("Zones", []):
if zone.get("ZoneName") == domain:
zone_id = zone.get("ZoneId")
if zone_id:
self.logger.debug("Found acceleration domain %s with Zone ID: %s", domain, zone_id)
return zone_id
self.logger.debug("Acceleration domain not found for: %s", domain)
return None
def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
# type: (str, str, str, str, str | None, dict) -> dict | None
"""查询加速域名信息 https://cloud.tencent.com/document/api/1552/86336"""
domain = join_domain(subdomain, main_domain)
filters = [{"Name": "name", "Values": [domain], "Fuzzy": False}] # type: Any
response = self._request("DescribeDnsRecords", ZoneId=zone_id, Filters=filters)
if response and "DnsRecords" in response:
for domain_info in response.get("DnsRecords", []):
if domain_info.get("Name") == domain:
self.logger.debug("Found domain: %s", domain_info)
return domain_info
self.logger.warning("No domain found for: %s, response: %s", domain, response)
return None
def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
# type: (str, str, str, str, str, int, str | None, dict) -> bool
"""创建新的加速域名记录 https://cloud.tencent.com/document/api/1552/86338"""
domain = join_domain(subdomain, main_domain)
res = self._request("CreateDnsRecord", ZoneId=zone_id, Name=domain, Type=record_type, Content=value)
if res:
self.logger.info("domain created (%s)", res.get("RequestId"))
return True
self.logger.error("Failed to create domain, response: %s", res)
return False
def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
"""更新加速域名的源站 IP 地址 https://cloud.tencent.com/document/api/1552/86335"""
# domain = old_record.get("ModifyDnsRecords")
# 构建源站信息
new_record = {"RecordId": old_record.get("RecordId"), "Name": old_record.get("Name"), "Type": record_type, "Content": value}
response = self._request("ModifyDnsRecords", ZoneId=zone_id, DnsRecords=[new_record])
if response:
self.logger.info("domain updated (%s)", response.get("RequestId"))
return True
self.logger.error("Failed to update domain origin, response: %s", response)
return False
描述场景和问题 (Is your feature request related to a problem? Please describe)
当前edgeone的provider仅支持添加/更新加速域名,但将域名完全托管给edgeone后,也有更新/添加非加速域名的需求
解决方案或者思路 (Describe the solution you'd like)
创建一个EdgeOneNoAccerate的Provider,参考代码如下
考虑过的其他方案或者思路 (Describe alternatives you've considered)
增加选项,根据填写的选项选择使用哪种模式
补充说明 (Additional context)