|
| 1 | +import asyncio |
| 2 | +import aioipfs |
| 3 | +import asyncpg |
| 4 | +import argparse |
| 5 | +import logging |
| 6 | +import os |
| 7 | + |
# Configure logging to provide clear, timestamped output
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Fallback connection settings. The DSN is a placeholder: USERNAME and
# PASSWORD must be replaced (or the whole value overridden) before use.
DATABASE_DSN = "postgres://USERNAME:PASSWORD@localhost:8080/aleph"
# Multiaddress of the IPFS API endpoint. The value is used as the IPFS
# client's address, so it must not contain literal quote characters.
IPFS_API = "/ip4/127.0.0.1/tcp/5001"
| 13 | + |
async def get_ipfs_pins(api_addr: str) -> set:
    """
    Collects the CIDs that are recursively pinned on an IPFS node.

    Any exception raised while creating the client or listing the pins is
    logged and turned into an empty result, so an empty set can mean either
    "nothing is pinned" or "the lookup failed".

    Args:
        api_addr: The multiaddress of the IPFS API endpoint (e.g., '/ip4/127.0.0.1/tcp/5001').

    Returns:
        A set of recursively pinned CIDs (as strings); empty on failure.
    """
    logging.info(f"Connecting to IPFS API at {api_addr}...")
    ipfs = None
    try:
        ipfs = aioipfs.AsyncIPFS(maddr=api_addr)
        # The listing is awaited as a whole; only recursive pins are requested.
        listing = await ipfs.pin.ls(pintype='recursive', quiet=True)
        # The CIDs are the keys of the mapping stored under 'Keys'.
        recursive_cids = set(listing['Keys'].keys())
        logging.info(f"Found {len(recursive_cids)} recursively pinned files in IPFS.")
        return recursive_cids
    except Exception as err:
        logging.error(f"Failed to connect or retrieve pins from IPFS: {err}")
        return set()
    finally:
        # Only close a client that was actually created.
        if ipfs:
            await ipfs.close()
            logging.info("IPFS client connection closed.")
| 44 | + |
| 45 | + |
async def get_database_hashes(dsn: str) -> set:
    """
    Connects to a PostgreSQL database and retrieves a set of file hashes that should be pinned.

    Any exception raised while connecting or querying is logged and turned
    into an empty result. Callers therefore cannot tell "no matching files"
    apart from "the lookup failed" and should treat an empty set with care
    before taking destructive actions based on it.

    Args:
        dsn: The PostgreSQL connection string.

    Returns:
        A set of file hashes (as strings) that should be pinned; empty on failure.
    """
    # Selects every row of `files` whose hash starts with 'Qm' or 'bafkrei'
    # (presumably the IPFS CID prefixes in use here; confirm against the data).
    # A stricter variant considered earlier joined `file_pins` and `messages`
    # to keep only hashes referenced by STORE messages with item_type 'ipfs'.
    query = """
        SELECT f.hash FROM files f
        WHERE f.hash like 'Qm%' or f.hash like 'bafkrei%'
    """

    logging.info("Connecting to PostgreSQL database...")
    conn = None
    try:
        conn = await asyncpg.connect(dsn)
        rows = await conn.fetch(query)
        # A set removes duplicates and allows direct set arithmetic by the caller.
        hashes = {row['hash'] for row in rows}
        logging.info(f"Found {len(hashes)} files that should be pinned in the database.")
        return hashes
    except Exception as e:
        logging.error(f"Failed to connect or query the database: {e}")
        return set()
    finally:
        # Only close a connection that was actually opened.
        if conn:
            await conn.close()
            logging.info("Database connection closed.")
| 82 | + |
| 83 | + |
async def unpin_files(api_addr: str, cids_to_unpin: list):
    """
    Removes the pins for the given CIDs from an IPFS node.

    Each CID is handled independently: a failed removal is logged and the
    remaining CIDs are still processed. Errors are logged, not raised.

    Args:
        api_addr: The multiaddress of the IPFS API endpoint.
        cids_to_unpin: A list of CID strings to unpin.
    """
    if not cids_to_unpin:
        logging.info("No files to unpin.")
        return

    logging.info(f"Connecting to IPFS API at {api_addr} to unpin files...")
    ipfs = None

    async def _remove_pin(target):
        # Isolate failures per CID so one bad pin does not stop the run.
        try:
            logging.warning(f"Unpinning {target}...")
            await ipfs.pin.rm(target)
            logging.info(f"Successfully unpinned {target}.")
        except Exception as rm_err:
            logging.error(f"Failed to unpin {target}: {rm_err}")

    try:
        ipfs = aioipfs.AsyncIPFS(maddr=api_addr)
        for target in cids_to_unpin:
            await _remove_pin(target)
    except Exception as connect_err:
        logging.error(f"Failed to connect to IPFS for unpinning: {connect_err}")
    finally:
        # Only close a client that was actually created.
        if ipfs:
            await ipfs.close()
            logging.info("IPFS client connection closed after unpinning.")
| 113 | + |
async def pin_files(api_addr: str, cids_to_pin: list):
    """
    Pins a given list of CIDs to the IPFS node.

    Each CID is pinned independently: a failure is logged and the remaining
    CIDs are still attempted. Errors are logged, not raised.

    Args:
        api_addr: The multiaddress of the IPFS API endpoint.
        cids_to_pin: A list of CID strings to pin.
    """
    if not cids_to_pin:
        logging.info("No files to pin.")
        return

    logging.info(f"Connecting to IPFS API at {api_addr} to pin files...")
    client = None
    try:
        client = aioipfs.AsyncIPFS(maddr=api_addr)
        for cid in cids_to_pin:
            try:
                logging.info(f"Pinning {cid}...")
                # pin.add is consumed as a stream of status updates.
                # NOTE(review): it is expected to pin recursively by default;
                # confirm against the installed aioipfs version.
                async for update in client.pin.add(cid):
                    # Not every update is guaranteed to carry 'Progress' (for
                    # example a closing update that only reports the result).
                    # Indexing it blindly would raise KeyError and report a
                    # pin failure for an operation that did not fail.
                    progress = update.get('Progress') if isinstance(update, dict) else None
                    if progress is not None:
                        print('Pin progress', progress)
                logging.info(f"Successfully pinned {cid}.")
            except Exception as e:
                logging.error(f"Failed to pin {cid}: {e}")
    except Exception as e:
        logging.error(f"Failed to connect to IPFS for pinning: {e}")
    finally:
        # Only close a client that was actually created.
        if client:
            await client.close()
            logging.info("IPFS client connection closed after pinning.")
| 145 | + |
| 146 | + |
async def main():
    """
    Main function to orchestrate the IPFS pin synchronization process.

    Reads the IPFS API address and the PostgreSQL DSN from the command line
    (falling back to the IPFS_API / DATABASE_DSN environment variables and
    then to the module-level defaults), compares the recursively pinned CIDs
    with the hashes stored in the database, and reports both differences.
    Nothing is changed unless --unpin and/or --pin is given.
    """
    parser = argparse.ArgumentParser(
        description="Compares IPFS pins with a database record and optionally syncs the state."
    )
    # IPFS arguments
    parser.add_argument(
        '--ipfs-api',
        default=os.getenv('IPFS_API', IPFS_API),
        help="IPFS API multiaddress (default: /ip4/127.0.0.1/tcp/5001)"
    )
    # PostgreSQL DSN; it can come from the environment to keep it off the command line
    parser.add_argument(
        '--db-dsn',
        default=os.getenv('DATABASE_DSN', DATABASE_DSN),
        help="PostgreSQL DSN (e.g., 'postgres://user:pass@host:port/dbname'). "
             "Can also be set via DATABASE_DSN environment variable."
    )
    # Action arguments
    parser.add_argument(
        '--unpin',
        action='store_true',
        help="Actually perform the unpinning of files. Default is a dry run."
    )
    parser.add_argument(
        '--pin',
        action='store_true',
        help="Actually perform the pinning of missing files. Default is a dry run."
    )
    args = parser.parse_args()

    if not args.db_dsn:
        logging.error("Database DSN must be provided via --db-dsn argument or DATABASE_DSN environment variable.")
        return

    # Tolerate an address wrapped in quotes (e.g. copied from a shell command
    # or an env file); quote characters are never part of a multiaddress.
    ipfs_api = args.ipfs_api.strip().strip("'\"")

    # Get the two sets of hashes/CIDs
    ipfs_pins = await get_ipfs_pins(ipfs_api)
    db_hashes = await get_database_hashes(args.db_dsn)

    if not ipfs_pins and not db_hashes:
        logging.warning("Both IPFS and database checks returned empty sets. Exiting.")
        return

    # --- 1. Check for files in IPFS that should be UNPINNED ---
    pins_to_remove = ipfs_pins - db_hashes

    if not pins_to_remove:
        logging.info("All pinned files are correctly referenced in the database.")
    else:
        logging.warning(f"Found {len(pins_to_remove)} files to UNPIN (in IPFS, not in DB):")
        # Sorted so that successive runs produce comparable listings.
        for cid in sorted(pins_to_remove):
            print(f" - {cid}")

        if not args.unpin:
            logging.info("-> This was a dry run. Use --unpin to remove them.")
        elif not db_hashes:
            # The database lookup returns an empty set on failure as well as
            # on "no rows"; unpinning here would remove every pin on the node.
            logging.error(
                "Refusing to unpin: the database returned no hashes, so every pin would be removed. "
                "Check the database connection and query, then retry."
            )
        else:
            logging.info("--- UNPINNING ENABLED ---")
            await unpin_files(ipfs_api, list(pins_to_remove))
            logging.info("--- UNPINNING PROCESS COMPLETE ---")

    print("-" * 50)

    # --- 2. Check for files in DB that should be PINNED ---
    hashes_to_add = db_hashes - ipfs_pins

    if not hashes_to_add:
        logging.info("All necessary files from the database are already pinned in IPFS.")
    else:
        # Header mirrors the UNPIN section so the listing below is labelled.
        logging.warning(f"Found {len(hashes_to_add)} files to PIN (in DB, not in IPFS):")
        for cid in sorted(hashes_to_add):
            print(f" + {cid}")

        if args.pin:
            logging.info("--- PINNING ENABLED ---")
            await pin_files(ipfs_api, list(hashes_to_add))
            logging.info("--- PINNING PROCESS COMPLETE ---")
        else:
            logging.info("-> This was a dry run. Use --pin to add them.")

    # Final recap of both differences.
    logging.warning(f"Found {len(pins_to_remove)} files to UNPIN (in IPFS, not in DB):")
    logging.warning(f"Found {len(hashes_to_add)} files to PIN (in DB, not in IPFS):")


if __name__ == "__main__":
    # Script entry point: run the asynchronous orchestrator to completion.
    asyncio.run(main())
0 commit comments