Skip to content

fix code quality issues from previous PRs #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ testing =
black
isort
flake8
aiodns
mqtt =
aiomqtt<=0.1.3
certifi
Expand Down
7 changes: 7 additions & 0 deletions src/aleph/sdk/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class Settings(BaseSettings):

CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists

# Dns resolver
DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh"
DNS_PROGRAM_DOMAIN = "program.public.aleph.sh"
DNS_INSTANCE_DOMAIN = "instance.public.aleph.sh"
DNS_ROOT_DOMAIN = "static.public.aleph.sh"
DNS_RESOLVERS = ["1.1.1.1", "1.0.0.1"]

class Config:
env_prefix = "ALEPH_"
case_sensitive = False
Expand Down
141 changes: 141 additions & 0 deletions src/aleph/sdk/domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import re
from typing import Optional

import aiodns

from aleph.sdk.exceptions import DomainConfigurationError

from .conf import settings


class AlephDNS:
def __init__(self):
self.resolver = aiodns.DNSResolver(servers=settings.DNS_RESOLVERS)
self.fqdn_matcher = re.compile(r"https?://?")

async def query(self, name: str, query_type: str):
try:
return await self.resolver.query(name, query_type)
except Exception as e:
print(e)
return None

def url_to_domain(self, url):
return self.fqdn_matcher.sub("", url).strip().strip("/")

async def get_ipv6_address(self, url: str):
domain = self.url_to_domain(url)
ipv6 = []
query = await self.query(domain, "AAAA")
if query:
for entry in query:
ipv6.append(entry.host)
return ipv6

async def get_dnslink(self, url: str):
domain = self.url_to_domain(url)
query = await self.query(f"_dnslink.{domain}", "TXT")
if query is not None and len(query) > 0:
return query[0].text

async def check_domain_configured(self, domain, target, owner):
try:
print("Check...", target)
return await self.check_domain(domain, target, owner)
except Exception as error:
raise DomainConfigurationError(error)

async def check_domain(self, url: str, target: str, owner: Optional[str] = None):
status = {"cname": False, "owner_proof": False}

target = target.lower()

dns_rules = self.get_required_dns_rules(url, target, owner)

for dns_rule in dns_rules:
status[dns_rule["rule_name"]] = False

record_name = dns_rule["dns"]["name"]
record_type = dns_rule["dns"]["type"]
record_value = dns_rule["dns"]["value"]

res = await self.query(record_name, record_type.upper())

if record_type == "txt":
found = False

for _res in res:
if hasattr(_res, "text") and _res.text == record_value:
found = True

if not found:
raise DomainConfigurationError(
(dns_rule["info"], dns_rule["on_error"], status)
)

elif (
res is None
or not hasattr(res, record_type)
or getattr(res, record_type) != record_value
):
raise DomainConfigurationError(
(dns_rule["info"], dns_rule["on_error"], status)
)

status[dns_rule["rule_name"]] = True

return status

def get_required_dns_rules(self, url, target, owner: Optional[str] = None):
domain = self.url_to_domain(url)
target = target.lower()
dns_rules = []

if target == "ipfs":
cname_value = settings.DNS_IPFS_DOMAIN
elif target == "program":
cname_value = settings.DNS_PROGRAM_DOMAIN
elif target == "instance":
cname_value = f"{domain}.{settings.DNS_INSTANCE_DOMAIN}"

# cname rule
dns_rules.append(
{
"rule_name": "cname",
"dns": {"type": "cname", "name": domain, "value": cname_value},
"info": f"Create a CNAME record for {domain} with value {cname_value}",
"on_error": f"CNAME record not found: {domain}",
}
)

if target == "ipfs":
# ipfs rule
dns_rules.append(
{
"rule_name": "delegation",
"dns": {
"type": "cname",
"name": f"_dnslink.{domain}",
"value": f"_dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}",
},
"info": f"Create a CNAME record for _dnslink.{domain} with value _dnslink.{domain}.{settings.DNS_ROOT_DOMAIN}",
"on_error": f"CNAME record not found: _dnslink.{domain}",
}
)

if owner:
# ownership rule
dns_rules.append(
{
"rule_name": "owner_proof",
"dns": {
"type": "txt",
"name": f"_control.{domain}",
"value": owner,
},
"info": f"Create a TXT record for _control.{domain} with value = owner address",
"on_error": "Owner address mismatch",
}
)

return dns_rules
5 changes: 5 additions & 0 deletions src/aleph/sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ class FileTooLarge(Exception):
"""

pass


class DomainConfigurationError(Exception):
"Raised when the domain checks are not satisfied"
pass
2 changes: 2 additions & 0 deletions src/aleph/sdk/wallets/ledger/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def from_address(
"""
device = device or init_dongle()
account = find_account(address=address, dongle=device, count=5)
if not account:
return None
return LedgerETHAccount(
account=account,
device=device,
Expand Down
48 changes: 48 additions & 0 deletions tests/unit/test_domains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest

from aleph.sdk.domain import AlephDNS
from aleph.sdk.exceptions import DomainConfigurationError


@pytest.mark.asyncio
async def test_url_to_domain():
alephdns = AlephDNS()
domain = alephdns.url_to_domain("https://aleph.im")
query = await alephdns.query(domain, "A")
assert query is not None
assert len(query) > 0
assert hasattr(query[0], "host")


@pytest.mark.asyncio
async def test_get_ipv6_address():
alephdns = AlephDNS()
url = "https://aleph.im"
ipv6_address = await alephdns.get_ipv6_address(url)
assert ipv6_address is not None
assert len(ipv6_address) > 0
assert ":" in ipv6_address[0]


@pytest.mark.asyncio
async def test_dnslink():
alephdns = AlephDNS()
url = "https://aleph.im"
dnslink = await alephdns.get_dnslink(url)
assert dnslink is not None


@pytest.mark.asyncio
async def test_configured_domain():
alephdns = AlephDNS()
url = "https://custom-domain-unit-test.aleph.sh"
status = await alephdns.check_domain(url, "ipfs", "0xfakeaddress")
assert type(status) is dict


@pytest.mark.asyncio
async def test_not_configured_domain():
alephdns = AlephDNS()
url = "https://not-configured-domain.aleph.sh"
with pytest.raises(DomainConfigurationError):
await alephdns.check_domain(url, "ipfs", "0xfakeaddress")