Commit 3987f74

feat(kb): add abort capability and enum standardization for zero-downtime update (#274)
Because

- Knowledge Base updates lack graceful cancellation - once started, updates must complete or fail, with no way to manually abort them when issues arise
- Status values were represented as untyped strings across the system, causing inconsistencies between protobuf definitions, backend code, database storage, and test assertions
- Files that haven't started processing (`NOTSTARTED`, `WAITING`) were incorrectly blocking KB synchronization, causing integration tests to timeout indefinitely
- The system lacked comprehensive integration test coverage for update lifecycle edge cases and concurrent operations during updates

This commit

**Adds Abort API for Knowledge Base Updates**

- **Introduces `AbortKnowledgeBaseUpdateAdmin` API** enabling administrators to gracefully cancel ongoing KB updates
- **Cancels active Temporal workflows** and cleans up staging KB resources (both completed and incomplete files)
- **Supports selective abort** - can abort specific catalogs by ID or all currently updating catalogs
- **Sets KB status to `ABORTED`** allowing immediate re-upload after cancellation

**Standardizes Enum Values Across the Stack**

- **Replaces string-based status values** (`"updating"`, `"completed"`) with strongly-typed protobuf enum constants (`KNOWLEDGE_BASE_UPDATE_STATUS_UPDATING`, `KNOWLEDGE_BASE_UPDATE_STATUS_COMPLETED`)
- **Adds database migration (000034)** to update all existing records to use standardized enum values
- **Ensures consistency** between protobuf definitions, Go backend code (via `.String()`), database storage, and integration test assertions
- **Introduces 10 well-documented lifecycle states**: UNSPECIFIED, NONE, UPDATING, SYNCING, VALIDATING, SWAPPING, COMPLETED, FAILED, ROLLED_BACK, ABORTED

**Fixes Synchronization Logic**

- **Excludes unstarted files** from synchronization wait - only actively processing files (`PROCESSING`, `CONVERTING`, `CHUNKING`, `EMBEDDING`) now block the swap
- **Prevents indefinite hangs** when files are uploaded but not yet processed during the update lifecycle
- **Enables proper KB locking** - after lock, no new processing starts, so unstarted files can be safely ignored

**Adds Comprehensive Test Coverage**

- **Group 5: 10 corner case tests** covering file operations during different update phases (adding/deleting during swap, race conditions, rapid operations, late-phase uploads)
- **Updated all test assertions** to handle both legacy string format and new enum format for backward compatibility
- **Fixed `pollUpdateCompletion` helper** to recognize new enum-based status values

## Architecture Highlights

### Zero-Downtime Update Strategy

1. **Phase 1-2 (UPDATING)**: Create staging KB, reprocess all files with new config
2. **Phase 3 (SYNCING)**: Lock KB, wait for active processing to complete (excluding unstarted files)
3. **Phase 4 (VALIDATING)**: Verify data integrity (file counts, embeddings, chunks)
4. **Phase 5 (SWAPPING)**: Atomic pointer swap - production → rollback, staging → production
5. **Phase 6 (COMPLETED)**: Cleanup, retain rollback KB for configured period

### Dual-Processing During Updates

- Files uploaded during update are **processed to both production and staging KBs**
- After swap, these files exist in correct collections without reprocessing
- Files deleted during update are **removed from both KBs** (dual deletion)

### Graceful Abort

- Cancels ongoing Temporal workflows via `workflowClient.CancelWorkflow()`
- Cleans up staging KB resources (files, Milvus collection, metadata)
- Production KB remains unchanged and immediately available for new updates
- No orphaned resources or stuck workflows
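
The synchronization rule from the commit message (only actively processing files block the swap, while `NOTSTARTED` and `WAITING` files are ignored) can be sketched as below. This is an illustrative JavaScript sketch, not the commit's Go implementation: `filesBlockingSwap`, `isReadyToSwap`, and the `{ name, status }` record shape are hypothetical, and the status names are the short forms quoted in the commit message.

```javascript
// Statuses the commit message lists as "actively processing".
const BLOCKING_STATUSES = new Set(["PROCESSING", "CONVERTING", "CHUNKING", "EMBEDDING"]);

// Hypothetical helper: returns the files that still block the swap.
function filesBlockingSwap(files) {
  return files.filter((f) => BLOCKING_STATUSES.has(f.status));
}

// Hypothetical helper: the swap may proceed once nothing is actively processing.
// Unstarted files (NOTSTARTED, WAITING) are deliberately not counted.
function isReadyToSwap(files) {
  return filesBlockingSwap(files).length === 0;
}

const files = [
  { name: "a.pdf", status: "COMPLETED" },
  { name: "b.pdf", status: "NOTSTARTED" },
  { name: "c.pdf", status: "WAITING" },
];
console.log(isReadyToSwap(files)); // true
console.log(isReadyToSwap([...files, { name: "d.pdf", status: "EMBEDDING" }])); // false
```

Using an allow-list of blocking statuses, rather than a deny-list of terminal ones, means a newly introduced idle status would not block the swap by accident.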
1 parent efc589e commit 3987f74

82 files changed: +7401 -6603 lines changed

Makefile

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ unit-test: ## Run unit test
 integration-test: ## Run integration tests in parallel using GNU parallel
 @echo "Running integration tests in parallel..."
 @parallel --halt now,fail=1 --tag --line-buffer \
-"TEST_FOLDER_ABS_PATH=${PWD} k6 run \
+"TEST_FOLDER_ABS_PATH=${PWD} k6 run --address=\"\" \
 -e API_GATEWAY_PROTOCOL=${API_GATEWAY_PROTOCOL} -e API_GATEWAY_URL=${API_GATEWAY_URL} \
 -e DB_HOST=${DB_HOST} \
 {} --no-usage-report" ::: \

cmd/worker/main.go

Lines changed: 1 addition & 0 deletions
@@ -168,6 +168,7 @@ func main() {

 // ===== CleanupKnowledgeBaseWorkflow Activities =====
 // Activities for cleaning up entire knowledge base resources
+w.RegisterActivity(cw.GetInProgressFileCountActivity) // Check for in-progress files before cleanup
 w.RegisterActivity(cw.DeleteKBFilesFromMinIOActivity) // Delete all KB files from MinIO
 w.RegisterActivity(cw.DropVectorDBCollectionActivity) // Drop Milvus collection for KB
 w.RegisterActivity(cw.DeleteKBFileRecordsActivity) // Delete all file records from database

config/config.yaml

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ database:
 timezone: Etc/UTC
 pool:
 idleconnections: 10
-maxconnections: 30
+maxconnections: 100
 connlifetime: 30m # In minutes, e.g., '60m'
 influxdb:
 url: http://influxdb:8086

go.mod

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ require (
 github.com/google/go-cmp v0.7.0
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1
-github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251020215452-9cefa8fe58b7
+github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251021120500-309474e9c8c4
 github.com/instill-ai/usage-client v0.4.0
 github.com/instill-ai/x v0.10.1-alpha
 github.com/knadh/koanf v1.5.0

go.sum

Lines changed: 2 additions & 2 deletions
@@ -301,8 +301,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
 github.com/hydrogen18/memlistener v0.0.0-20200120041712-dcc25e7acd91/go.mod h1:qEIFzExnS6016fRpRfxrExeVn2gbClQA99gQhnIcdhE=
 github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251020215452-9cefa8fe58b7 h1:fPIHRjajavlqcTdOmjUEBkar8QKxQBS4LciLJnTv6as=
-github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251020215452-9cefa8fe58b7/go.mod h1:bCnBosofpaUxKBuTTJM3/I3thAK37kvfBnKByjnLsl4=
+github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251021120500-309474e9c8c4 h1:tEPHqGxFwKeHEm3OhZ+WLRRmQ9BY0i1u48VNO59N0oY=
+github.com/instill-ai/protogen-go v0.3.3-alpha.0.20251021120500-309474e9c8c4/go.mod h1:bCnBosofpaUxKBuTTJM3/I3thAK37kvfBnKByjnLsl4=
 github.com/instill-ai/usage-client v0.4.0 h1:xf1hAlO4a8lZwZzz9bprZOJqU3ghIcIsavUUB7UURyg=
 github.com/instill-ai/usage-client v0.4.0/go.mod h1:zZ9LRoXps2u63ARYPAbR2YvqTib3dWJLObZn+9YqhF0=
 github.com/instill-ai/x v0.10.1-alpha h1:15+SntJqoh1ab+QhS8OdoagyUMhR8NU4UV12Xt43qmo=

integration-test/const.js

Lines changed: 7 additions & 5 deletions
@@ -17,13 +17,15 @@ if (__ENV.API_GATEWAY_PROTOCOL) {
 proto = "http"
 }

-export const artifactPrivateHost = `http://artifact-backend:3082`;
-export const artifactPublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
-export const mgmtPublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
-export const artifactGRPCPrivateHost = `artifact-backend:3082`;
+export const artifactRESTPublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
+export const mgmtRESTPublicHost = apiGatewayMode ? `${proto}://${__ENV.API_GATEWAY_URL}` : `http://api-gateway:8080`
+
 export const artifactGRPCPublicHost = apiGatewayMode ? `${__ENV.API_GATEWAY_URL}` : `api-gateway:8080`;
+export const artifactGRPCPrivateHost = apiGatewayMode ? `localhost:3082` : `artifact-backend:3082`;
+
 export const mgmtGRPCPublicHost = apiGatewayMode ? `${__ENV.API_GATEWAY_URL}` : `api-gateway:8080`;
-export const mgmtGRPCPrivateHost = `mgmt-backend:3084`;
+export const mgmtGRPCPrivateHost = apiGatewayMode ? `localhost:3084` : `mgmt-backend:3084`;
+
 export const mgmtVersion = 'v1beta';

 export const namespace = "users/admin"
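
The host constants in this hunk all switch on `apiGatewayMode`. A minimal sketch of the same selection as a pure function follows. `resolveHosts` is a hypothetical helper, a plain `env` object stands in for k6's `__ENV`, and the way `apiGatewayMode` and the default `proto` are derived here is an assumption, since their definitions sit outside the hunk.

```javascript
// Hypothetical helper mirroring the constants in integration-test/const.js.
function resolveHosts(env) {
  // Assumption: gateway mode is on whenever API_GATEWAY_URL is set.
  const apiGatewayMode = Boolean(env.API_GATEWAY_URL);
  // Assumption: the protocol falls back to "http" when not provided.
  const proto = env.API_GATEWAY_PROTOCOL || "http";

  const restPublic = apiGatewayMode
    ? `${proto}://${env.API_GATEWAY_URL}`
    : "http://api-gateway:8080";
  const grpcPublic = apiGatewayMode ? env.API_GATEWAY_URL : "api-gateway:8080";

  return {
    artifactRESTPublicHost: restPublic,
    mgmtRESTPublicHost: restPublic,
    artifactGRPCPublicHost: grpcPublic,
    mgmtGRPCPublicHost: grpcPublic,
    // Private services are reached on localhost ports in gateway mode,
    // and by service hostname otherwise, as in the diff above.
    artifactGRPCPrivateHost: apiGatewayMode ? "localhost:3082" : "artifact-backend:3082",
    mgmtGRPCPrivateHost: apiGatewayMode ? "localhost:3084" : "mgmt-backend:3084",
  };
}

console.log(resolveHosts({}).artifactGRPCPrivateHost);
console.log(resolveHosts({ API_GATEWAY_URL: "localhost:8080" }).mgmtGRPCPrivateHost);
```

Keeping the selection in one function makes it easy to check both modes without starting k6.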

integration-test/grpc-kb-update.js

Lines changed: 967 additions & 3038 deletions
Large diffs are not rendered by default.
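
Because this diff is not rendered, the following is only a sketch of how a test helper could accept both the legacy string statuses and the new enum names that the commit message describes. `normalizeUpdateStatus` and `isUpdateCompleted` are hypothetical names, and mapping an empty value to `UNSPECIFIED` is an assumption; only the legacy strings and the `KNOWLEDGE_BASE_UPDATE_STATUS_` prefix come from the commit message.

```javascript
const ENUM_PREFIX = "KNOWLEDGE_BASE_UPDATE_STATUS_";

// Hypothetical helper: map a legacy value such as "completed" or an enum name
// such as "KNOWLEDGE_BASE_UPDATE_STATUS_COMPLETED" onto the enum form.
function normalizeUpdateStatus(status) {
  if (typeof status !== "string" || status.length === 0) {
    return ENUM_PREFIX + "UNSPECIFIED"; // assumption: treat missing values as unspecified
  }
  if (status.startsWith(ENUM_PREFIX)) {
    return status;
  }
  return ENUM_PREFIX + status.toUpperCase();
}

// Hypothetical helper: true when the update has reached COMPLETED in either format.
function isUpdateCompleted(status) {
  return normalizeUpdateStatus(status) === ENUM_PREFIX + "COMPLETED";
}

console.log(isUpdateCompleted("completed"));
console.log(isUpdateCompleted("KNOWLEDGE_BASE_UPDATE_STATUS_UPDATING"));
```

Normalizing once at the boundary keeps every assertion comparing against a single format.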

integration-test/grpc-private.js

Lines changed: 10 additions & 10 deletions
@@ -8,16 +8,16 @@ export function CheckListRepositoryTags(client, data) {
 check(true, { [constant.banner(groupName)]: () => true });

 var resOrigin = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTags",
+"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTagsAdmin",
 {
 parent: `repositories/${data.expectedOwner.id}`,
 pageSize: 10
 },
 data.metadata
 );
 check(resOrigin, {
-"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTags response status is StatusOK": (r) => r.status === grpc.StatusOK,
-"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTags response tags is array": (r) =>
+"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTagsAdmin response status is StatusOK": (r) => r.status === grpc.StatusOK,
+"artifact.artifact.v1alpha.ArtifactPrivateService/ListRepositoryTagsAdmin response tags is array": (r) =>
 Array.isArray(r.message.tags),
 });
 });
@@ -35,7 +35,7 @@ export function CheckGetRepositoryTag(client, data) {
 }

 var res = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/GetRepositoryTag",
+"artifact.artifact.v1alpha.ArtifactPrivateService/GetRepositoryTagAdmin",
 {
 name: tagName,
 },
@@ -58,7 +58,7 @@ export function CheckCreateAndDeleteRepositoryTag(client, data) {
 const tagName = `repositories/${repoId}/tags/${tagId}`;

 var createRes = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/CreateRepositoryTag",
+"artifact.artifact.v1alpha.ArtifactPrivateService/CreateRepositoryTagAdmin",
 {
 tag: {
 name: tagName,
@@ -72,7 +72,7 @@ export function CheckCreateAndDeleteRepositoryTag(client, data) {
 });

 var deleteRes = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/DeleteRepositoryTag",
+"artifact.artifact.v1alpha.ArtifactPrivateService/DeleteRepositoryTagAdmin",
 {
 name: tagName,
 },
@@ -94,7 +94,7 @@ export function CheckGetObject(client, data) {
 return;
 }
 var res = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/GetObject",
+"artifact.artifact.v1alpha.ArtifactPrivateService/GetObjectAdmin",
 { uid: uid },
 data.metadata
 );
@@ -116,7 +116,7 @@ export function CheckGetObjectURL(client, data) {
 return;
 }
 var res = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/GetObjectURL",
+"artifact.artifact.v1alpha.ArtifactPrivateService/GetObjectURLAdmin",
 uid ? { uid: uid } : { encodedUrlPath: encoded },
 data.metadata
 );
@@ -136,7 +136,7 @@ export function CheckUpdateObject(client, data) {
 return;
 }
 var res = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/UpdateObject",
+"artifact.artifact.v1alpha.ArtifactPrivateService/UpdateObjectAdmin",
 { uid: uid, isUploaded: true },
 data.metadata
 );
@@ -156,7 +156,7 @@ export function CheckGetFileAsMarkdown(client, data) {
 return;
 }
 var res = client.invoke(
-"artifact.artifact.v1alpha.ArtifactPrivateService/GetFileAsMarkdown",
+"artifact.artifact.v1alpha.ArtifactPrivateService/GetFileAsMarkdownAdmin",
 { fileUid: fileUid },
 data.metadata
 );
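
Every private-service call in this file now targets a method name ending in `Admin`. One way to keep the fully qualified names in a single place is a small builder such as the one sketched here. `privateMethod` is a hypothetical helper and not part of the commit; the service path is the one used in the calls above.

```javascript
const PRIVATE_SERVICE = "artifact.artifact.v1alpha.ArtifactPrivateService";

// Hypothetical helper: build the fully qualified gRPC method path,
// appending the "Admin" suffix when the caller passes the base name.
function privateMethod(name) {
  const rpc = name.endsWith("Admin") ? name : `${name}Admin`;
  return `${PRIVATE_SERVICE}/${rpc}`;
}

console.log(privateMethod("GetObject"));
console.log(privateMethod("ListRepositoryTagsAdmin"));
```

With such a helper, a call would read `client.invoke(privateMethod("GetObject"), { uid: uid }, data.metadata)`, and a future rename would touch one line.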

integration-test/grpc-public.js

Lines changed: 2 additions & 2 deletions
@@ -218,8 +218,8 @@ export function CheckCleanupOnFileDeletion(client, data) {
 const checkAfter = `
 SELECT
 (SELECT COUNT(*) FROM converted_file WHERE file_uid = '${file.fileUid}') as converted,
-(SELECT COUNT(*) FROM text_chunk WHERE kb_file_uid = '${file.fileUid}') as chunks,
-(SELECT COUNT(*) FROM embedding WHERE kb_file_uid = '${file.fileUid}') as embeddings
+(SELECT COUNT(*) FROM text_chunk WHERE file_uid = '${file.fileUid}') as chunks,
+(SELECT COUNT(*) FROM embedding WHERE file_uid = '${file.fileUid}') as embeddings
 `;
 try {
 const result = constant.db.exec(checkAfter);

integration-test/grpc.js

Lines changed: 14 additions & 14 deletions
@@ -3,7 +3,7 @@ import http from "k6/http";

 import { check, group } from "k6";

-import * as catalogPrivate from "./grpc-private.js";
+import * as grpcPrivate from "./grpc-private.js";
 import * as grpcPublic from "./grpc-public.js";
 import * as grpcPublicWithJwt from "./grpc-public-with-jwt.js";

@@ -39,22 +39,22 @@ export function setup() {

 // Clean up any leftover test data from previous runs
 try {
-constant.db.exec(`DELETE FROM text_chunk WHERE kb_file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
-constant.db.exec(`DELETE FROM embedding WHERE kb_file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
+constant.db.exec(`DELETE FROM text_chunk WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
+constant.db.exec(`DELETE FROM embedding WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
 constant.db.exec(`DELETE FROM converted_file WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
 constant.db.exec(`DELETE FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%'`);
 constant.db.exec(`DELETE FROM knowledge_base WHERE id LIKE '${constant.dbIDPrefix}%'`);
 } catch (e) {
 console.log(`Setup cleanup warning: ${e}`);
 }

-var loginResp = http.request("POST", `${constant.mgmtPublicHost}/v1beta/auth/login`, JSON.stringify({
+var loginResp = http.request("POST", `${constant.mgmtRESTPublicHost}/v1beta/auth/login`, JSON.stringify({
 "username": constant.defaultUsername,
 "password": constant.defaultPassword,
 }))

 check(loginResp, {
-[`POST ${constant.mgmtPublicHost}/v1beta/auth/login response status is 200`]: (
+[`POST ${constant.mgmtRESTPublicHost}/v1beta/auth/login response status is 200`]: (
 r
 ) => r.status === 200,
 });
@@ -115,13 +115,13 @@ export default function (data) {
 privateClient.connect(constant.artifactGRPCPrivateHost, {
 plaintext: true,
 });
-catalogPrivate.CheckListRepositoryTags(privateClient, data);
-catalogPrivate.CheckGetRepositoryTag(privateClient, data);
-catalogPrivate.CheckCreateAndDeleteRepositoryTag(privateClient, data);
-catalogPrivate.CheckGetObject(privateClient, data);
-catalogPrivate.CheckGetObjectURL(privateClient, data);
-catalogPrivate.CheckUpdateObject(privateClient, data);
-catalogPrivate.CheckGetFileAsMarkdown(privateClient, data);
+grpcPrivate.CheckListRepositoryTags(privateClient, data);
+grpcPrivate.CheckGetRepositoryTag(privateClient, data);
+grpcPrivate.CheckCreateAndDeleteRepositoryTag(privateClient, data);
+grpcPrivate.CheckGetObject(privateClient, data);
+grpcPrivate.CheckGetObjectURL(privateClient, data);
+grpcPrivate.CheckUpdateObject(privateClient, data);
+grpcPrivate.CheckGetFileAsMarkdown(privateClient, data);
 privateClient.close();
 }

@@ -156,8 +156,8 @@ export function teardown(data) {
 check(true, { [constant.banner(groupName)]: () => true });

 // Delete from child tables first, before deleting parent records
-constant.db.exec(`DELETE FROM text_chunk WHERE kb_file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
-constant.db.exec(`DELETE FROM embedding WHERE kb_file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
+constant.db.exec(`DELETE FROM text_chunk WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
+constant.db.exec(`DELETE FROM embedding WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);
 constant.db.exec(`DELETE FROM converted_file WHERE file_uid IN (SELECT uid FROM knowledge_base_file WHERE name LIKE '${constant.dbIDPrefix}%')`);

 // Now delete parent tables
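
The setup and teardown hunks delete child rows before parent rows, and after this commit `text_chunk` and `embedding` are keyed on `file_uid` like `converted_file`. A sketch of generating those statements in dependency order follows. `cleanupStatements` is a hypothetical helper (the tests themselves call `constant.db.exec` inline), and the parent-table statements mirror the ones visible in `setup()`.

```javascript
// Hypothetical helper: build the cleanup statements for one test-ID prefix,
// child tables first so no parent row is removed while children still reference it.
function cleanupStatements(prefix) {
  const fileUids = `SELECT uid FROM knowledge_base_file WHERE name LIKE '${prefix}%'`;
  const childTables = ["text_chunk", "embedding", "converted_file"];
  return [
    ...childTables.map((t) => `DELETE FROM ${t} WHERE file_uid IN (${fileUids})`),
    `DELETE FROM knowledge_base_file WHERE name LIKE '${prefix}%'`,
    `DELETE FROM knowledge_base WHERE id LIKE '${prefix}%'`,
  ];
}

for (const stmt of cleanupStatements("test_")) {
  console.log(stmt);
}
```

Interpolating the prefix into SQL is tolerable here only because it is a fixed test constant; values from outside the test suite would need parameterized queries.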
