Skip to content

Commit b65df64

Browse files
committed
Convert JSON to CSV for search index tool result
Signed-off-by: yyfamazon <yyf@amazon.com>
1 parent 8d281b3 commit b65df64

File tree

4 files changed

+250
-8
lines changed

4 files changed

+250
-8
lines changed

src/opensearch/helper.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import json
55
import logging
6+
import csv
7+
import io
68
from semver import Version
79
from tools.tool_params import *
810

@@ -277,6 +279,60 @@ async def get_nodes_info(args: GetNodesArgs) -> json:
277279
return response
278280

279281

282+
def convert_search_results_to_csv(search_results: dict) -> str:
    """Convert OpenSearch search results to CSV format.

    Each hit becomes one row. The columns are the union of every ``_source``
    key seen across all hits plus the metadata fields ``_index``, ``_id`` and
    ``_score``, sorted alphabetically so the column order is deterministic.
    Nested values (dicts and lists) are embedded as JSON strings, ``None``
    becomes an empty cell, and every other value is rendered with ``str()``.

    Args:
        search_results: The JSON response from search_index function

    Returns:
        str: CSV formatted string of the search results, or a short
        human-readable message when there is nothing to convert.
    """
    if not search_results or 'hits' not in search_results:
        return "No search results to convert"

    # The outer 'hits' envelope may be null or lack the inner 'hits' list;
    # treat both as "no documents" instead of raising KeyError/TypeError.
    envelope = search_results['hits']
    hits = envelope.get('hits') if isinstance(envelope, dict) else None
    if not hits:
        return "No documents found in search results"

    metadata_fields = ('_index', '_id', '_score')

    # A hit can come back without '_source' (or with it set to null), so
    # normalise each one to a dict up front.
    sources = [hit.get('_source') or {} for hit in hits]

    # Union of all document fields plus metadata, sorted for a stable header.
    fieldnames = sorted(set(metadata_fields).union(*sources))

    # Build the CSV in memory; DictWriter fills absent columns with ''.
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    writer.writeheader()

    for hit, source in zip(hits, sources):
        row = {name: hit.get(name, '') for name in metadata_fields}

        for field, value in source.items():
            if isinstance(value, (dict, list)):
                # Complex objects are embedded as JSON strings.
                row[field] = json.dumps(value)
            elif value is None:
                row[field] = ''
            else:
                row[field] = str(value)

        writer.writerow(row)

    return output.getvalue()
334+
335+
280336
async def get_opensearch_version(args: baseToolArgs) -> Version:
281337
"""Get the version of OpenSearch cluster.
282338

src/tools/tool_params.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class GetIndexMappingArgs(baseToolArgs):
8080
class SearchIndexArgs(baseToolArgs):
    """Arguments for the search-index tool: target index, query and output format."""

    index: str = Field(description='The name of the index to search in')
    query: Any = Field(description='The search query in OpenSearch query DSL format')
    # Defaults to 'json'. search_index_tool lower-cases this value and renders
    # CSV only when it equals 'csv'; any other value falls back to JSON output.
    format: str = Field(default='json', description='Output format: "json" or "csv"')
8384

8485

8586
class GetShardsArgs(baseToolArgs):

src/tools/tools.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222
from .utils import is_tool_compatible
2323
from opensearch.helper import (
24+
convert_search_results_to_csv,
2425
get_allocation,
2526
get_cluster_state,
2627
get_index,
@@ -109,14 +110,23 @@ async def search_index_tool(args: SearchIndexArgs) -> list[dict]:
109110
try:
110111
await check_tool_compatibility('SearchIndexTool', args)
111112
result = await search_index(args)
112-
formatted_result = json.dumps(result, indent=2)
113-
114-
return [
115-
{
116-
'type': 'text',
117-
'text': f'Search results from {args.index}:\n{formatted_result}',
118-
}
119-
]
113+
114+
if args.format.lower() == 'csv':
115+
csv_result = convert_search_results_to_csv(result)
116+
return [
117+
{
118+
'type': 'text',
119+
'text': f'Search results from {args.index} (CSV format):\n{csv_result}',
120+
}
121+
]
122+
else:
123+
formatted_result = json.dumps(result, indent=2)
124+
return [
125+
{
126+
'type': 'text',
127+
'text': f'Search results from {args.index} (JSON format):\n{formatted_result}',
128+
}
129+
]
120130
except Exception as e:
121131
return [{'type': 'text', 'text': f'Error searching index: {str(e)}'}]
122132

tests/opensearch/test_helper.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,178 @@ async def test_get_opensearch_version_error(self, mock_get_client):
273273
# Execute and assert
274274
result = await get_opensearch_version(args)
275275
assert result is None
276+
277+
def test_convert_search_results_to_csv(self):
    """Test convert_search_results_to_csv function.

    Exercises the real helper from ``opensearch.helper`` (not a local copy)
    and parses the CSV back so that each cell is checked exactly rather than
    by substring.
    """
    import csv
    import io
    import json

    from opensearch.helper import convert_search_results_to_csv

    # Test data - sample OpenSearch search results
    test_search_results = {
        "took": 5,
        "timed_out": False,
        "_shards": {
            "total": 1,
            "successful": 1,
            "skipped": 0,
            "failed": 0,
        },
        "hits": {
            "total": {
                "value": 2,
                "relation": "eq",
            },
            "max_score": 1.0,
            "hits": [
                {
                    "_index": "test_index",
                    "_id": "1",
                    "_score": 1.0,
                    "_source": {
                        "name": "John Doe",
                        "age": 30,
                        "city": "New York",
                        "tags": ["developer", "python"],
                    },
                },
                {
                    "_index": "test_index",
                    "_id": "2",
                    "_score": 0.8,
                    "_source": {
                        "name": "Jane Smith",
                        "age": 25,
                        "city": "San Francisco",
                        "department": "Engineering",
                    },
                },
            ],
        },
    }

    # Execute
    csv_output = convert_search_results_to_csv(test_search_results)

    # Assert
    assert isinstance(csv_output, str)
    reader = csv.DictReader(io.StringIO(csv_output))
    rows = list(reader)
    assert len(rows) == 2  # Header + 2 data rows

    # Header is the sorted union of metadata and document fields
    assert reader.fieldnames == [
        '_id',
        '_index',
        '_score',
        'age',
        'city',
        'department',
        'name',
        'tags',
    ]

    # Check first data row
    first_row = rows[0]
    assert first_row['_index'] == 'test_index'
    assert first_row['_id'] == '1'
    assert first_row['_score'] == '1.0'
    assert first_row['name'] == 'John Doe'
    assert first_row['age'] == '30'
    assert first_row['city'] == 'New York'
    # Lists are embedded as JSON; fields missing from a document are empty
    assert json.loads(first_row['tags']) == ["developer", "python"]
    assert first_row['department'] == ''

    # Check second data row
    second_row = rows[1]
    assert second_row['_index'] == 'test_index'
    assert second_row['_id'] == '2'
    assert second_row['_score'] == '0.8'
    assert second_row['name'] == 'Jane Smith'
    assert second_row['age'] == '25'
    assert second_row['city'] == 'San Francisco'
    assert second_row['department'] == 'Engineering'
    assert second_row['tags'] == ''
394+
395+
def test_convert_search_results_to_csv_empty(self):
    """Test convert_search_results_to_csv with empty results.

    Uses the real helper from ``opensearch.helper`` so the placeholder
    messages asserted here are the ones production code actually returns.
    """
    from opensearch.helper import convert_search_results_to_csv

    # Test with empty hits
    empty_results = {"hits": {"hits": []}}
    result = convert_search_results_to_csv(empty_results)
    assert result == "No documents found in search results"

    # Test with no hits key
    no_hits_results = {"took": 5}
    result = convert_search_results_to_csv(no_hits_results)
    assert result == "No search results to convert"

    # Test with None input
    result = convert_search_results_to_csv(None)
    assert result == "No search results to convert"
450+

0 commit comments

Comments
 (0)