@@ -169,6 +169,7 @@ export const DataImporterPluginApp = ({
       selectedDataSourceId: dataSourceId,
     });
     setIsLoadingPreview(false);
+
     if (response) {
       setFilePreviewData(response);
       notifications.toasts.addSuccess(
@@ -35,6 +35,7 @@ export const PreviewComponent = ({
   existingMapping,
 }: PreviewComponentProps) => {
   const [searchQuery, setSearchQuery] = useState('');
+  previewData = previewData.flat();
   const totalRows = previewData?.length;
   const loadedRows = Math.min(visibleRows, totalRows);
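The flatten above implies the preview rows can arrive batched per parsed chunk rather than as a flat list. A minimal sketch of that assumption (the Row type and sample data are illustrative, not the plugin's actual types):

type Row = Record<string, unknown>;

const batched: Row[][] = [
  [{ id: 1 }, { id: 2 }],
  [{ id: 3 }],
];

const rows: Row[] = batched.flat(); // [{ id: 1 }, { id: 2 }, { id: 3 }]
// batched.length === 2 counts batches; rows.length === 3 is the row count
// the preview table should report as totalRows.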
src/plugins/data_importer/server/routes/import_file.ts (31 additions & 5 deletions)
@@ -107,18 +107,44 @@ export function importFileRoute(

       const file = request.body.file as FileStream;

+      // Batching logic for large CSV uploads using the bulk API
       try {
-        const message = await processor.ingestFile(file, {
-          indexName: request.query.indexName,
-          client,
+        // Parse the file into rows (assumes processor.parseFile returns an array of objects);
+        // a large previewCount retrieves all rows
+        const previewCount = Number.MAX_SAFE_INTEGER;
+        const rows = await processor.parseFile(file, previewCount, {
           delimiter: request.query.delimiter,
           dataSourceId: request.query.dataSource,
         });
+        const flatRows = Array.isArray(rows[0]) ? rows.flat() : rows;
+
+        const BATCH_SIZE = 1000;
+        const failedRows: any[] = [];
+        for (let i = 0; i < flatRows.length; i += BATCH_SIZE) {
+          const batch = flatRows.slice(i, i + BATCH_SIZE);
+          // Build the bulk body: an action entry followed by the source row
+          const bulkBody = [];
+          for (const row of batch) {
+            bulkBody.push({ index: { _index: request.query.indexName } });
+            bulkBody.push(row);
+          }
+          const bulkResponse = await client.bulk({ body: bulkBody });
+          // Collect failed rows; response items align 1:1 with the batch
+          if (bulkResponse.body.errors) {
+            bulkResponse.body.items.forEach((item: any, idx: number) => {
+              if (item.index && item.index.error) {
+                failedRows.push(batch[idx]);
+              }
+            });
+          }
+        }
+        const message = {
+          failedRows,
+          totalRows: flatRows.length,
+        };
         return response.ok({
           body: {
             message,
-            success: message.failedRows.length < 1,
+            success: failedRows.length < 1,
           },
         });
       } catch (e) {
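The route's inner loop relies on the bulk API's positional contract: the request body alternates action and source entries, and the response's items array returns one result per action in request order, which is why batch[idx] recovers the row behind a failed item. A minimal sketch of that pairing (the index name and sample rows are illustrative, not from the PR):

type Row = Record<string, unknown>;

const batch: Row[] = [
  { name: 'alice', age: 30 },
  { name: 'bob', age: 41 },
];

// Alternating entries: action metadata, then the document itself
const bulkBody: Array<Record<string, unknown>> = [];
for (const row of batch) {
  bulkBody.push({ index: { _index: 'my-index' } }); // action entry
  bulkBody.push(row); // source entry
}

// bulkBody now holds 2 * batch.length entries; after client.bulk({ body: bulkBody }),
// response.body.items[idx] reports the outcome for batch[idx].

Capping each request at BATCH_SIZE = 1000 rows keeps individual bulk payloads bounded for large uploads, and failures from every batch are accumulated into failedRows and reported together in the response.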