@@ -273,3 +273,178 @@ async def test_get_opensearch_version_error(self, mock_get_client):
273273 # Execute and assert
274274 result = await get_opensearch_version (args )
275275 assert result is None
276+
277+ def test_convert_search_results_to_csv (self ):
278+ """Test convert_search_results_to_csv function."""
279+ import json
280+ import csv
281+ import io
282+
283+ def convert_search_results_to_csv (search_results : dict ) -> str :
284+ if not search_results or 'hits' not in search_results :
285+ return "No search results to convert"
286+
287+ hits = search_results ['hits' ]['hits' ]
288+ if not hits :
289+ return "No documents found in search results"
290+
291+ all_fields = set ()
292+ for hit in hits :
293+ if '_source' in hit :
294+ all_fields .update (hit ['_source' ].keys ())
295+ all_fields .update (['_index' , '_id' , '_score' ])
296+
297+ fieldnames = sorted (list (all_fields ))
298+ output = io .StringIO ()
299+ writer = csv .DictWriter (output , fieldnames = fieldnames )
300+ writer .writeheader ()
301+
302+ for hit in hits :
303+ row = {}
304+ row ['_index' ] = hit .get ('_index' , '' )
305+ row ['_id' ] = hit .get ('_id' , '' )
306+ row ['_score' ] = hit .get ('_score' , '' )
307+
308+ if '_source' in hit :
309+ for field , value in hit ['_source' ].items ():
310+ if isinstance (value , (dict , list )):
311+ row [field ] = json .dumps (value )
312+ else :
313+ row [field ] = str (value ) if value is not None else ''
314+
315+ writer .writerow (row )
316+
317+ return output .getvalue ()
318+
319+ # Test data - sample OpenSearch search results
320+ test_search_results = {
321+ "took" : 5 ,
322+ "timed_out" : False ,
323+ "_shards" : {
324+ "total" : 1 ,
325+ "successful" : 1 ,
326+ "skipped" : 0 ,
327+ "failed" : 0
328+ },
329+ "hits" : {
330+ "total" : {
331+ "value" : 2 ,
332+ "relation" : "eq"
333+ },
334+ "max_score" : 1.0 ,
335+ "hits" : [
336+ {
337+ "_index" : "test_index" ,
338+ "_id" : "1" ,
339+ "_score" : 1.0 ,
340+ "_source" : {
341+ "name" : "John Doe" ,
342+ "age" : 30 ,
343+ "city" : "New York" ,
344+ "tags" : ["developer" , "python" ]
345+ }
346+ },
347+ {
348+ "_index" : "test_index" ,
349+ "_id" : "2" ,
350+ "_score" : 0.8 ,
351+ "_source" : {
352+ "name" : "Jane Smith" ,
353+ "age" : 25 ,
354+ "city" : "San Francisco" ,
355+ "department" : "Engineering"
356+ }
357+ }
358+ ]
359+ }
360+ }
361+
362+ # Execute
363+ csv_output = convert_search_results_to_csv (test_search_results )
364+
365+ # Assert
366+ assert isinstance (csv_output , str )
367+ lines = csv_output .strip ().split ('\n ' )
368+ assert len (lines ) == 3 # Header + 2 data rows
369+
370+ # Check header contains expected fields
371+ header = lines [0 ]
372+ assert '_id' in header
373+ assert '_index' in header
374+ assert '_score' in header
375+ assert 'name' in header
376+ assert 'age' in header
377+ assert 'city' in header
378+
379+ # Check first data row
380+ first_row = lines [1 ]
381+ assert 'test_index' in first_row
382+ assert '1' in first_row
383+ assert 'John Doe' in first_row
384+ assert '30' in first_row
385+ assert 'New York' in first_row
386+
387+ # Check second data row
388+ second_row = lines [2 ]
389+ assert 'test_index' in second_row
390+ assert '2' in second_row
391+ assert 'Jane Smith' in second_row
392+ assert '25' in second_row
393+ assert 'San Francisco' in second_row
394+
395+ def test_convert_search_results_to_csv_empty (self ):
396+ """Test convert_search_results_to_csv with empty results."""
397+ import json
398+ import csv
399+ import io
400+
401+ def convert_search_results_to_csv (search_results : dict ) -> str :
402+ if not search_results or 'hits' not in search_results :
403+ return "No search results to convert"
404+
405+ hits = search_results ['hits' ]['hits' ]
406+ if not hits :
407+ return "No documents found in search results"
408+
409+ all_fields = set ()
410+ for hit in hits :
411+ if '_source' in hit :
412+ all_fields .update (hit ['_source' ].keys ())
413+ all_fields .update (['_index' , '_id' , '_score' ])
414+
415+ fieldnames = sorted (list (all_fields ))
416+ output = io .StringIO ()
417+ writer = csv .DictWriter (output , fieldnames = fieldnames )
418+ writer .writeheader ()
419+
420+ for hit in hits :
421+ row = {}
422+ row ['_index' ] = hit .get ('_index' , '' )
423+ row ['_id' ] = hit .get ('_id' , '' )
424+ row ['_score' ] = hit .get ('_score' , '' )
425+
426+ if '_source' in hit :
427+ for field , value in hit ['_source' ].items ():
428+ if isinstance (value , (dict , list )):
429+ row [field ] = json .dumps (value )
430+ else :
431+ row [field ] = str (value ) if value is not None else ''
432+
433+ writer .writerow (row )
434+
435+ return output .getvalue ()
436+
437+ # Test with empty hits
438+ empty_results = {"hits" : {"hits" : []}}
439+ result = convert_search_results_to_csv (empty_results )
440+ assert result == "No documents found in search results"
441+
442+ # Test with no hits key
443+ no_hits_results = {"took" : 5 }
444+ result = convert_search_results_to_csv (no_hits_results )
445+ assert result == "No search results to convert"
446+
447+ # Test with None input
448+ result = convert_search_results_to_csv (None )
449+ assert result == "No search results to convert"
450+
0 commit comments