-
Notifications
You must be signed in to change notification settings - Fork 5
Feature : AlephDNS (2nd attempt) #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
f8e110c
Feature : AlephDNS (#47)
1yam c276470
Fix: Reformat with `black`
hoh 24900ff
minor change on dns settings
aliel 525366f
avoid not valid local variable in certain conditions
aliel 967b7b4
add method to retreive txt values
aliel f27e002
fix dns record check
aliel 73c6b6b
fix program cname value
aliel 9e3a651
Refactor: str -> Enum for DNS target
hoh f20590d
Refactor: Add types to arguments
hoh 50cd4f2
Fix: variable did not exist
hoh 828576c
Add typing
hoh 8d88625
Cleanup: imports with isort
hoh 30e842e
Cleanup: typing
hoh 1fa6f5c
Refactor: domain_from_url
hoh 7642ea4
Refactor: use a generator
hoh 720c0bd
Cleanup: Add docstring and property
hoh 3c747f0
WIP: Refactor and cleanup domain related code
hoh 2409cd0
[dns] let checks continue
aliel 779e01a
fix hostname_from_url when it's already a hostname
aliel ec263bf
remove unused import
aliel d79d4f1
speedup dns detection using authoritative ns server
aliel a75a7ae
fix cname to target
aliel 282a287
fix cond
aliel 3e0c98f
fix while loop
aliel f94e30c
Fix typing; add docs; refactor for clarity
MHHukiewitz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,294 @@ | ||
import logging | ||
from enum import Enum | ||
from ipaddress import IPv4Address, IPv6Address | ||
from typing import Any, AsyncIterable, Dict, Iterable, List, NewType, Optional, Union | ||
from urllib.parse import urlparse | ||
|
||
import aiodns | ||
from pydantic import BaseModel, HttpUrl | ||
|
||
from .conf import settings | ||
from .exceptions import DomainConfigurationError | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
Hostname = NewType("Hostname", str) | ||
|
||
|
||
class TargetType(str, Enum): | ||
""" | ||
The type of target that a domain points to. | ||
|
||
- IPFS: The domain points to an IPFS hash. | ||
- PROGRAM: The domain points to an aleph.im program. | ||
- INSTANCE: The domain points to an aleph.im instance. | ||
""" | ||
|
||
IPFS = "ipfs" | ||
PROGRAM = "program" | ||
INSTANCE = "instance" | ||
|
||
|
||
class DNSRule(BaseModel): | ||
""" | ||
A DNS rule is a DNS record that is required for a domain to be configured. | ||
|
||
Args: | ||
name: The name of the rule. | ||
dns: The DNS record. | ||
info: Instructions to configure the DNS record. | ||
on_error: Error message when the rule is not found. | ||
""" | ||
|
||
name: str | ||
dns: Dict[str, Any] | ||
info: str | ||
on_error: str | ||
|
||
def raise_error(self, status: Dict[str, bool]): | ||
raise DomainConfigurationError((self.info, self.on_error, status)) | ||
|
||
|
||
def hostname_from_url(url: Union[HttpUrl, str]) -> Hostname: | ||
"""Extract FQDN from url""" | ||
|
||
parsed = urlparse(url) | ||
if all([parsed.scheme, parsed.netloc]) is True: | ||
url = parsed.netloc | ||
|
||
return Hostname(url) | ||
|
||
|
||
async def get_target_type(fqdn: Hostname) -> Optional[TargetType]: | ||
"""Returns the aleph.im target type of the domain""" | ||
domain_validator = DomainValidator() | ||
resolver = await domain_validator.get_resolver_for(fqdn) | ||
try: | ||
entry = await resolver.query(fqdn, "CNAME") | ||
cname = getattr(entry, "cname") | ||
if settings.DNS_IPFS_DOMAIN in cname: | ||
return TargetType.IPFS | ||
elif settings.DNS_PROGRAM_DOMAIN in cname: | ||
return TargetType.PROGRAM | ||
elif settings.DNS_INSTANCE_DOMAIN in cname: | ||
return TargetType.INSTANCE | ||
|
||
return None | ||
except aiodns.error.DNSError: | ||
return None | ||
|
||
|
||
class DomainValidator: | ||
""" | ||
Tools used to analyze domain names used on the aleph.im network. | ||
""" | ||
|
||
resolver: aiodns.DNSResolver | ||
|
||
def __init__(self): | ||
self.resolver = aiodns.DNSResolver(servers=settings.DNS_RESOLVERS) | ||
|
||
async def get_name_servers(self, hostname: Hostname): | ||
"""Get DNS name servers (NS) of a domain""" | ||
dns_servers = settings.DNS_RESOLVERS | ||
fqdn = hostname | ||
|
||
while True: | ||
# Detect and get authoritative NS of subdomains if delegated | ||
try: | ||
entries = await self.resolver.query(fqdn, "NS") | ||
servers: List[Any] = [] | ||
for entry in entries: | ||
servers += await self.get_ipv6_addresses(entry.host) | ||
servers += await self.get_ipv4_addresses(entry.host) | ||
|
||
dns_servers = servers | ||
break | ||
except aiodns.error.DNSError: | ||
sub_domains = fqdn.split(".") | ||
if len(sub_domains) > 2: | ||
fqdn = Hostname(".".join(sub_domains[1:])) | ||
continue | ||
|
||
if len(sub_domains) == 2: | ||
break | ||
except Exception as err: | ||
logger.debug(f"Unexpected {err=}, {type(err)=}") | ||
break | ||
|
||
return dns_servers | ||
|
||
async def get_resolver_for(self, hostname: Hostname): | ||
dns_servers = await self.get_name_servers(hostname) | ||
return aiodns.DNSResolver(servers=dns_servers) | ||
|
||
async def get_ipv4_addresses(self, hostname: Hostname) -> List[IPv4Address]: | ||
"""Returns all IPv4 addresses for a domain""" | ||
entries: Iterable = await self.resolver.query(hostname, "A") or [] | ||
return [entry.host for entry in entries] | ||
|
||
async def get_ipv6_addresses(self, hostname: Hostname) -> List[IPv6Address]: | ||
"""Returns all IPv6 addresses for a domain""" | ||
entries: Iterable = await self.resolver.query(hostname, "AAAA") or [] | ||
return [entry.host for entry in entries] | ||
|
||
async def get_dnslinks(self, hostname: Hostname) -> List[str]: | ||
"""Returns all DNSLink values for a domain.""" | ||
entries = await self.resolver.query(f"_dnslink.{hostname}", "TXT") | ||
return [entry.text for entry in entries] | ||
|
||
async def get_dnslink(self, hostname: Hostname) -> Optional[str]: | ||
"""Returns the DNSLink corresponding to a domain. | ||
|
||
Since it is possible to add multiple TXT records containing a DNSLink to | ||
the same domain, a behaviour has to be defined. | ||
|
||
- Some IPFS implementations might use the first valid dnslink= record they find. | ||
- Others might throw an error indicating that the DNSLink resolution is ambiguous due to multiple records. | ||
- Still, others might try to fetch content from all provided DNSLinks, | ||
though this behavior would be less common and may introduce overhead. | ||
""" | ||
dnslinks = await self.get_dnslinks(hostname) | ||
return dnslinks[0] if dnslinks else None | ||
|
||
async def get_txt_values( | ||
self, hostname: Hostname, delimiter: Optional[str] = None | ||
) -> AsyncIterable[str]: | ||
"""Returns all TXT values for a domain""" | ||
entries: Iterable = await self.resolver.query(hostname, "TXT") or [] | ||
for entry in entries: | ||
if not hasattr(entry, "text"): | ||
logger.debug("An entry does not have any text") | ||
continue | ||
if not entry.text.startswith("0x"): | ||
logger.debug("Does not look like an Ethereum address") | ||
continue | ||
|
||
if delimiter: | ||
for part in entry.text.split(delimiter): | ||
yield part | ||
else: | ||
yield entry.text | ||
|
||
async def check_domain( | ||
self, hostname: Hostname, target: TargetType, owner: Optional[str] = None | ||
) -> Dict: | ||
""" | ||
Checks that the domain points towards the given aleph.im target. | ||
|
||
Args: | ||
hostname: The hostname of the domain. | ||
target: The aleph.im target type. | ||
owner: The owner wallet address of the domain for ownership proof. | ||
|
||
Raises: | ||
DomainConfigurationError: If the domain is not configured. | ||
|
||
Returns: | ||
A dictionary containing the status of the domain configuration. | ||
""" | ||
status = {"cname": False, "owner_proof": False} | ||
|
||
dns_rules = self.get_required_dns_rules(hostname, target, owner) | ||
|
||
for dns_rule in dns_rules: | ||
status[dns_rule.name] = False | ||
|
||
record_name = dns_rule.dns["name"] | ||
record_type = dns_rule.dns["type"] | ||
record_value = dns_rule.dns["value"] | ||
|
||
try: | ||
resolver = await self.get_resolver_for(hostname) | ||
entries = await resolver.query(record_name, record_type.upper()) | ||
except aiodns.error.DNSError: | ||
# Continue checks | ||
entries = None | ||
|
||
if entries and record_type == "txt": | ||
for entry in entries: | ||
if hasattr(entry, "text") and entry.text == record_value: | ||
break | ||
else: | ||
return dns_rule.raise_error(status) | ||
elif ( | ||
entries is None | ||
or not hasattr(entries, record_type) | ||
or getattr(entries, record_type) != record_value | ||
): | ||
return dns_rule.raise_error(status) | ||
|
||
status[dns_rule.name] = True | ||
|
||
return status | ||
|
||
def get_required_dns_rules( | ||
self, hostname: Hostname, target: TargetType, owner: Optional[str] = None | ||
) -> List[DNSRule]: | ||
""" | ||
Returns the DNS rules (CNAME, TXT) required for a domain to be configured. | ||
|
||
Args: | ||
hostname: The hostname of the domain. | ||
target: The aleph.im target type. | ||
owner: The owner wallet address of the domain to add as an ownership proof. | ||
|
||
Returns: | ||
A list of DNS rules with | ||
""" | ||
target = target.lower() | ||
dns_rules = [] | ||
|
||
cname_value = None | ||
if target == TargetType.IPFS: | ||
cname_value = settings.DNS_IPFS_DOMAIN | ||
elif target == TargetType.PROGRAM: | ||
cname_value = f"{hostname}.{settings.DNS_PROGRAM_DOMAIN}" | ||
elif target == TargetType.INSTANCE: | ||
cname_value = f"{hostname}.{settings.DNS_INSTANCE_DOMAIN}" | ||
|
||
# cname rule | ||
dns_rules.append( | ||
DNSRule( | ||
name="cname", | ||
dns={ | ||
"type": "cname", | ||
"name": hostname, | ||
"value": cname_value, | ||
}, | ||
info=f"Create a CNAME record for {hostname} with value {cname_value}", | ||
on_error=f"CNAME record not found: {hostname}", | ||
) | ||
) | ||
MHHukiewitz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if target == TargetType.IPFS: | ||
# ipfs rule | ||
dns_rules.append( | ||
DNSRule( | ||
name="delegation", | ||
dns={ | ||
"type": "cname", | ||
"name": f"_dnslink.{hostname}", | ||
"value": f"_dnslink.{hostname}.{settings.DNS_STATIC_DOMAIN}", | ||
}, | ||
info=f"Create a CNAME record for _dnslink.{hostname} with value _dnslink.{hostname}.{settings.DNS_STATIC_DOMAIN}", | ||
on_error=f"CNAME record not found: _dnslink.{hostname}", | ||
) | ||
) | ||
|
||
if owner: | ||
# ownership rule | ||
dns_rules.append( | ||
DNSRule( | ||
name="owner_proof", | ||
dns={ | ||
"type": "txt", | ||
"name": f"_control.{hostname}", | ||
"value": owner, | ||
}, | ||
info=f"Create a TXT record for _control.{hostname} with value {owner}", | ||
on_error="Owner address mismatch", | ||
) | ||
) | ||
|
||
return dns_rules |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import pytest | ||
|
||
from aleph.sdk.domain import DomainValidator, TargetType, hostname_from_url | ||
from aleph.sdk.exceptions import DomainConfigurationError | ||
|
||
|
||
def test_hostname(): | ||
hostname = hostname_from_url("https://aleph.im") | ||
assert hostname == "aleph.im" | ||
hostname = hostname_from_url("aleph.im") | ||
assert hostname == "aleph.im" | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_query(): | ||
alephdns = DomainValidator() | ||
hostname = hostname_from_url("https://aleph.im") | ||
query = await alephdns.resolver.query(hostname, "A") | ||
assert query is not None | ||
assert len(query) > 0 | ||
assert hasattr(query[0], "host") | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_get_ipv6_address(): | ||
alephdns = DomainValidator() | ||
url = "https://aleph.im" | ||
hostname = hostname_from_url(url) | ||
ipv6_addresses = await alephdns.get_ipv6_addresses(hostname) | ||
assert ipv6_addresses is not None | ||
assert len(ipv6_addresses) > 0 | ||
assert ":" in str(ipv6_addresses[0]) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_dnslink(): | ||
alephdns = DomainValidator() | ||
url = "https://aleph.im" | ||
hostname = hostname_from_url(url) | ||
dnslink = await alephdns.get_dnslink(hostname) | ||
assert dnslink is not None | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_configured_domain(): | ||
alephdns = DomainValidator() | ||
url = "https://custom-domain-unit-test.aleph.sh" | ||
hostname = hostname_from_url(url) | ||
status = await alephdns.check_domain(hostname, TargetType.IPFS, "0xfakeaddress") | ||
assert type(status) is dict | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_not_configured_domain(): | ||
alephdns = DomainValidator() | ||
url = "https://not-configured-domain.aleph.sh" | ||
hostname = hostname_from_url(url) | ||
with pytest.raises(DomainConfigurationError): | ||
status = await alephdns.check_domain(hostname, TargetType.IPFS, "0xfakeaddress") | ||
assert type(status) is None |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.