Skip to content

Commit b1c734d

Browse files
authored
refactor(artifact): support blob upload/download for catalog file (#157)
Because - catalog file has not adopt blob upload/download This commit - support blob upload/download for catalog file
1 parent e620fc6 commit b1c734d

File tree

6 files changed

+127
-32
lines changed

6 files changed

+127
-32
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
1414
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
1515
github.com/influxdata/influxdb-client-go/v2 v2.12.3
16-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250219010801-09f3eb30f063
16+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250225000224-8cd79b606322
1717
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61
1818
github.com/instill-ai/x v0.6.0-alpha.0.20250220113648-be48bc78368d
1919
github.com/knadh/koanf v1.5.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
350350
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
351351
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
352352
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
353-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250219010801-09f3eb30f063 h1:zf6H3XmrHiVRhsYw44oeDVh4ATS0T6HchWWXBFKF0a0=
354-
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250219010801-09f3eb30f063/go.mod h1:fusT92ceR5+GVn1LT5mT4XcOq1DlemBjpb6JpodLLdc=
353+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250225000224-8cd79b606322 h1:0D/hXTTQ8sMAZ2zxmXCf1wN5ZDQDWTbq+2vN4P63Be8=
354+
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20250225000224-8cd79b606322/go.mod h1:fusT92ceR5+GVn1LT5mT4XcOq1DlemBjpb6JpodLLdc=
355355
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
356356
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw=
357357
github.com/instill-ai/x v0.6.0-alpha.0.20250220113648-be48bc78368d h1:HsnJ1inBFWXurGVLsH7jJFkqRYU1O2Nwg3ixtOCk98E=

pkg/handler/knowledgebasefiles.go

Lines changed: 96 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,12 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
2929
err := fmt.Errorf("failed to get user id from header: %v. err: %w", err, customerror.ErrUnauthenticated)
3030
return nil, err
3131
}
32-
err = checkUploadKnowledgeBaseFileRequest(req)
32+
33+
hasObject, err := checkUploadKnowledgeBaseFileRequest(req)
3334
if err != nil {
3435
return nil, err
3536
}
36-
// check file name length based on character count
37-
if len(req.File.Name) > 255 {
38-
return nil, fmt.Errorf("file name is too long. max length is 255. name: %s err: %w",
39-
req.File.Name, customerror.ErrInvalidArgument)
40-
}
41-
// determine the file type by its extension
42-
req.File.Type = DetermineFileType(req.File.Name)
43-
if req.File.Type == artifactpb.FileType_FILE_TYPE_UNSPECIFIED {
44-
return nil, fmt.Errorf("file extension is not supported. name: %s err: %w",
45-
req.File.Name, customerror.ErrInvalidArgument)
46-
}
4737

48-
if strings.Contains(req.File.Name, "/") {
49-
return nil, fmt.Errorf("file name cannot contain '/'. err: %w", customerror.ErrInvalidArgument)
50-
}
5138
ns, err := ph.service.GetNamespaceByNsID(ctx, req.GetNamespaceId())
5239
if err != nil {
5340
log.Error(
@@ -89,9 +76,26 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
8976
log.Error("failed to get namespace tier", zap.Error(err))
9077
return nil, fmt.Errorf("failed to get namespace tier. err: %w", err)
9178
}
79+
9280
// upload file to minio and database
9381
var res *repository.KnowledgeBaseFile
94-
{
82+
if !hasObject {
83+
// check file name length based on character count
84+
if len(req.File.Name) > 255 {
85+
return nil, fmt.Errorf("file name is too long. max length is 255. name: %s err: %w",
86+
req.File.Name, customerror.ErrInvalidArgument)
87+
}
88+
// determine the file type by its extension
89+
req.File.Type = DetermineFileType(req.File.Name)
90+
if req.File.Type == artifactpb.FileType_FILE_TYPE_UNSPECIFIED {
91+
return nil, fmt.Errorf("file extension is not supported. name: %s err: %w",
92+
req.File.Name, customerror.ErrInvalidArgument)
93+
}
94+
95+
if strings.Contains(req.File.Name, "/") {
96+
return nil, fmt.Errorf("file name cannot contain '/'. err: %w", customerror.ErrInvalidArgument)
97+
}
98+
9599
creatorUID, err := uuid.FromString(authUID)
96100
if err != nil {
97101
log.Error("failed to parse creator uid", zap.Error(err))
@@ -152,8 +156,64 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
152156
log.Error("failed to increase catalog usage", zap.Error(err))
153157
return nil, err
154158
}
155-
}
159+
} else {
160+
object, err := ph.service.Repository.GetObjectByUID(ctx, uuid.FromStringOrNil(req.GetFile().GetObjectUid()))
161+
if err != nil {
162+
log.Error("failed to get catalog object with provided UID", zap.Error(err))
163+
return nil, err
164+
}
156165

166+
if !object.IsUploaded {
167+
log.Error("file has not been uploaded yet")
168+
return nil, fmt.Errorf("file has not been uploaded yet")
169+
}
170+
171+
// check if file size is more than 150MB
172+
if object.Size > int64(tier.GetMaxUploadFileSize()) {
173+
return nil, fmt.Errorf(
174+
"file size is more than %v. err: %w",
175+
tier.GetMaxUploadFileSize(),
176+
customerror.ErrInvalidArgument)
177+
}
178+
179+
// check if total usage in namespace
180+
quota, humanReadable := tier.GetFileStorageTotalQuota()
181+
if totalUsageInNamespace+object.Size > int64(quota) {
182+
return nil, fmt.Errorf(
183+
"file storage total quota exceeded. max: %v. tier:%v, err: %w",
184+
humanReadable, tier.String(), customerror.ErrInvalidArgument)
185+
}
186+
187+
req.File.Type = DetermineFileType(object.Name)
188+
189+
kbFile := repository.KnowledgeBaseFile{
190+
Name: object.Name,
191+
Type: req.File.Type.String(),
192+
Owner: ns.NsUID,
193+
CreatorUID: object.CreatorUID,
194+
KnowledgeBaseUID: kb.UID,
195+
Destination: object.Destination,
196+
ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_NOTSTARTED)],
197+
Size: object.Size,
198+
ExternalMetadataUnmarshal: req.File.ExternalMetadata,
199+
}
200+
201+
// create catalog file in database
202+
res, err = ph.service.Repository.CreateKnowledgeBaseFile(ctx, kbFile, nil)
203+
204+
if err != nil {
205+
log.Error("failed to create catalog file", zap.Error(err))
206+
return nil, err
207+
}
208+
209+
// increase catalog usage. need to increase after the file is created.
210+
// Note: in the future, we need to increase the usage in transaction with creating the file.
211+
err = ph.service.Repository.IncreaseKnowledgeBaseUsage(ctx, nil, kb.UID.String(), int(object.Size))
212+
if err != nil {
213+
log.Error("failed to increase catalog usage", zap.Error(err))
214+
return nil, err
215+
}
216+
}
157217
return &artifactpb.UploadCatalogFileResponse{
158218
File: &artifactpb.File{
159219
FileUid: res.UID.String(),
@@ -169,6 +229,7 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
169229
TotalChunks: 0,
170230
TotalTokens: 0,
171231
ExternalMetadata: res.ExternalMetadataUnmarshal,
232+
ObjectUid: req.File.ObjectUid,
172233
},
173234
}, nil
174235
}
@@ -204,20 +265,23 @@ func getFileSize(base64String string) (int64, string) {
204265
return int64(decodedSize), fmt.Sprintf("%.1f %cB", size, "KMGTPE"[exp])
205266
}
206267

207-
func checkUploadKnowledgeBaseFileRequest(req *artifactpb.UploadCatalogFileRequest) error {
268+
// Check if objectUID is provided, and all other required fields if not
269+
func checkUploadKnowledgeBaseFileRequest(req *artifactpb.UploadCatalogFileRequest) (bool, error) {
208270
if req.GetNamespaceId() == "" {
209-
return fmt.Errorf("owner uid is required. err: %w", ErrCheckRequiredFields)
271+
return false, fmt.Errorf("owner uid is required. err: %w", ErrCheckRequiredFields)
210272
} else if req.CatalogId == "" {
211-
return fmt.Errorf("catalog uid is required. err: %w", ErrCheckRequiredFields)
273+
return false, fmt.Errorf("catalog uid is required. err: %w", ErrCheckRequiredFields)
212274
} else if req.File == nil {
213-
return fmt.Errorf("file is required. err: %w", ErrCheckRequiredFields)
275+
return false, fmt.Errorf("file is required. err: %w", ErrCheckRequiredFields)
276+
} else if req.File.GetObjectUid() != "" {
277+
return true, nil
214278
} else if req.File.Name == "" {
215-
return fmt.Errorf("file name is required. err: %w", ErrCheckRequiredFields)
279+
return false, fmt.Errorf("file name is required. err: %w", ErrCheckRequiredFields)
216280
} else if req.File.Content == "" {
217-
return fmt.Errorf("file content is required. err: %w", ErrCheckRequiredFields)
281+
return false, fmt.Errorf("file content is required. err: %w", ErrCheckRequiredFields)
218282
}
219283

220-
return nil
284+
return false, nil
221285
}
222286

223287
// MoveFileToCatalog moves a file from one catalog to another within the same namespace.
@@ -394,6 +458,9 @@ func (ph *PublicHandler) ListCatalogFiles(ctx context.Context, req *artifactpb.L
394458
totalSize = size
395459
nextPageToken = nextToken
396460
for _, kbFile := range kbFiles {
461+
462+
objectUID := uuid.FromStringOrNil(strings.TrimPrefix(strings.Split(kbFile.Destination, "/")[1], "obj-"))
463+
397464
files = append(files, &artifactpb.File{
398465
FileUid: kbFile.UID.String(),
399466
OwnerUid: kbFile.Owner.String(),
@@ -408,6 +475,7 @@ func (ph *PublicHandler) ListCatalogFiles(ctx context.Context, req *artifactpb.L
408475
ExternalMetadata: kbFile.ExternalMetadataUnmarshal,
409476
TotalChunks: int32(totalChunks[kbFile.UID]),
410477
TotalTokens: int32(totalTokens[kbFile.UID]),
478+
ObjectUid: objectUID.String(),
411479
})
412480
}
413481
}
@@ -614,6 +682,9 @@ func (ph *PublicHandler) ProcessCatalogFiles(ctx context.Context, req *artifactp
614682
// populate the files into response
615683
var resFiles []*artifactpb.File
616684
for _, file := range files {
685+
686+
objectUID := uuid.FromStringOrNil(strings.TrimPrefix(strings.Split(file.Destination, "/")[1], "obj-"))
687+
617688
resFiles = append(resFiles, &artifactpb.File{
618689
FileUid: file.UID.String(),
619690
OwnerUid: file.Owner.String(),
@@ -624,6 +695,7 @@ func (ph *PublicHandler) ProcessCatalogFiles(ctx context.Context, req *artifactp
624695
CreateTime: timestamppb.New(*file.CreateTime),
625696
UpdateTime: timestamppb.New(*file.UpdateTime),
626697
ProcessStatus: artifactpb.FileProcessStatus(artifactpb.FileProcessStatus_value[file.ProcessStatus]),
698+
ObjectUid: objectUID.String(),
627699
})
628700
}
629701
return &artifactpb.ProcessCatalogFilesResponse{

pkg/repository/objectUrl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (r *Repository) UpdateObjectURL(ctx context.Context, objectURL ObjectURL) (
104104
}
105105

106106
// CreateObjectURLWithUIDInEncodedURLPath creates an ObjectURL record in the database with the UID in the encoded_url_path.
107-
func (r *Repository) CreateObjectURLWithUIDInEncodedURLPath(ctx context.Context, objectURL ObjectURL,namespaceID string, EncodedMinioURLPath func(namespaceID string, objectURLUUID uuid.UUID) string) (*ObjectURL, error) {
107+
func (r *Repository) CreateObjectURLWithUIDInEncodedURLPath(ctx context.Context, objectURL ObjectURL, namespaceID string, EncodedMinioURLPath func(namespaceID string, objectURLUUID uuid.UUID) string) (*ObjectURL, error) {
108108
var result ObjectURL
109109
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
110110
// Create the initial object URL

pkg/worker/persistentcatalogworker.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,8 @@ func (wp *persistentCatalogFileToEmbWorkerPool) processConvertingFile(ctx contex
365365
logger, _ := logger.GetZapLogger(ctx)
366366

367367
fileInMinIOPath := file.Destination
368-
data, err := wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, fileInMinIOPath)
368+
bucket := checkIfUploadedByBlobURL(fileInMinIOPath)
369+
data, err := wp.svc.MinIO.GetFile(ctx, bucket, fileInMinIOPath)
369370
if err != nil {
370371
logger.Error("Failed to get file from minIO.", zap.String("File path", fileInMinIOPath))
371372
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
@@ -445,8 +446,11 @@ func (wp *persistentCatalogFileToEmbWorkerPool) procesSummarizingFile(ctx contex
445446

446447
case artifactpb.FileType_FILE_TYPE_TEXT.String(),
447448
artifactpb.FileType_FILE_TYPE_MARKDOWN.String():
449+
450+
fileInMinIOPath := file.Destination
451+
bucket := checkIfUploadedByBlobURL(fileInMinIOPath)
448452
// Get original file for text/markdown types
449-
fileData, err = wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, file.Destination)
453+
fileData, err = wp.svc.MinIO.GetFile(ctx, bucket, fileInMinIOPath)
450454
if err != nil {
451455
logger.Error("Failed to get file from minIO.", zap.String("File uid", file.UID.String()))
452456
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
@@ -567,7 +571,9 @@ func (wp *persistentCatalogFileToEmbWorkerPool) processChunkingFile(ctx context.
567571

568572
case artifactpb.FileType_FILE_TYPE_MARKDOWN.String():
569573
// Get original file for markdown types
570-
fileData, err = wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, file.Destination)
574+
fileInMinIOPath := file.Destination
575+
bucket := checkIfUploadedByBlobURL(fileInMinIOPath)
576+
fileData, err = wp.svc.MinIO.GetFile(ctx, bucket, fileInMinIOPath)
571577
if err != nil {
572578
logger.Error("Failed to get file from minIO.", zap.String("File uid", file.UID.String()))
573579
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
@@ -583,7 +589,9 @@ func (wp *persistentCatalogFileToEmbWorkerPool) processChunkingFile(ctx context.
583589

584590
case artifactpb.FileType_FILE_TYPE_TEXT.String():
585591
// Get original file for text types
586-
fileData, err = wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, file.Destination)
592+
fileInMinIOPath := file.Destination
593+
bucket := checkIfUploadedByBlobURL(fileInMinIOPath)
594+
fileData, err = wp.svc.MinIO.GetFile(ctx, bucket, fileInMinIOPath)
587595
if err != nil {
588596
logger.Error("Failed to get file from minIO.", zap.String("File uid", file.UID.String()))
589597
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err

pkg/worker/utils.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package worker
2+
3+
import (
4+
"strings"
5+
6+
"github.com/instill-ai/artifact-backend/pkg/minio"
7+
)
8+
9+
func checkIfUploadedByBlobURL(destination string) string {
10+
if strings.Contains(destination, "uploaded-file") {
11+
return minio.KnowledgeBaseBucketName
12+
} else {
13+
return minio.BlobBucketName
14+
}
15+
}

0 commit comments

Comments
 (0)