@@ -124,7 +124,7 @@ def upsert_data(self, via_cql=False):
124124 data_path = namespace_meta ["data_path" ]
125125 final_data_path = self .get_final_data_path (data_path )
126126 new_index_name = index_name + (
127- f' _{ namespace_meta [" namespace" ] } '
127+ f" _{ namespace_meta [' namespace' ] } "
128128 if namespace_meta ["namespace" ]
129129 else ""
130130 )
@@ -162,7 +162,7 @@ def upsert_data(self, via_cql=False):
162162
163163 self .session .execute (
164164 f"CREATE TABLE IF NOT EXISTS { self .args ['keyspace' ]} .{ new_index_name } "
165- f" (id text PRIMARY KEY, \ " $vector\ " vector<float,{ namespace_meta [' dimensions' ]} >)"
165+ f' (id text PRIMARY KEY, "$vector" vector<float,{ namespace_meta [" dimensions" ]} >)'
166166 )
167167 parquet_files = self .get_parquet_files (final_data_path )
168168 vectors = {}
@@ -208,7 +208,7 @@ def flush_to_db(self, vectors, metadata, collection, via_cql, parallel=True):
208208 keys = list (set (vectors .keys ()).union (set (metadata .keys ())))
209209 for id in keys :
210210 self .session .execute (
211- f" INSERT INTO { self .args [' keyspace' ]} .{ collection .name } (id, \ " $vector\ " , { ', ' .join (metadata [id ].keys ())} ) "
211+ f' INSERT INTO { self .args [" keyspace" ]} .{ collection .name } (id, "$vector", { ", " .join (metadata [id ].keys ())} ) '
212212 f"VALUES ('{ id } ', { vectors [id ]} , { ', ' .join ([str (v ) for v in metadata [id ].values ()])} )"
213213 )
214214 return len (vectors )
@@ -248,12 +248,15 @@ def flush_batch_to_db(collection, keys, vectors, metadata):
248248 for i in range (0 , total_points , BATCH_SIZE )
249249 ]
250250
251- with concurrent .futures .ThreadPoolExecutor (
252- max_workers = num_parallel_threads
253- ) as executor , tqdm (
254- total = total_points ,
255- desc = f"Flushing to DB in batches of { BATCH_SIZE } in { num_parallel_threads } threads" ,
256- ) as pbar :
251+ with (
252+ concurrent .futures .ThreadPoolExecutor (
253+ max_workers = num_parallel_threads
254+ ) as executor ,
255+ tqdm (
256+ total = total_points ,
257+ desc = f"Flushing to DB in batches of { BATCH_SIZE } in { num_parallel_threads } threads" ,
258+ ) as pbar ,
259+ ):
257260 future_to_batch = {
258261 executor .submit (flush_batch_to_db , collection , * batch ): batch
259262 for batch in batches
0 commit comments