Skip to content

Add a verifier for token status list, as well as issuer and verifier for bitstring status list. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f3487a1
chore: readme
dbluhm Jul 6, 2024
45f6931
feat: first attempt at verification for jwt token status list, minor …
Dec 6, 2024
e2a92bd
fix: basic jwt verifier implemented and tested
Dec 9, 2024
c4ba042
fix: create test using es256, signature verification now matches issu…
Dec 9, 2024
53372de
chore: cleanup
Dec 9, 2024
58d5de7
feat: cwb verification done and tested
Dec 9, 2024
a735835
feat: support for parsing referenced tokens
Dec 9, 2024
fec3807
chore: save progress
Dec 9, 2024
1ee324e
fix: slightly more functional nginx server
Dec 13, 2024
39cb236
fix: add nginx info
Dec 13, 2024
f4ea94a
fix: refactoring, functional web server container with correct pdm be…
Dec 13, 2024
d3f5938
fix: refactor
Dec 13, 2024
9f97f11
feat: web server now issues actual credentials
Dec 20, 2024
1cccb12
feat: verification from web server is done
Dec 20, 2024
049f9c2
feat: add helper methods to TokenStatusListVerifier for serialising a…
Dec 20, 2024
8ff624a
fix: update readme, minor refactor
Dec 20, 2024
c064ff4
feat: create initial bitstring status list issuer, actual issuance is…
Dec 22, 2024
b6a95a8
feat: implement jwt format for bitstring statuslist issuer
Dec 22, 2024
2436710
fix: minor fix
Jan 13, 2025
1d27806
feat: initial verifier for bitstring status list
Jan 14, 2025
28b9add
chore: rename tests
Jan 14, 2025
cd516c7
fix: add support for embedding proofs
Jan 16, 2025
8ebfebc
fix: tests for embedding proofs
Jan 17, 2025
d262be4
fix: bugfix, bitstring status list now supports lists with multiple-b…
Jan 18, 2025
d32c3b5
feat: add test for statusMessage feature
Jan 18, 2025
051295d
fix: add support for multibit statuses to bitstring status list
Jan 18, 2025
57df28a
fix: add test for advanced sign/verify for bitstring status list
Jan 18, 2025
3e65ff1
fix: update readme
Jan 18, 2025
a0f942a
Update README.md
Athan13 Jan 18, 2025
9399f8a
fix: refactor token status list verifier
Jan 26, 2025
31cd172
gerge branch 'feature/verifier-status-list' of github.com:Indicio-tec…
Jan 26, 2025
73a6eb2
fix: refactor bitstring status list verifier
Jan 26, 2025
ff512c2
fix: add serialization/deserialization to bitstring-status-list, fix …
Jan 26, 2025
1df4254
fix: fix docstrings
Jan 26, 2025
956e410
fix: fix error messages
May 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: initial verifier for bitstring status list
untested, only supports jwts

Signed-off-by: Athan Massouras <athan@indicio.tech>
  • Loading branch information
Athan Massouras committed Jan 18, 2025
commit 1d27806e6f9d17e3add14fb8f3e1f683dde6489c
49 changes: 24 additions & 25 deletions src/bitstring_status_list/issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@

MIN_LIST_LENGTH = 131072

class StatusListLengthError(Exception):
"""Raised when the status list is insufficiently long."""

class BitstringStatusListIssuer(Issuer):
"""Bitstring Status List Issuer."""
def __init__(
self,
status_list: BitArray[N],
allocator: IndexAllocator,
min_list_length: int = MIN_LIST_LENGTH,
):
super().__init__
if self.status_list.bits != 1:
raise ValueError("Bitstring status list must have single bit statuses.")

if len(self.status_list) < MIN_LIST_LENGTH:
raise ValueError(f"Bitstring status list must be at least {MIN_LIST_LENGTH} bits long,
but was {len(self.status_list)} bits long instead.")
super().__init__(
status_list=status_list,
allocator=allocator
)
self.min_list_length = min_list_length

if len(self.status_list) < self.min_list_length:
raise StatusListLengthError(f"Bitstring status list must be at least {self.min_list_length}
bits long, but was {len(self.status_list)} bits long instead.")

@classmethod
def load(cls, value: dict) -> "BitstringStatusListIssuer":
Expand All @@ -57,33 +63,25 @@ def load(cls, value: dict) -> "BitstringStatusListIssuer":
raise TypeError("status_list must be dict")

parsed_status_list = BitArray.load(status_list)

if parsed_status_list.bits != 1:
raise ValueError("Bitstring status list must have single bit statuses.")

if len(parsed_status_list) < MIN_LIST_LENGTH:
raise ValueError(f"Bitstring status list must be at least {MIN_LIST_LENGTH} bits long,
but was {len(parsed_status_list)} bits long instead.")

return cls(parsed_status_list, allocator)

@classmethod
def new(cls, size: int, strategy: Literal["linear", "random"] = "random") -> "BitstringStatusListIssuer":
def new(cls, size: int, bits: Bits = 1, strategy: Literal["linear", "random"] = "random", min_list_length: int = MIN_LIST_LENGTH) -> "BitstringStatusListIssuer":
"""Return a new Issuer."""
if size < MIN_LIST_LENGTH:
raise ValueError(f"Bitstring status list must be at least {MIN_LIST_LENGTH} bits long,
but was {size} bits long instead.")
if size < min_list_length:
raise StatusListLengthError(f"Bitstring status list must be at least {min_list_length} bits
long, but was {size} bits long instead.")

if strategy == "linear":
allocator = LinearIndexAllocator(size)
elif strategy == "random":
allocator = RandomIndexAllocator(
BitArray.with_at_least(1, size), num_allocated=0
BitArray.with_at_least(bits, size), num_allocated=0
)
else:
raise ValueError(f"Invalid strategy: {strategy}")

status_list = BitArray.with_at_least(1, size)
status_list = BitArray.with_at_least(bits, size)
return cls(status_list, allocator)

def generate_jwt(
Expand Down Expand Up @@ -153,9 +151,9 @@ def sign_jwt_enveloping(
ttl=ttl,
)

enc_headers = dict_to_b64(headers).decode()
enc_payload = dict_to_b64(payload).decode()
enc_to_sign = f"{enc_headers}.{enc_payload}".encode()
enc_headers = dict_to_b64(headers)
enc_payload = dict_to_b64(payload)
enc_to_sign = enc_headers + b"." + enc_payload

signature = signer(enc_to_sign)
return enc_to_sign + b"." + signature
Expand Down Expand Up @@ -183,7 +181,8 @@ def sign_jwt_embedding(
ttl=ttl,
)

payload.update(headers)
payload["proof"] = proof # TODO: is this correct?
return dict_to_b64(headers) + b"." + dict_to_b64(payload)
return dict_to_b64(payload)


141 changes: 138 additions & 3 deletions src/bitstring_status_list/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
cast,
)

import requests as r

from bit_array import *
from bitstring_status_list.issuer import MIN_LIST_LENGTH, StatusListLengthError

class EnvelopingTokenVerifier(Protocol):
"""Protocol defining the verifying callable for enveloping signatures."""

Expand All @@ -22,9 +27,139 @@ def __call__(self, payload: bytes, signature: bytes) -> bool:
class EmbeddingTokenVerifier(Protocol):
"""Protocol defining the verifying callable for embedding signatures."""

def __call_(self, payload: bytes, signature: dict) -> bool:
def __call__(self, payload: bytes, signature: dict) -> bool:
...

class StatusRetrievalError(Exception):
"""Raised if dereference of URL fails. See Bitstring Status List Spec S. 3.2"""

class StatusVerificationError(Exception):
"""Raised if proofs fail or if format is invalid. See Bitstring Status List Spec S. 3.2"""

class BitstringStatusListVerifier():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments above about the TokenStatusListVerifier; many of the same recommendations apply here.

def __init__(self):
pass
def __init__(
self,
credential_status: dict,

headers: Optional[dict] = None,
payload: Optional[dict] = None,

bit_array: Optional[BitArray] = None,
):
self.credential_status = credential_status
assert all(key in credential_status.keys() for key in ["id", "type", "statusPurpose", "statusListIndex", "statusListCredential"]),\
"Invalid credentialStatus"

self.headers = headers
self.payload = payload

self._bit_array = bit_array

def establish_connection(
self,
status_list_format: Literal["CWT", "JWT"],
) -> bytes:
""" Establish connection. Returns base64 encoded response. """
issuer_uri = self.credential_status["statusListCredential"]
try:
response = r.get(issuer_uri)
except Exception as e:
raise StatusRetrievalError(f"Dereference of uri {issuer_uri} failed: {e}.")

if not (200 <= response.status_code < 300):
raise StatusRetrievalError(f"Response status from {issuer_uri} was {response.status_code}.")

# When establishing a new connection, clear previous cached values.
self.headers = None
self.payload = None
self._bit_array = None

return response.content

def verify_jwt(
self,
sl_response: bytes,
verifier: EnvelopingTokenVerifier | EmbeddingTokenVerifier,
min_list_length: int = MIN_LIST_LENGTH,
):
"""
Takes a status-list response and a verifier, and ensures that the response matches the
required format, verifying the signature using verifier.

Will assign the headers and payload fields in the class if the format is valid and the
signature is correct, and raise an exception if not.

Args:
sl_response: REQUIRED. A base64-encoded status_list response, acquired (eg.) from
establish_connection().

verifier: REQUIRED. A callable that verifies the signature of a payload.

min_list_length: OPTIONAL. The minimum list length, recommended to be 131,072 (see S. 6.1)
"""

if b"." in sl_response:
# Enveloping proof

# Check that message is in valid JWT format
headers_bytes, payload_bytes, signature = sl_response.split(b".")
assert headers_bytes and payload_bytes and signature

# Verify signature. verifier must be of type EnvelopingTokenVerifier
if not verifier(headers_bytes + b"." + payload_bytes, b64url_decode(signature)):
raise StatusVerificationError("Invalid signature on payload.")

# Extract data
self.headers = json.loads(b64url_decode(headers_bytes))
self.payload = json.loads(b64url_decode(payload_bytes))
else:
# Embedding proof

# Extract data
self.payload = json.loads(b64url_decode(sl_response))

# TODO: Verification of signature. not sure how this works for embedded proofs

# Check values of status list against provided credential
credential_subject = self.payload["credentialSubject"]
if credential_subject["statusPurpose"] != self.credential_status["statusPurpose"]:
raise StatusVerificationError(
f"statusPurpose in credential is {self.credential_status["statusPurpose"]}, while
statusPurpose in status list is {credential_subject["statusPurpose"]}"
)

# Cache returned status list as BitArray
bits = self.credential_status.get("statusSize")
self._bit_array = BitArray.from_b64(1 if bits is None else bits, credential_subject["encodedList"])
if self._bit_array.size < min_list_length:
raise StatusListLengthError(f"Bitstring status list must be at least {min_list_length}
bits long, but was {self._bit_array.size} bits long instead.")

def get_status(self, idx: Optional[int] = None):
assert self._bit_array is not None, "Before accessing the status, please verify using jwt_verify or cwt_verify"
if idx is None:
idx = self.credential_status["statusListIndex"]

status = self._bit_array[idx]

return_dict = {
"status": status,
"valid": bool(status),
}

# If purpose == message, extract the relevant message and add it to the return_dict, as
# described in S. 3.2 Part 14.
purpose = self.credential_status.get("statusPurpose")
if purpose is not None and purpose == "message":
try:
for message in self.credential_status["statusMessage"]:
if int(message["status"], 16) == status:
return_dict["message"] = message["message"]
break

raise StatusVerificationError(f"Status not found in message list: {self.credential_status["statusMessage"]}")
except KeyError as k:
raise StatusVerificationError(f"statusMessage is malformed or not present: {k}")

return return_dict

29 changes: 18 additions & 11 deletions src/token_status_list/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(

self.payload = payload

self.bit_array = bit_array
self._bit_array = bit_array

def establish_connection(
self,
Expand All @@ -69,6 +69,14 @@ def establish_connection(
assert 200 <= response.status_code < 300, f"Unable to establish connection."
self.issuer_uri = issuer_uri
self.encoding = status_list_format

# When establishing a new connection, clear previous cached values.
self.headers = None
self.protected_headers = None
self.unprotected_headers = None
self.payload = None
self._bit_array = None

return response.content

def jwt_verify(self, sl_response: bytes, verifier: TokenVerifier):
Expand All @@ -87,7 +95,7 @@ def jwt_verify(self, sl_response: bytes, verifier: TokenVerifier):
signer in sign_jwt() in issuer.py.
"""
# Ensure that the format is correct
assert(self.encoding != "CWT", "Please use TokenStatusListVerifier.cwt_verifier() for tokens in cwt format.")
assert self.encoding != "CWT", "Please use TokenStatusListVerifier.cwt_verifier() for tokens in cwt format."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a single verify method that detects the encoding of the current status list and then calls an encoding specific verify; e.g.:

def verify(self, verifier: TokenVerifier):
    assert self.encoding in ("JWT", "CWT"), f"Invalid value for encoding: {self.encoding}"
    if self.encoding == "JWT":
        return self.jwt_verify(verifier)
    if self.encoding == "CWT":
        return self.cwt_verify(verifier)

if self.encoding is None:
self.encoding = "JWT"

Expand Down Expand Up @@ -125,6 +133,7 @@ def jwt_verify(self, sl_response: bytes, verifier: TokenVerifier):

self.headers = headers
self.payload = payload
self._bit_array = BitArray.load(payload["status_list"])

def cwt_verify(self, token: bytes, verifier: TokenVerifier):
"""
Expand Down Expand Up @@ -152,7 +161,7 @@ def cwt_verify(self, token: bytes, verifier: TokenVerifier):
raise ImportError("cbor extra required to use this function") from err

# Ensure that the format is correct
assert(self.encoding != "JWT", "Please use TokenStatusListVerifier.jwt_verifier() for tokens in jwt format.")
assert self.encoding != "JWT", "Please use TokenStatusListVerifier.jwt_verifier() for tokens in jwt format."
if self.encoding is None:
self.encoding = "CWT"

Expand Down Expand Up @@ -194,6 +203,8 @@ def cwt_verify(self, token: bytes, verifier: TokenVerifier):
self.unprotected_headers = unprotected_headers
self.payload = payload

self._bit_array = BitArray.load(payload[STATUS_LIST])

def get_status(self, idx: int) -> int:
"""
Returns the status of an object from the status_list in payload.
Expand All @@ -209,14 +220,10 @@ def get_status(self, idx: int) -> int:
The status of the requested token.
"""

assert self.encoding is not None and self.payload is not None,\
assert self.encoding is not None and self.payload is not None and self._bit_array is not None,\
"Before accessing the status, please verify using jwt_verify or cwt_verify"

if self.bit_array is None:
status_list = self.payload["status_list"] if self.encoding == "JWT" else self.payload[STATUS_LIST]
self.bit_array = BitArray.load(status_list)

return self.bit_array[idx]
return self._bit_array[idx]


def serialize_verifier(self) -> dict:
Expand Down Expand Up @@ -266,11 +273,11 @@ def deserialize_verifier(cls, seralized_verifier: dict) -> "TokenStatusListVerif

if new_verifier.encoding == "JWT":
new_verifier.headers = seralized_verifier["headers"]
new_verifier.bit_array = BitArray.load(seralized_verifier["payload"]["status_list"])
new_verifier._bit_array = BitArray.load(seralized_verifier["payload"]["status_list"])
elif new_verifier.encoding == "CWT":
new_verifier.protected_headers = seralized_verifier["protected_headers"]
new_verifier.unprotected_headers = seralized_verifier["unprotected_headers"]
new_verifier.bit_array = BitArray.load(seralized_verifier["payload"][STATUS_LIST])
new_verifier._bit_array = BitArray.load(seralized_verifier["payload"][STATUS_LIST])
else:
raise ValueError(f"Invalid encoding: was {seralized_verifier["encoding"]} but needs to be JWT or CWT")

Expand Down