Skip to content

Feature: Signature verification for Solana and Ethereum messages #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions src/aleph/sdk/chains/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
from aleph.sdk.conf import settings


def get_verification_buffer(message):
"""Returns a serialized string to verify the message integrity
(this is was it signed)
def get_verification_buffer(message: Dict) -> bytes:
"""
Returns the verification buffer that Aleph nodes use to verify the signature of a message.
Note:
The verification buffer is a string of the following format:
b"{chain}\\n{sender}\\n{type}\\n{item_hash}"
Args:
message: Message to get the verification buffer for
Returns:
bytes: Verification buffer
"""
return "{chain}\n{sender}\n{type}\n{item_hash}".format(**message).encode("utf-8")

Expand All @@ -27,8 +34,13 @@ class BaseAccount(ABC):
private_key: bytes

def _setup_sender(self, message: Dict) -> Dict:
"""Set the sender of the message as the account's public key.
"""
Set the sender of the message as the account's public key.
If a sender is already specified, check that it matches the account's public key.
Args:
message: Message to add the sender to
Returns:
Dict: Message with the sender set
"""
if not message.get("sender"):
message["sender"] = self.get_address()
Expand All @@ -40,24 +52,51 @@ def _setup_sender(self, message: Dict) -> Dict:

@abstractmethod
async def sign_message(self, message: Dict) -> Dict:
"""
Returns a signed message from an Aleph message.
Args:
message: Message to sign
Returns:
Dict: Signed message
"""
raise NotImplementedError

@abstractmethod
def get_address(self) -> str:
"""
Returns the account's displayed address.
"""
raise NotImplementedError

@abstractmethod
def get_public_key(self) -> str:
"""
Returns the account's public key.
"""
raise NotImplementedError

async def encrypt(self, content) -> bytes:
async def encrypt(self, content: bytes) -> bytes:
"""
Encrypts a message using the account's public key.
Args:
content: Content bytes to encrypt
Returns:
bytes: Encrypted content as bytes
"""
if self.CURVE == "secp256k1":
value: bytes = encrypt(self.get_public_key(), content)
return value
else:
raise NotImplementedError

async def decrypt(self, content) -> bytes:
async def decrypt(self, content: bytes) -> bytes:
"""
Decrypts a message using the account's private key.
Args:
content: Content bytes to decrypt
Returns:
bytes: Decrypted content as bytes
"""
if self.CURVE == "secp256k1":
value: bytes = decrypt(self.private_key, content)
return value
Expand Down
10 changes: 10 additions & 0 deletions src/aleph/sdk/chains/cosmos.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import hashlib
import json
from typing import Union

import ecdsa
from cosmospy._wallet import privkey_to_address, privkey_to_pubkey
Expand Down Expand Up @@ -80,3 +81,12 @@ def get_public_key(self) -> str:

def get_fallback_account(hrp=DEFAULT_HRP):
return CSDKAccount(private_key=get_fallback_private_key(), hrp=hrp)


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""TODO: Implement this"""
raise NotImplementedError("Not implemented yet")
40 changes: 39 additions & 1 deletion src/aleph/sdk/chains/ethereum.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, Optional, Union

from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account.signers.local import LocalAccount
from eth_keys.exceptions import BadSignature as EthBadSignatureError

from ..exceptions import BadSignatureError
from .common import (
BaseAccount,
get_fallback_private_key,
Expand Down Expand Up @@ -41,3 +43,39 @@ def get_public_key(self) -> str:

def get_fallback_account(path: Optional[Path] = None) -> ETHAccount:
return ETHAccount(private_key=get_fallback_private_key(path=path))


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
):
"""
Verifies a signature.
Args:
signature: The signature to verify. Can be a hex encoded string or bytes.
public_key: The sender's public key to use for verification. Can be a checksummed, hex encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.
"""
if isinstance(signature, str):
if signature.startswith("0x"):
signature = signature[2:]
signature = bytes.fromhex(signature)
else:
if signature.startswith(b"0x"):
signature = signature[2:]
signature = bytes.fromhex(signature.decode("utf-8"))
if isinstance(public_key, bytes):
public_key = "0x" + public_key.hex()
if isinstance(message, bytes):
message = message.decode("utf-8")

message_hash = encode_defunct(text=message)
try:
address = Account.recover_message(message_hash, signature=signature)
if address != public_key:
raise BadSignatureError
except (EthBadSignatureError, BadSignatureError) as e:
raise BadSignatureError from e
11 changes: 10 additions & 1 deletion src/aleph/sdk/chains/nuls1.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import struct
from binascii import hexlify, unhexlify
from typing import Optional
from typing import Optional, Union

from coincurve.keys import PrivateKey, PublicKey

Expand Down Expand Up @@ -324,3 +324,12 @@ def get_public_key(self):

def get_fallback_account(chain_id=8964):
return NULSAccount(private_key=get_fallback_private_key(), chain_id=chain_id)


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""TODO: Implement this"""
raise NotImplementedError("Not implemented yet")
10 changes: 10 additions & 0 deletions src/aleph/sdk/chains/nuls2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
from typing import Union

from nuls2.model.data import (
NETWORKS,
Expand Down Expand Up @@ -60,3 +61,12 @@ def get_public_key(self):
def get_fallback_account(chain_id=1):
acc = NULSAccount(private_key=get_fallback_private_key(), chain_id=chain_id)
return acc


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""TODO: Implement this"""
raise NotImplementedError("Not implemented yet")
32 changes: 30 additions & 2 deletions src/aleph/sdk/chains/sol.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import json
import os
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, Optional, Union

import base58
from nacl.exceptions import BadSignatureError as NaclBadSignatureError
from nacl.public import PrivateKey, SealedBox
from nacl.signing import SigningKey
from nacl.signing import SigningKey, VerifyKey

from ..conf import settings
from ..exceptions import BadSignatureError
from .common import BaseAccount, get_verification_buffer


Expand Down Expand Up @@ -81,3 +83,29 @@ def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
# Create a symlink to use this key by default
os.symlink(path, default_key_path)
return private_key


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
):
"""
Verifies a signature.
Args:
signature: The signature to verify. Can be a base58 encoded string or bytes.
public_key: The public key to use for verification. Can be a base58 encoded string or bytes.
message: The message to verify. Can be an utf-8 string or bytes.
Raises:
BadSignatureError: If the signature is invalid.
"""
if isinstance(signature, str):
signature = base58.b58decode(signature)
if isinstance(message, str):
message = message.encode("utf-8")
if isinstance(public_key, str):
public_key = base58.b58decode(public_key)
try:
VerifyKey(public_key).verify(message, signature)
except NaclBadSignatureError as e:
raise BadSignatureError from e
10 changes: 10 additions & 0 deletions src/aleph/sdk/chains/substrate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from typing import Union

from substrateinterface import Keypair

Expand Down Expand Up @@ -45,3 +46,12 @@ def get_fallback_mnemonics():
prvfile.write(mnemonic)

return mnemonic


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""TODO: Implement this"""
raise NotImplementedError("Not implemented yet")
23 changes: 22 additions & 1 deletion src/aleph/sdk/chains/tezos.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Dict, Optional
from typing import Dict, Optional, Union

from aleph_pytezos.crypto.key import Key
from nacl.public import SealedBox
Expand Down Expand Up @@ -48,3 +48,24 @@ async def decrypt(self, content) -> bytes:

def get_fallback_account(path: Optional[Path] = None) -> TezosAccount:
return TezosAccount(private_key=get_fallback_private_key(path=path))


def verify_signature(
signature: Union[bytes, str],
public_key: Union[bytes, str],
message: Union[bytes, str],
) -> bool:
"""
Verify a signature using the public key (hash) of a tezos account.

Note: It requires the public key hash (sp, p2, ed-prefix), not the address (tz1, tz2 prefix)!
Args:
signature: The signature to verify. Can be a base58 encoded string or bytes.
public_key: The public key (hash) of the account. Can be a base58 encoded string or bytes.
message: The message that was signed. Is a sequence of bytes in raw format or hexadecimal notation.
"""
key = Key.from_encoded_key(public_key)
try:
return key.verify(signature, message)
except ValueError:
return False
8 changes: 8 additions & 0 deletions src/aleph/sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ class InvalidMessageError(BroadcastError):
"""

pass


class BadSignatureError(Exception):
"""
The signature of a message is invalid.
"""

pass
46 changes: 45 additions & 1 deletion tests/unit/test_chain_ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import pytest

from aleph.sdk.chains.ethereum import get_fallback_account
from aleph.sdk.chains.common import get_verification_buffer
from aleph.sdk.chains.ethereum import get_fallback_account, verify_signature
from aleph.sdk.exceptions import BadSignatureError


@dataclass
Expand Down Expand Up @@ -42,6 +44,48 @@ async def test_ETHAccount(ethereum_account):
assert len(pubkey) == 68


@pytest.mark.asyncio
async def test_verify_signature(ethereum_account):
account = ethereum_account

message = asdict(
Message(
"ETH",
account.get_address(),
"POST",
"SomeHash",
)
)
await account.sign_message(message)
assert message["signature"]

verify_signature(
message["signature"], message["sender"], get_verification_buffer(message)
)


@pytest.mark.asyncio
async def test_verify_signature_with_forged_signature(ethereum_account):
account = ethereum_account

message = asdict(
Message(
"ETH",
account.get_address(),
"POST",
"SomeHash",
)
)
await account.sign_message(message)
assert message["signature"]

forged_signature = "0x" + "0" * 130
with pytest.raises(BadSignatureError):
verify_signature(
forged_signature, message["sender"], get_verification_buffer(message)
)


@pytest.mark.asyncio
async def test_decrypt_secp256k1(ethereum_account):
account = ethereum_account
Expand Down
Loading