|
1 |
| -from typing import Dict, List |
| 1 | +import logging |
2 | 2 |
|
3 | 3 | import base58
|
4 | 4 | import cid
|
5 | 5 |
|
6 | 6 | from ..codecs import CodecBase
|
| 7 | +from ..exceptions import BinaryParseError |
7 | 8 | from . import LENGTH_PREFIXED_VAR_SIZE
|
8 | 9 |
|
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
9 | 12 | SIZE = LENGTH_PREFIXED_VAR_SIZE
|
10 | 13 | IS_PATH = False
|
11 | 14 |
|
12 | 15 |
|
13 | 16 | # Spec: https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
|
14 |
| -CIDv0_PREFIX_TO_LENGTH: Dict[str, List[int]] = { |
| 17 | +CIDv0_PREFIX_TO_LENGTH: dict[str, list[int]] = { |
15 | 18 | # base58btc prefixes for valid lengths 1 - 42 with the identity "hash" function
|
16 | 19 | "12": [5, 12, 19, 23, 30, 41, 52, 56],
|
17 | 20 | "13": [9, 16, 34, 45],
|
|
63 | 66 | "Qm": [46],
|
64 | 67 | }
|
65 | 68 |
|
66 |
| -PROTO_NAME_TO_CIDv1_CODEC: Dict[str, str] = { |
67 |
| - # The "p2p" multiaddr protocol requires all keys to use the "libp2p-key" multicodec |
| 69 | +PROTO_NAME_TO_CIDv1_CODEC = { |
68 | 70 | "p2p": "libp2p-key",
|
| 71 | + "ipfs": "dag-pb", |
69 | 72 | }
|
70 | 73 |
|
71 | 74 |
|
| 75 | +def _is_binary_cidv0_multihash(buf: bytes) -> bool: |
| 76 | + """Check if the given bytes represent a CIDv0 multihash.""" |
| 77 | + try: |
| 78 | + # CIDv0 is just a base58btc encoded multihash |
| 79 | + # The first byte is the hash function code, second byte is the length |
| 80 | + if len(buf) < 2: |
| 81 | + return False |
| 82 | + hash_code = buf[0] |
| 83 | + length = buf[1] |
| 84 | + if len(buf) != length + 2: # +2 for the hash code and length bytes |
| 85 | + return False |
| 86 | + # For CIDv0, we only support sha2-256 (0x12) and identity (0x00) |
| 87 | + return hash_code in (0x12, 0x00) |
| 88 | + except Exception: |
| 89 | + return False |
| 90 | + |
| 91 | + |
72 | 92 | class Codec(CodecBase):
|
73 | 93 | SIZE = SIZE
|
74 | 94 | IS_PATH = IS_PATH
|
75 | 95 |
|
76 |
| - def to_bytes(self, proto, string): |
77 |
| - expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name) |
78 |
| - |
79 |
| - if len(string) in CIDv0_PREFIX_TO_LENGTH.get(string[0:2], ()): # CIDv0 |
80 |
| - # Upgrade the wire (binary) representation of any received CIDv0 string |
81 |
| - # to CIDv1 if we can determine which multicodec value to use |
82 |
| - if expected_codec: |
83 |
| - cid_obj = cid.make_cid(1, expected_codec, base58.b58decode(string)) |
84 |
| - assert isinstance(cid_obj.buffer, bytes) |
85 |
| - return cid_obj.buffer |
86 |
| - |
87 |
| - return base58.b58decode(string) |
88 |
| - else: # CIDv1+ |
89 |
| - parsed = cid.from_string(string) |
| 96 | + def to_bytes(self, proto, string: str) -> bytes: |
| 97 | + """Convert a CID string to its binary representation.""" |
| 98 | + if not string: |
| 99 | + raise ValueError("CID string cannot be empty") |
| 100 | + |
| 101 | + logger.debug(f"[DEBUG CID to_bytes] Input value: {string}") |
| 102 | + |
| 103 | + # First try to parse as CIDv0 (base58btc encoded multihash) |
| 104 | + try: |
| 105 | + decoded = base58.b58decode(string) |
| 106 | + if _is_binary_cidv0_multihash(decoded): |
| 107 | + logger.debug(f"[DEBUG CID to_bytes] Parsed as CIDv0: {decoded.hex()}") |
| 108 | + # Do not add length prefix here; the framework handles it |
| 109 | + return decoded |
| 110 | + except Exception as e: |
| 111 | + logger.debug(f"[DEBUG CID to_bytes] Failed to parse as CIDv0: {e}") |
| 112 | + |
| 113 | + # If not CIDv0, try to parse as CIDv1 |
| 114 | + try: |
| 115 | + parsed = cid.make_cid(string) |
| 116 | + |
| 117 | + # Do not add length prefix here; the framework handles it |
| 118 | + if not isinstance(parsed.buffer, bytes): |
| 119 | + raise ValueError("CID buffer must be bytes") |
| 120 | + return parsed.buffer |
| 121 | + except ValueError as e: |
| 122 | + logger.debug(f"[DEBUG CID to_bytes] Failed to parse as CIDv1: {e}") |
| 123 | + raise ValueError(f"Invalid CID: {string}") |
90 | 124 |
|
91 |
| - # Ensure CID has correct codec for protocol |
92 |
| - if expected_codec and parsed.codec != expected_codec: |
93 |
| - raise ValueError( |
94 |
| - '"{0}" multiaddr CIDs must use the "{1}" multicodec'.format( |
95 |
| - proto.name, expected_codec |
96 |
| - ) |
97 |
| - ) |
| 125 | + def to_string(self, proto, buf: bytes) -> str: |
| 126 | + """Convert a binary CID to its string representation.""" |
| 127 | + if not buf: |
| 128 | + raise ValueError("CID buffer cannot be empty") |
98 | 129 |
|
99 |
| - return parsed.buffer |
| 130 | + logger.debug(f"[DEBUG CID to_string] Input buffer: {buf.hex()}") |
| 131 | + logger.debug(f"[DEBUG CID to_string] Protocol: {proto.name}") |
100 | 132 |
|
101 |
| - def to_string(self, proto, buf): |
102 | 133 | expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)
|
| 134 | + logger.debug(f"[DEBUG CID to_string] Expected codec: {expected_codec}") |
| 135 | + |
| 136 | + try: |
| 137 | + # First try to parse as CIDv0 |
| 138 | + if _is_binary_cidv0_multihash(buf): |
| 139 | + result = base58.b58encode(buf).decode("ascii") |
| 140 | + logger.debug(f"[DEBUG CID to_string] Parsed as CIDv0: {result}") |
| 141 | + return result |
103 | 142 |
|
104 |
| - if _is_binary_cidv0_multihash(buf): # CIDv0 |
105 |
| - if not expected_codec: |
106 |
| - # Simply encode as base58btc as there is nothing better to do |
107 |
| - return base58.b58encode(buf).decode("ascii") |
108 |
| - |
109 |
| - # "Implementations SHOULD display peer IDs using the first (raw |
110 |
| - # base58btc encoded multihash) format until the second format is |
111 |
| - # widely supported." |
112 |
| - # |
113 |
| - # In the future the following line should instead convert the multihash |
114 |
| - # to CIDv1 and with the `expected_codec` and wrap it in base32: |
115 |
| - # return cid.make_cid(1, expected_codec, buf).encode("base32").decode("ascii") |
116 |
| - return base58.b58encode(buf).decode("ascii") |
117 |
| - else: # CIDv1+ |
| 143 | + # If not CIDv0, try to parse as CIDv1 |
118 | 144 | parsed = cid.from_bytes(buf)
|
| 145 | + logger.debug(f"[DEBUG CID to_string] Parsed as CIDv1: {parsed}") |
119 | 146 |
|
120 | 147 | # Ensure CID has correct codec for protocol
|
121 | 148 | if expected_codec and parsed.codec != expected_codec:
|
122 | 149 | raise ValueError(
|
123 |
| - '"{0}" multiaddr CIDs must use the "{1}" multicodec'.format( |
| 150 | + '"{}" multiaddr CIDs must use the "{}" multicodec'.format( |
124 | 151 | proto.name, expected_codec
|
125 | 152 | )
|
126 | 153 | )
|
127 | 154 |
|
128 |
| - # "Implementations SHOULD display peer IDs using the first (raw |
129 |
| - # base58btc encoded multihash) format until the second format is |
130 |
| - # widely supported." |
131 |
| - if expected_codec and _is_binary_cidv0_multihash(parsed.multihash): |
132 |
| - return base58.b58encode(parsed.multihash).decode("ascii") |
133 |
| - |
134 |
| - return parsed.encode("base32").decode("ascii") |
135 |
| - |
136 |
| - |
137 |
| -def _is_binary_cidv0_multihash(buf: bytes) -> bool: |
138 |
| - if buf.startswith(b"\x12\x20") and len(buf) == 34: # SHA2-256 |
139 |
| - return True |
140 |
| - |
141 |
| - if (buf[0] == 0x00 and buf[1] in range(43)) and len(buf) == (buf[1] + 2): # Identity hash |
142 |
| - return True |
143 |
| - |
144 |
| - return False |
| 155 | + # For peer IDs (p2p/ipfs), always try to use CIDv0 format if possible |
| 156 | + if expected_codec: |
| 157 | + # Try to convert to CIDv0 format |
| 158 | + try: |
| 159 | + # Extract the multihash bytes |
| 160 | + multihash = parsed.multihash |
| 161 | + logger.debug(f"[DEBUG CID to_string] Extracted multihash: {multihash.hex()}") |
| 162 | + # Check if it's a valid CIDv0 multihash |
| 163 | + if _is_binary_cidv0_multihash(multihash): |
| 164 | + result = base58.b58encode(multihash).decode("ascii") |
| 165 | + logger.debug(f"[DEBUG CID to_string] Converted to CIDv0: {result}") |
| 166 | + return result |
| 167 | + except Exception as e: |
| 168 | + logger.debug(f"[DEBUG CID to_string] Failed to convert to CIDv0: {e}") |
| 169 | + |
| 170 | + # If we can't convert to CIDv0, use base32 CIDv1 format |
| 171 | + result = parsed.encode("base32").decode("ascii") |
| 172 | + logger.debug(f"[DEBUG CID to_string] Using CIDv1 format: {result}") |
| 173 | + return result |
| 174 | + except Exception as e: |
| 175 | + logger.debug(f"[DEBUG CID to_string] Error: {e}") |
| 176 | + raise BinaryParseError(str(e), buf, proto.name, e) from e |
0 commit comments