@@ -43,20 +43,37 @@ def get_cnm_response_json_file(self, potential_file, granule_id):
4343 return None
4444 if len (cnm_response_keys ) > 1 :
4545 LOGGER .warning (f'more than 1 cnm response file: { cnm_response_keys } ' )
46- cnm_response_keys = cnm_response_keys [0 ]
46+ # assuming the names are the same, and it has processing date in the filename, it is easier to reverse it
47+ cnm_response_keys = sorted (cnm_response_keys )[- 1 ] # sort and get the last one which is supposed to be the most recent one.
4748 LOGGER .debug (f'cnm_response_keys: { cnm_response_keys } ' )
4849 local_file = self .__s3 .set_s3_url (f's3://{ self .__s3 .target_bucket } /{ cnm_response_keys } ' ).download ('/tmp' )
4950 cnm_response_json = FileUtils .read_json (local_file )
5051 FileUtils .remove_if_exists (local_file )
5152 return cnm_response_json
5253
54+ @staticmethod
55+ def revert_to_s3_url (input_url ):
56+ if input_url .startswith ("s3://" ):
57+ return input_url
58+ if input_url .startswith ("http://" ) or input_url .startswith ("https://" ):
59+ parts = input_url .split ('/' , 3 )
60+ if len (parts ) < 4 :
61+ ValueError (f'invalid url: { input_url } ' )
62+ path_parts = parts [3 ].split ('/' , 1 )
63+ if len (path_parts ) != 2 :
64+ ValueError (f'invalid url: { input_url } ' )
65+ bucket , key = path_parts
66+ return f"s3://{ bucket } /{ key } "
67+ raise ValueError (f'unknown schema: { input_url } ' )
68+
5369 def __extract_files (self , uds_cnm_json : dict , daac_config : dict ):
5470 granule_files = uds_cnm_json ['product' ]['files' ]
5571 if 'archiving_types' not in daac_config or len (daac_config ['archiving_types' ]) < 1 :
5672 return granule_files # TODO remove missing md5?
5773 archiving_types = {k ['data_type' ]: [] if 'file_extension' not in k else k ['file_extension' ] for k in daac_config ['archiving_types' ]}
5874 result_files = []
5975 for each_file in granule_files :
76+ LOGGER .debug (f'each_file: { each_file } ' )
6077 """
6178 {
6279 "type": "data",
@@ -71,6 +88,7 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
7188 if each_file ['type' ] not in archiving_types :
7289 continue
7390 file_extensions = archiving_types [each_file ['type' ]]
91+ each_file ['uri' ] = self .revert_to_s3_url (each_file ['uri' ])
7492 if len (file_extensions ) < 1 :
7593 result_files .append (each_file ) # TODO remove missing md5?
7694 temp_filename = each_file ['name' ].upper ().strip ()
@@ -79,28 +97,36 @@ def __extract_files(self, uds_cnm_json: dict, daac_config: dict):
7997 return result_files
8098
8199 def send_to_daac_internal (self , uds_cnm_json : dict ):
100+ LOGGER .debug (f'uds_cnm_json: { uds_cnm_json } ' )
82101 granule_identifier = UdsCollections .decode_identifier (uds_cnm_json ['identifier' ]) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
83102 self .__archive_index_logic .set_tenant_venue (granule_identifier .tenant , granule_identifier .venue )
84103 daac_config = self .__archive_index_logic .percolate_document (uds_cnm_json ['identifier' ])
85104 if daac_config is None or len (daac_config ) < 1 :
86105 LOGGER .debug (f'uds_cnm_json is not configured for archival. uds_cnm_json: { uds_cnm_json } ' )
87106 return
88107 daac_config = daac_config [0 ] # TODO This is currently not supporting more than 1 daac.
108+ result = JsonValidator (UdsArchiveConfigIndex .basic_schema ).validate (daac_config )
109+ if result is not None :
110+ raise ValueError (f'daac_config does not have valid schema. Pls re-add the daac config: { result } for { daac_config } ' )
89111 try :
90112 self .__sns .set_topic_arn (daac_config ['daac_sns_topic_arn' ])
91113 daac_cnm_message = {
92- "collection" : daac_config ['daac_collection_name' ],
114+ "collection" : {
115+ 'name' : daac_config ['daac_collection_name' ],
116+ 'version' : daac_config ['daac_data_version' ],
117+ },
93118 "identifier" : uds_cnm_json ['identifier' ],
94119 "submissionTime" : f'{ TimeUtils .get_current_time ()} Z' ,
95120 "provider" : granule_identifier .tenant ,
96121 "version" : "1.6.0" , # TODO this is hardcoded?
97122 "product" : {
98123 "name" : granule_identifier .id ,
99- "dataVersion" : daac_config ['daac_data_version' ],
124+ # "dataVersion": daac_config['daac_data_version'],
100125 'files' : self .__extract_files (uds_cnm_json , daac_config ),
101126 }
102127 }
103- self .__sns .publish_message (json .dumps (daac_cnm_message ))
128+ LOGGER .debug (f'daac_cnm_message: { daac_cnm_message } ' )
129+ self .__sns .set_external_role (daac_config ['daac_role_arn' ], daac_config ['daac_role_session_name' ]).publish_message (json .dumps (daac_cnm_message ), True )
104130 self .__granules_index .update_entry (granule_identifier .tenant , granule_identifier .venue , {
105131 'archive_status' : 'cnm_s_success' ,
106132 'archive_error_message' : '' ,
0 commit comments