Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 37 additions & 105 deletions frontend/components/knowledge-dropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import {
import { useRouter } from "next/navigation";
import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import { useUploadIngest } from "@/app/api/mutations/useUploadIngest";
import { useUploadPath } from "@/app/api/mutations/useUploadPath";
import { useUploadBucket } from "@/app/api/mutations/useUploadBucket";
import { DuplicateHandlingDialog } from "@/components/duplicate-handling-dialog";
import { Button } from "@/components/ui/button";
import {
Expand All @@ -30,8 +32,10 @@ import type { File as SearchFile } from "@/src/app/api/queries/useGetSearchQuery

export function KnowledgeDropdown() {
const { addTask } = useTask();
const { refetch: refetchTasks } = useGetTasksQuery();
const queryClient = useQueryClient();
const uploadIngestMutation = useUploadIngest();
const uploadPathMutation = useUploadPath();
const uploadBucketMutation = useUploadBucket();
const router = useRouter();
const [isOpen, setIsOpen] = useState(false);
const [showFolderDialog, setShowFolderDialog] = useState(false);
Expand Down Expand Up @@ -234,60 +238,22 @@ export function KnowledgeDropdown() {
);

try {
const formData = new FormData();
formData.append("file", file);
formData.append("replace_duplicates", replace.toString());

// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
const result = await uploadIngestMutation.mutateAsync({
file,
replaceDuplicates: replace,
});

const uploadIngestJson = await uploadIngestRes.json();

if (!uploadIngestRes.ok) {
throw new Error(uploadIngestJson?.error || "Upload and ingest failed");
}

// Extract results from the response - handle both unified and simple formats
const fileId =
uploadIngestJson?.upload?.id ||
uploadIngestJson?.id ||
uploadIngestJson?.task_id;
result?.upload?.id ||
result?.id ||
result?.task_id;
const filePath =
uploadIngestJson?.upload?.path || uploadIngestJson?.path || "uploaded";
const runJson = uploadIngestJson?.ingestion;
const deleteResult = uploadIngestJson?.deletion;
console.log("c", uploadIngestJson);
if (!fileId) {
throw new Error("Upload successful but no file id returned");
}
// Check if ingestion actually succeeded
if (
runJson &&
runJson.status !== "COMPLETED" &&
runJson.status !== "SUCCESS"
) {
const errorMsg = runJson.error || "Ingestion pipeline failed";
throw new Error(
`Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`
);
}
// Log deletion status if provided
if (deleteResult) {
if (deleteResult.status === "deleted") {
console.log(
"File successfully cleaned up from Langflow:",
deleteResult.file_id
);
} else if (deleteResult.status === "delete_failed") {
console.warn(
"Failed to cleanup file from Langflow:",
deleteResult.error
);
}
}
result?.upload?.path || result?.path || "uploaded";
const runJson = result?.ingestion;
const deleteResult = result?.deletion;
console.log("c", result);

// Notify UI
window.dispatchEvent(
new CustomEvent("fileUploaded", {
Expand All @@ -303,8 +269,6 @@ export function KnowledgeDropdown() {
},
})
);

refetchTasks();
} catch (error) {
window.dispatchEvent(
new CustomEvent("fileUploadError", {
Expand Down Expand Up @@ -344,41 +308,23 @@ export function KnowledgeDropdown() {
setShowFolderDialog(false);

try {
const response = await fetch("/api/upload_path", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ path: folderPath }),
const result = await uploadPathMutation.mutateAsync({
path: folderPath,
});

const result = await response.json();

if (response.status === 201) {
if (result.status === 201) {
const taskId = result.task_id || result.id;

if (!taskId) {
throw new Error("No task ID received from server");
}

addTask(taskId);
setFolderPath("");
// Refetch tasks to show the new task
refetchTasks();
} else if (response.ok) {
setFolderPath("");
// Refetch tasks even for direct uploads in case tasks were created
refetchTasks();
} else {
console.error("Folder upload failed:", result.error);
if (response.status === 400) {
toast.error("Upload failed", {
description: result.error || "Bad request",
});
if (taskId) {
addTask(taskId);
}
}

setFolderPath("");
} catch (error) {
console.error("Folder upload error:", error);
toast.error("Upload failed", {
description: error instanceof Error ? error.message : "Unknown error",
});
} finally {
setFolderLoading(false);
}
Expand All @@ -391,37 +337,23 @@ export function KnowledgeDropdown() {
setShowS3Dialog(false);

try {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: bucketUrl }),
const result = await uploadBucketMutation.mutateAsync({
s3Url: bucketUrl,
});

const result = await response.json();

if (response.status === 201) {
if (result.status === 201) {
const taskId = result.task_id || result.id;

if (!taskId) {
throw new Error("No task ID received from server");
}

addTask(taskId);
setBucketUrl("s3://");
// Refetch tasks to show the new task
refetchTasks();
} else {
console.error("S3 upload failed:", result.error);
if (response.status === 400) {
toast.error("Upload failed", {
description: result.error || "Bad request",
});
if (taskId) {
addTask(taskId);
}
}

setBucketUrl("s3://");
} catch (error) {
console.error("S3 upload error:", error);
toast.error("Upload failed", {
description: error instanceof Error ? error.message : "Unknown error",
});
} finally {
setS3Loading(false);
}
Expand Down
60 changes: 60 additions & 0 deletions frontend/src/app/api/mutations/useUploadBucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"use client";

import { useMutation, useQueryClient } from "@tanstack/react-query";

export interface UploadBucketRequest {
s3Url: string;
}

export interface UploadBucketResponse {
task_id?: string;
id?: string;
error?: string;
status?: number;
}

const uploadBucket = async (
data: UploadBucketRequest
): Promise<UploadBucketResponse> => {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: data.s3Url }),
});

const result = await response.json();

// Handle different response statuses
if (response.status === 201) {
const taskId = result.task_id || result.id;

if (!taskId) {
throw new Error("No task ID received from server");
}

return { ...result, status: response.status };
} else if (response.ok) {
// Success but not 201
return { ...result, status: response.status };
} else {
// Error response
if (response.status === 400) {
throw new Error(result.error || "Bad request");
}
throw new Error(result.error || "S3 upload failed");
}
};

export const useUploadBucket = () => {
const queryClient = useQueryClient();

return useMutation({
mutationFn: uploadBucket,
onSettled: () => {
// Invalidate tasks query to refresh the UI
queryClient.invalidateQueries({ queryKey: ["tasks"] });
},
});
};
97 changes: 97 additions & 0 deletions frontend/src/app/api/mutations/useUploadIngest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"use client";

import { useMutation, useQueryClient } from "@tanstack/react-query";

export interface UploadIngestRequest {
file: File;
replaceDuplicates: boolean;
}

export interface UploadIngestResponse {
upload?: {
id?: string;
path?: string;
};
id?: string;
task_id?: string;
path?: string;
ingestion?: {
status: string;
error?: string;
};
deletion?: {
status: string;
file_id?: string;
error?: string;
};
error?: string;
}

const uploadIngest = async (
data: UploadIngestRequest
): Promise<UploadIngestResponse> => {
const formData = new FormData();
formData.append("file", data.file);
formData.append("replace_duplicates", data.replaceDuplicates.toString());

const response = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
});

const json = await response.json();

if (!response.ok) {
throw new Error(json?.error || "Upload and ingest failed");
}

// Extract results from the response
const fileId = json?.upload?.id || json?.id || json?.task_id;

if (!fileId) {
throw new Error("Upload successful but no file id returned");
}

// Check if ingestion actually succeeded
const runJson = json?.ingestion;
if (
runJson &&
runJson.status !== "COMPLETED" &&
runJson.status !== "SUCCESS"
) {
const errorMsg = runJson.error || "Ingestion pipeline failed";
throw new Error(
`Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`
);
}

return json;
};

export const useUploadIngest = () => {
const queryClient = useQueryClient();

return useMutation({
mutationFn: uploadIngest,
onSettled: (data) => {
// Log deletion status if provided
const deleteResult = data?.deletion;
if (deleteResult) {
if (deleteResult.status === "deleted") {
console.log(
"File successfully cleaned up from Langflow:",
deleteResult.file_id
);
} else if (deleteResult.status === "delete_failed") {
console.warn(
"Failed to cleanup file from Langflow:",
deleteResult.error
);
}
}

// Invalidate tasks query to refresh the UI
queryClient.invalidateQueries({ queryKey: ["tasks"] });
},
});
};
Loading