From 8c81be1775d2d3f1a62e53ba4ae5a08edc3e3627 Mon Sep 17 00:00:00 2001
From: Anjor Kanekar
Date: Thu, 25 Jul 2024 08:59:11 +0100
Subject: [PATCH] new deal making script (#136)

* new metadata

* minor
---
 dataprep-tools/filecoin/boost_create_deals.py | 217 ++++++++++--------
 1 file changed, 120 insertions(+), 97 deletions(-)

diff --git a/dataprep-tools/filecoin/boost_create_deals.py b/dataprep-tools/filecoin/boost_create_deals.py
index 1a16faba..8516b69d 100644
--- a/dataprep-tools/filecoin/boost_create_deals.py
+++ b/dataprep-tools/filecoin/boost_create_deals.py
@@ -21,7 +21,7 @@
 sys.path.append(os.path.abspath("/usr/local/lib/triton-py"))
 from triton_upload_clients import BunnyCDNClient, S3Client
 
-VERSION='0.0.1'
+VERSION = "0.0.1"
 
 start_epoch_head_offset = int(604800 / 30)
 
@@ -53,61 +53,56 @@
 # To run this manually
 # (set -a && source '/etc/default/boost_create_deals' && python3 /usr/local/bin/boost_create_deals.py 27 index)
 
-dry_run = environ.get('DRY_RUN') == 'true'
+dry_run = environ.get("DRY_RUN") == "true"
 
 if dry_run:
-    BOOST_VERSION=1
+    BOOST_VERSION = 1
 else:
     try:
-        BOOST_VERSION = check_output(['boost', '--version'], text=True).strip()
+        BOOST_VERSION = check_output(["boost", "--version"], text=True).strip()
     except CalledProcessError as e:
-        print('FATAL: could not get binary version(s)', e,
-              file=sys.stderr)
+        print("FATAL: could not get binary version(s)", e, file=sys.stderr)
         sys.exit(1)
 
 logging.basicConfig(level=logging.INFO)
 
-endpoint = environ.get('STORAGE_ENDPOINT')
-key_id = environ.get('STORAGE_KEY_ID')
-application_key = environ.get('STORAGE_KEY')
-storage_name = environ.get('STORAGE_NAME')
-url_format = environ.get('PUBLIC_URL_FORMAT')
-upload_client = environ.get('UPLOAD_CLIENT')
+endpoint = environ.get("STORAGE_ENDPOINT")
+key_id = environ.get("STORAGE_KEY_ID")
+application_key = environ.get("STORAGE_KEY")
+storage_name = environ.get("STORAGE_NAME")
+url_format = environ.get("PUBLIC_URL_FORMAT")
+upload_client = environ.get("UPLOAD_CLIENT")
 
 if upload_client == "S3":
     client = S3Client(endpoint, storage_name, url_format, key_id, application_key)
 else:
     client = BunnyCDNClient(endpoint, storage_name, url_format, "", application_key)
 
-online = (environ.get('DEALTYPE') == 'online')
+online = environ.get("DEALTYPE") == "online"
+
 
 def create_deals(metadata_obj):
-    """"
+    """
     Create deals for the files in the metadata object provided as an argument.
     Will attempt to lock and update `deal.csv` in the remote storage container.
     """
     metadata_reader = StringIO(metadata_obj)
-    metadata_split_lines = csv.reader(metadata_reader, delimiter=',')
+    metadata_split_lines = csv.reader(metadata_reader, delimiter=",")
     next(metadata_split_lines, None)  # skip the headers
     deal_data = []
     for line in metadata_split_lines:
         file_name = os.path.basename(line[2])
-        # FIles that are created with split-and-commp have 5 columns where as if it's using the dataprep
-        # tool they have 6 (one column contains the root cid)
-        if len(line) == 5:
-            commp_piece_cid = line[3]
-            padded_size = line[4]
-        elif len(line) ==6:
-            commp_piece_cid = line[4]
-            padded_size = line[5]
-        else:
-            logging.error("incorrect line length %d" % line(line))
-            sys.exit(1)
-
-        check_obj = client.check_exists(epoch + '/' + file_name)
+        # Only allow the new metadata
+        assert (
+            len(line) == 4
+        ), f"metadata.csv should have 4 columns, instead found {len(line)}"
+        commp_piece_cid = line[1]
+        padded_size = line[2]
+
+        check_obj = client.check_exists(epoch + "/" + file_name)
         if not check_obj[0]:
             logging.info("%s not found" % file_name)
             sys.exit(1)
@@ -117,15 +112,19 @@ def create_deals(metadata_obj):
             sys.exit(1)
             continue
         elif check_obj[1] != int(padded_size):
-            logging.debug("%s size mismatch %s != %s" % (file_name, check_obj[1], padded_size))
+            logging.debug(
+                "%s size mismatch %s != %s" % (file_name, check_obj[1], padded_size)
+            )
 
-        public_url = client.get_public_url(epoch +'/'+ file_name)
+        public_url = client.get_public_url(epoch + "/" + file_name)
         check_url = client.check_public_url(public_url)
         if not check_url[0]:
-            logging.info('%s not accessible' % public_url)
+            logging.info("%s not accessible" % public_url)
             continue
         elif int(check_url[1]) != int(check_obj[1]):
-            logging.info('%s size mismatch %s != %s' % (public_url, check_url[1], check_obj[1]))
+            logging.info(
+                "%s size mismatch %s != %s" % (public_url, check_url[1], check_obj[1])
+            )
             continue
 
         deal_data_item = {
@@ -134,101 +133,121 @@ def create_deals(metadata_obj):
             "commp_piece_cid": commp_piece_cid,
             "file_size": check_obj[1],
             "padded_size": padded_size,
-            "payload_cid": payload_cid
+            "payload_cid": payload_cid,
         }
         deal_data.append(deal_data_item)
 
-    providers = environ.get("PROVIDERS").split(',')
+    providers = environ.get("PROVIDERS").split(",")
     shuffle(providers)
-    logging.info('provider set: ')
+    logging.info("provider set: ")
    logging.info(providers)
 
-    replication_factor = int(environ.get('REPLICATION_FACTOR'))
+    replication_factor = int(environ.get("REPLICATION_FACTOR"))
 
     deals_providers = {}
 
-    fields = ['provider', 'deal_uuid', 'file_name', 'url', 'commp_piece_cid', 'file_size', 'padded_size', 'payload_cid']
+    fields = [
+        "provider",
+        "deal_uuid",
+        "file_name",
+        "url",
+        "commp_piece_cid",
+        "file_size",
+        "padded_size",
+        "payload_cid",
+    ]
 
-    deals_url = f'{epoch}/deals.csv'
-    lockfile = f'{epoch}/deals.csv.lock'
+    deals_url = f"{epoch}/deals.csv"
+    lockfile = f"{epoch}/deals.csv.lock"
 
     if deal_type == "index":
         # avoid overwritting deal files when doing index only deals
-        deals_url = f'{epoch}/deals-index.csv'
-        lockfile = f'{epoch}/deals-index.csv.lock'
+        deals_url = f"{epoch}/deals-index.csv"
+        lockfile = f"{epoch}/deals-index.csv.lock"
 
     filetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
 
     # Create a lock file for the epoch to ensure that no one else is working on it
     if not client.check_exists(lockfile)[0]:
-        client.upload_obj(StringIO(socket.gethostname()+"_"+filetime), lockfile)
+        client.upload_obj(StringIO(socket.gethostname() + "_" + filetime), lockfile)
     else:
        lock_data = client.read_object(lockfile)
        logging.error("lock file exists, exiting: " + lock_data)
        return 1
 
-    deals_folder = environ.get('DEALS_FOLDER')
-    deals_file = f'{deals_folder}/{epoch}_deals_{filetime}.csv'
+    deals_folder = environ.get("DEALS_FOLDER")
+    deals_file = f"{deals_folder}/{epoch}_deals_{filetime}.csv"
 
     replications = {}
     check_existing_deals = client.check_exists(deals_url)
     if check_existing_deals[0]:
         client.download_file(deals_url, deals_file)
-        with open(deals_file, 'r') as csv_file:
-            reader = csv.DictReader(csv_file, fieldnames = fields)
+        with open(deals_file, "r") as csv_file:
+            reader = csv.DictReader(csv_file, fieldnames=fields)
             next(reader, None)  # skip the headers
             for row in reader:
-                if row['commp_piece_cid'] not in replications:
-                    replications[row['commp_piece_cid']] = []
-                replications[row['commp_piece_cid']].append(row['provider'])
+                if row["commp_piece_cid"] not in replications:
+                    replications[row["commp_piece_cid"]] = []
+                replications[row["commp_piece_cid"]].append(row["provider"])
             csv_file.close()
 
-    with open(deals_file, 'a+') as log_file:
-        csv_writer = csv.DictWriter(log_file, fieldnames = fields)
+    with open(deals_file, "a+") as log_file:
+        csv_writer = csv.DictWriter(log_file, fieldnames=fields)
         if not check_existing_deals[0]:
             csv_writer.writeheader()
 
         for provider in providers:
-            logging.info('making deal with %s', provider)
+            logging.info("making deal with %s", provider)
             for file_item in deal_data:
-                logging.info('creating deal for ')
+                logging.info("creating deal for ")
                 logging.info(file_item)
-                if file_item['commp_piece_cid'] in replications:
-                    if provider in replications[file_item['commp_piece_cid']]:
-                        logging.info('skipping %s, already have deal with %s' % (file_item['commp_piece_cid'], provider))
+                if file_item["commp_piece_cid"] in replications:
+                    if provider in replications[file_item["commp_piece_cid"]]:
+                        logging.info(
+                            "skipping %s, already have deal with %s"
+                            % (file_item["commp_piece_cid"], provider)
+                        )
                         continue
-                if file_item['commp_piece_cid'] not in replications:
-                    replications[file_item['commp_piece_cid']] = []
-                elif len(replications[file_item['commp_piece_cid']]) >= replication_factor:
-                    logging.info('skipping %s, already replicated %s times' % (file_item['commp_piece_cid'], replications[file_item['commp_piece_cid']]))
+                if file_item["commp_piece_cid"] not in replications:
+                    replications[file_item["commp_piece_cid"]] = []
+                elif (
+                    len(replications[file_item["commp_piece_cid"]])
+                    >= replication_factor
+                ):
+                    logging.info(
+                        "skipping %s, already replicated %s times"
+                        % (
+                            file_item["commp_piece_cid"],
+                            replications[file_item["commp_piece_cid"]],
+                        )
+                    )
                     continue
 
                 params = {
-                    'provider': provider,
-                    'commp': file_item['commp_piece_cid'],
-                    'piece-size': file_item['padded_size'],
-                    'car-size': file_item['file_size'],
-                    'payload-cid': file_item['payload_cid'],
-                    'storage-price': '0',
-                    'start-epoch-head-offset': start_epoch_head_offset,
-                    'verified': 'true',
-                    'duration': 1468800,
-                    'wallet': environ.get('WALLET'),
+                    "provider": provider,
+                    "commp": file_item["commp_piece_cid"],
+                    "piece-size": file_item["padded_size"],
+                    "car-size": file_item["file_size"],
+                    "payload-cid": file_item["payload_cid"],
+                    "storage-price": "0",
+                    "start-epoch-head-offset": start_epoch_head_offset,
+                    "verified": "true",
+                    "duration": 1468800,
+                    "wallet": environ.get("WALLET"),
                 }
 
-                deal_arg = 'deal'
+                deal_arg = "deal"
                 if online:
-                    params['http-url'] = file_item['url']
+                    params["http-url"] = file_item["url"]
                 else:
-                    deal_arg = 'offline-deal'
+                    deal_arg = "offline-deal"
 
                 logging.info(params)
 
-                cmd = [ 'boost',
-                    '--vv',
-                    '--json=1',
-                    deal_arg ] + [ f"--{k}={v}" for k, v in params.items() ]
+                cmd = ["boost", "--vv", "--json=1", deal_arg] + [
+                    f"--{k}={v}" for k, v in params.items()
+                ]
 
                 logging.info(cmd)
 
@@ -250,7 +269,7 @@ def create_deals(metadata_obj):
                     "deal_uuid": res.get("dealUuid"),
                 }
 
-                replications[file_item['commp_piece_cid']].append(provider)
+                replications[file_item["commp_piece_cid"]].append(provider)
                 deal_output.update(file_item)
 
                 csv_writer.writerow(deal_output)
@@ -260,21 +279,21 @@ def create_deals(metadata_obj):
        log_file.close()
 
     if dry_run:
-        logging.info('completed processing dry run mode')
+        logging.info("completed processing dry run mode")
     else:
-        logging.info(f'uploading deals file {deals_file} to {deals_url}')
+        logging.info(f"uploading deals file {deals_file} to {deals_url}")
         if client.upload_file(deals_file, deals_url):
-            logging.info('upload successful')
+            logging.info("upload successful")
         else:
-            logging.info('upload failed')
+            logging.info("upload failed")
 
     # Print the number of replications
-    logging.info("total providers: "+str(len(deals_providers)))
-    for key,value in deals_providers.items():
-        logging.info(f'{key} provider got {len(value)}/{len(deal_data)} deals')
+    logging.info("total providers: " + str(len(deals_providers)))
+    for key, value in deals_providers.items():
+        logging.info(f"{key} provider got {len(value)}/{len(deal_data)} deals")
 
     logging.info("replication summary")
-    for key,value in replications.items():
-        logging.info(f'{key} replicated {len(value)} times')
+    for key, value in replications.items():
+        logging.info(f"{key} replicated {len(value)} times")
 
     if not client.delete_file(lockfile):
         logging.warning("WARNING: could not delete lock file")
@@ -284,13 +303,15 @@ def create_deals(metadata_obj):
 
 # Code below should be agnostic to storage backend
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     if len(sys.argv) < 2:
-        raise ValueError("Not enough arguments. usage: ", sys.argv[0], " []")
+        raise ValueError(
+            "Not enough arguments. usage: ", sys.argv[0], " []"
+        )
 
-    logging.info('boost create deals version %s '
-                 '(boost version: %s).',
-                 VERSION, BOOST_VERSION)
+    logging.info(
+        "boost create deals version %s " "(boost version: %s).", VERSION, BOOST_VERSION
+    )
 
     epoch = sys.argv[1]
 
@@ -303,25 +324,27 @@ def create_deals(metadata_obj):
     # Load the payload CI
     payload_cid = client.read_object("%s/epoch-%s.cid" % (epoch, epoch)).strip()
 
-    logging.info('creating deals for epoch %s with payload %s', epoch, payload_cid)
+    logging.info("creating deals for epoch %s with payload %s", epoch, payload_cid)
 
     # Load metadata csv produced by split-and-commp
     ret = 0
     if len(deal_type) == 0:
-        logging.info('deal type not specified, creating for both epoch objects and index files')
+        logging.info(
+            "deal type not specified, creating for both epoch objects and index files"
+        )
         metadata_obj = client.read_object(epoch + "/metadata.csv")
         ret += create_deals(metadata_obj)
-        logging.info('created deals for epoch files %d', ret)
+        logging.info("created deals for epoch files %d", ret)
 
         index_obj = client.read_object(epoch + "/index.csv")
         ret += create_deals(index_obj)
-        logging.info('created deals for index files %d', ret)
+        logging.info("created deals for index files %d", ret)
     else:
         metadata_obj = client.read_object(epoch + "/" + deal_type + ".csv")
         ret += create_deals(metadata_obj)
-        logging.info('created deals for %s files %d', deal_type, ret)
+        logging.info("created deals for %s files %d", deal_type, ret)
 
     sys.exit(ret)