@@ -31,6 +31,26 @@ class TransactionsClient(BaseTransactionsClient):
3131 settings = ElasticsearchSettings ()
3232 client = settings .create_client
3333
def _create_item_index(self):
    """Create the ``stac_items`` index with its field mapping.

    ``geometry`` is mapped as ``geo_shape``; ``id`` and
    ``properties__datetime`` are mapped as ``text`` with a ``keyword``
    sub-field. The request is sent with ``ignore=400`` so that a 400
    response (returned when the index already exists) does not raise.
    Note that any other 400 response is ignored as well.
    """
    # text field that also exposes an exact-match ``keyword`` sub-field
    text_with_keyword = {"type": "text", "fields": {"keyword": {"type": "keyword"}}}
    field_properties = {
        "geometry": {"type": "geo_shape"},
        "id": text_with_keyword,
        "properties__datetime": text_with_keyword,
    }

    self.client.indices.create(
        index="stac_items",
        body={"mappings": {"properties": field_properties}},
        ignore=400,  # ignore 400 already exists code
    )
53+
3454 def create_item (self , model : stac_types .Item , ** kwargs ):
3555 """Create item."""
3656 base_url = str (kwargs ["request" ].base_url )
@@ -59,24 +79,7 @@ def create_item(self, model: stac_types.Item, **kwargs):
5979 if "created" not in model ["properties" ]:
6080 model ["properties" ]["created" ] = str (now )
6181
62- mapping = {
63- "mappings" : {
64- "properties" : {
65- "geometry" : {"type" : "geo_shape" },
66- "id" : {"type" : "text" , "fields" : {"keyword" : {"type" : "keyword" }}},
67- "properties__datetime" : {
68- "type" : "text" ,
69- "fields" : {"keyword" : {"type" : "keyword" }},
70- },
71- }
72- }
73- }
74-
75- _ = self .client .indices .create (
76- index = "stac_items" ,
77- body = mapping ,
78- ignore = 400 , # ignore 400 already exists code
79- )
82+ self ._create_item_index ()
8083
8184 self .client .index (
8285 index = "stac_items" , doc_type = "_doc" , id = model ["id" ], document = model
@@ -91,6 +94,8 @@ def create_collection(self, model: stac_types.Collection, **kwargs):
9194 ).create_links ()
9295 model ["links" ] = collection_links
9396
97+ self ._create_item_index ()
98+
9499 if self .client .exists (index = "stac_collections" , id = model ["id" ]):
95100 raise ConflictError (f"Collection { model ['id' ]} already exists" )
96101 self .client .index (
@@ -155,39 +160,79 @@ def __attrs_post_init__(self):
155160 settings = ElasticsearchSettings ()
156161 self .client = settings .create_client
157162
def _create_item_index(self):
    """Ensure the ``stac_items`` index exists before items are written.

    Sends an index-creation request carrying the item mapping:
    ``geometry`` as ``geo_shape``, and ``id`` / ``properties__datetime`` as
    ``text`` with a ``keyword`` sub-field. ``ignore=400`` keeps the call
    from raising when the server answers 400, which is what it does when
    the index is already there (other 400 responses are ignored too).
    """

    def keyword_text():
        # fresh dict per field: text with an exact-match keyword sub-field
        return {"type": "text", "fields": {"keyword": {"type": "keyword"}}}

    index_body = {
        "mappings": {
            "properties": {
                "geometry": {"type": "geo_shape"},
                "id": keyword_text(),
                "properties__datetime": keyword_text(),
            }
        }
    }

    self.client.indices.create(
        index="stac_items",
        body=index_body,
        ignore=400,  # ignore 400 already exists code
    )
182+
def _preprocess_item(self, model: stac_types.Item, base_url) -> stac_types.Item:
    """Preprocess an item so it matches the stored data model.

    Attaches freshly built links, checks that the parent collection exists
    and that the item id is not already indexed, stamps
    ``properties["created"]`` when it is absent, and widens integer values
    inside ``properties["eo:bands"]`` to ``float``.

    Args:
        model: The item to prepare. It is modified in place.
        base_url: Base URL passed to ``ItemLinks`` when building links.

    Returns:
        The same ``model`` object, after modification.

    Raises:
        ForeignKeyError: No document with id ``model["collection"]`` exists
            in the ``stac_collections`` index.
        ConflictError: A document with id ``model["id"]`` already exists in
            the ``stac_items`` index.
    """
    model["links"] = ItemLinks(
        collection_id=model["collection"], item_id=model["id"], base_url=base_url
    ).create_links()

    if not self.client.exists(index="stac_collections", id=model["collection"]):
        raise ForeignKeyError(f"Collection {model['collection']} does not exist")

    if self.client.exists(index="stac_items", id=model["id"]):
        raise ConflictError(
            f"Item {model['id']} in collection {model['collection']} already exists"
        )

    if "created" not in model["properties"]:
        # Only build the timestamp when it is actually needed.
        model["properties"]["created"] = datetime.utcnow().strftime(DATETIME_RFC339)

    # elasticsearch doesn't like the fact that some values are float and some
    # were int, so integers are widened to float. Strings, None, nested
    # containers and bools are left untouched: passing them to float() would
    # either raise or silently change their meaning.
    if "eo:bands" in model["properties"]:
        for band in model["properties"]["eo:bands"]:
            for key, value in band.items():
                if isinstance(value, int) and not isinstance(value, bool):
                    # Re-assigning an existing key is safe while iterating.
                    band[key] = float(value)
    return model
173210
def bulk_item_insert(self, items: Items, **kwargs) -> str:
    """Bulk item insertion using es.

    Ensures the ``stac_items`` index exists, preprocesses every item, and
    writes them with a single ``helpers.bulk`` call.

    Args:
        items: The items to insert.
        **kwargs: May contain ``request``; its ``base_url`` is used when
            building item links. When it is missing, an empty base URL is
            used instead.

    Returns:
        A message stating how many items were added.

    Raises:
        ForeignKeyError, ConflictError: Propagated from
            ``_preprocess_item`` when an item fails its checks.
    """
    self._create_item_index()
    try:
        base_url = str(kwargs["request"].base_url)
    except (KeyError, AttributeError):
        # No request supplied (or it has no base_url): fall back to an
        # empty base URL rather than failing the whole insert.
        base_url = ""
    processed_items = [self._preprocess_item(item, base_url) for item in items]

    # Use the item's own id as the document id, matching single-item
    # indexing. Without "_id" Elasticsearch generates a random id, and the
    # id-based existence check in _preprocess_item can never find items
    # that were inserted in bulk.
    actions = [
        {"_index": "stac_items", "_id": item["id"], "_source": item}
        for item in processed_items
    ]
    helpers.bulk(self.client, actions)

    return f"Successfully added {len(processed_items)} items."
0 commit comments