Skip to content

Commit c4ec3a0

Browse files
authored
fix(kbfile): reprocessing fixes (#246)
Because - Old failed files can't be reprocessed because the previous pipeline version is used - Even if reprocessing succeeds, failure reason isn't cleaned up from previous failure - Console passes an empty string as the conversion pipeline when setting it as the default pipeline This commit - Adds a migration so failed files use the new pipeline - Clears failure reason when reprocessing a file - Sanitizes conversion pipeline input on catalog creation / update
1 parent 1836e09 commit c4ec3a0

File tree

9 files changed

+236
-55
lines changed

9 files changed

+236
-55
lines changed

cmd/migration/main.go

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package main
22

33
import (
4+
"context"
45
"database/sql"
56
"fmt"
67
"os"
78

89
"github.com/golang-migrate/migrate/v4"
10+
"go.uber.org/zap"
911

1012
_ "github.com/golang-migrate/migrate/v4/database/postgres"
1113
_ "github.com/golang-migrate/migrate/v4/source/file"
1214

1315
"github.com/instill-ai/artifact-backend/config"
1416
"github.com/instill-ai/artifact-backend/pkg/db/migration"
17+
18+
database "github.com/instill-ai/artifact-backend/pkg/db"
19+
logx "github.com/instill-ai/x/log"
1520
)
1621

1722
func dbExistsOrCreate(databaseConfig config.DatabaseConfig) error {
@@ -54,15 +59,25 @@ func dbExistsOrCreate(databaseConfig config.DatabaseConfig) error {
5459

5560
return nil
5661
}
57-
func main() {
5862

63+
func main() {
5964
if err := config.Init(config.ParseConfigFlag()); err != nil {
6065
panic(err)
6166
}
6267

68+
ctx, cancel := context.WithCancel(context.Background())
69+
defer cancel()
70+
71+
logx.Debug = config.Config.Server.Debug
72+
logger, _ := logx.GetZapLogger(ctx)
73+
defer func() {
74+
// can't handle the error due to https://github.com/uber-go/zap/issues/880
75+
_ = logger.Sync()
76+
}()
77+
6378
databaseConfig := config.Config.Database
6479
if err := dbExistsOrCreate(databaseConfig); err != nil {
65-
panic(err)
80+
logger.Fatal("Checking database existence", zap.Error(err))
6681
}
6782

6883
dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?%s",
@@ -74,39 +89,70 @@ func main() {
7489
"sslmode=disable",
7590
)
7691

77-
migrateFolder, _ := os.Getwd()
92+
db := database.GetSharedConnection().WithContext(ctx)
93+
codeMigrator := &migration.CodeMigrator{
94+
Logger: logger,
95+
DB: db,
96+
}
97+
98+
defer func() { database.Close(db) }()
99+
100+
if err := runMigration(dsn, migration.TargetSchemaVersion, codeMigrator.Migrate, logger); err != nil {
101+
logger.Fatal("Running migration", zap.Error(err))
102+
}
103+
}
104+
105+
func runMigration(
106+
dsn string,
107+
expectedVersion uint,
108+
execCode func(version uint) error,
109+
logger *zap.Logger,
110+
) error {
111+
migrateFolder, err := os.Getwd()
112+
if err != nil {
113+
return fmt.Errorf("accessing base path: %w", err)
114+
}
115+
78116
m, err := migrate.New(fmt.Sprintf("file:///%s/pkg/db/migration", migrateFolder), dsn)
79117
if err != nil {
80-
panic(err)
118+
return fmt.Errorf("creating migration: %w", err)
81119
}
82120

83-
expectedVersion := migration.TargetSchemaVersion
84121
curVersion, dirty, err := m.Version()
85122
if err != nil && curVersion != 0 {
86-
panic(err)
123+
return fmt.Errorf("getting current version: %w", err)
87124
}
88125

89-
fmt.Printf("Expected migration version is %d\n", expectedVersion)
90-
fmt.Printf("The current schema version is %d, and dirty flag is %t\n", curVersion, dirty)
126+
logger.Info("Running migration",
127+
zap.Uint("expectedVersion", expectedVersion),
128+
zap.Uint("currentVersion", curVersion),
129+
zap.Bool("dirty", dirty),
130+
)
131+
91132
if dirty {
92-
panic("the database's dirty flag is set, please fix it")
133+
return fmt.Errorf("database is dirty, please fix it")
93134
}
94135

95136
step := curVersion
96137
for {
97138
if expectedVersion <= step {
98-
fmt.Printf("Migration to version %d complete\n", expectedVersion)
139+
logger.Info("Migration completed", zap.Uint("expectedVersion", expectedVersion))
99140
break
100141
}
101142

102-
fmt.Printf("Step up to version %d\n", step+1)
143+
logger.Info("Step up", zap.Uint("step", step+1))
103144
if err := m.Steps(1); err != nil {
104-
panic(err)
145+
return fmt.Errorf("stepping up: %w", err)
146+
}
147+
148+
if step, _, err = m.Version(); err != nil {
149+
return fmt.Errorf("getting new version: %w", err)
105150
}
106151

107-
step, _, err = m.Version()
108-
if err != nil {
109-
panic(err)
152+
if err := execCode(step); err != nil {
153+
return fmt.Errorf("running associated code: %w", err)
110154
}
111155
}
156+
157+
return nil
112158
}

pkg/db/migration/000025_bump_conversion_pipeline_on_failed_files.down.sql

Whitespace-only changes.

pkg/db/migration/000025_bump_conversion_pipeline_on_failed_files.up.sql

Whitespace-only changes.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package convert000025
2+
3+
import (
4+
"fmt"
5+
6+
"gorm.io/gorm"
7+
8+
"github.com/instill-ai/artifact-backend/pkg/db/migration/convert"
9+
"github.com/instill-ai/artifact-backend/pkg/repository"
10+
11+
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
12+
)
13+
14+
const batchSize = 100
15+
16+
type BumpConversionPipeline struct {
17+
convert.Basic
18+
}
19+
20+
const (
21+
oldConvertingPipe = "preset/indexing-advanced-convert-doc@v1.3.1"
22+
newConvertingPipe = "preset/indexing-advanced-convert-doc@v1.3.2"
23+
)
24+
25+
func (c *BumpConversionPipeline) Migrate() error {
26+
files := make([]*repository.KnowledgeBaseFile, 0, batchSize)
27+
q := c.DB.Select("uid").
28+
Where("process_status = ?", artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_FAILED.String()).
29+
Where("extra_meta_data->>'converting_pipe' = ?", oldConvertingPipe)
30+
31+
return q.FindInBatches(&files, batchSize, func(tx *gorm.DB, _ int) error {
32+
for _, f := range files {
33+
// Update the process status of the files
34+
updates := map[string]any{
35+
// Clear previous failure reason
36+
"extra_meta_data": gorm.Expr(
37+
"COALESCE(extra_meta_data, '{}'::jsonb) || ?::jsonb",
38+
fmt.Sprintf(`{"converting_pipe": "%s"}`, newConvertingPipe),
39+
),
40+
}
41+
42+
if err := tx.Model(f).Where("uid = ?", f.UID).Updates(updates).Error; err != nil {
43+
return fmt.Errorf("updating record %s: %w", f.UID.String(), err)
44+
}
45+
}
46+
47+
return nil
48+
}).Error
49+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package convert
2+
3+
import (
4+
"go.uber.org/zap"
5+
"gorm.io/gorm"
6+
)
7+
8+
// Basic contains the basic elements to execute a conversion migration.
9+
type Basic struct {
10+
DB *gorm.DB
11+
Logger *zap.Logger
12+
}

pkg/db/migration/migration.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,49 @@
11
package migration
22

3+
import (
4+
"go.uber.org/zap"
5+
"gorm.io/gorm"
6+
7+
"github.com/instill-ai/artifact-backend/pkg/db/migration/convert"
8+
"github.com/instill-ai/artifact-backend/pkg/db/migration/convert/convert000025"
9+
)
10+
311
// TargetSchemaVersion determines the database schema version.
4-
const TargetSchemaVersion uint = 24
12+
const TargetSchemaVersion uint = 25
13+
14+
type migration interface {
15+
Migrate() error
16+
}
17+
18+
// CodeMigrator orchestrates the execution of the code associated with the
19+
// different database migrations and holds their dependencies.
20+
type CodeMigrator struct {
21+
Logger *zap.Logger
22+
23+
DB *gorm.DB
24+
}
25+
26+
// Migrate executes custom code as part of a database migration. This code is
27+
// intended to be run only once and typically goes along a change
28+
// in the database schemas. Some use cases might be backfilling a table or
29+
// updating some existing records according to the schema changes.
30+
//
31+
// Note that the changes in the database schemas shouldn't be run here, only
32+
// code accompanying them.
33+
func (cm *CodeMigrator) Migrate(version uint) error {
34+
var m migration
35+
36+
bc := convert.Basic{
37+
DB: cm.DB,
38+
Logger: cm.Logger,
39+
}
40+
41+
switch version {
42+
case 25:
43+
m = &convert000025.BumpConversionPipeline{Basic: bc}
44+
default:
45+
return nil
46+
}
47+
48+
return m.Migrate()
49+
}

pkg/handler/knowledgebase.go

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"math/rand"
77
"regexp"
8+
"slices"
89
"strings"
910
"time"
1011

@@ -101,25 +102,11 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea
101102
}
102103

103104
// Read conversion pipelines from request.
104-
// TODO jvallesm: validate existence, permissions & recipe of provided
105-
// pipelines.
106-
convertingPipelines := req.GetConvertingPipelines()
107-
for _, pipelineName := range convertingPipelines {
108-
if _, err := service.PipelineReleaseFromName(pipelineName); err != nil {
109-
err = fmt.Errorf("%w: invalid conversion pipeline format: %w", errorsx.ErrInvalidArgument, err)
110-
return nil, errorsx.AddMessage(
111-
err,
112-
`Conversion pipeline must have the format "{namespaceID}/{pipelineID}@{version}"`,
113-
)
114-
}
105+
convertingPipelines, err := sanitizeConvertingPipelines(req.GetConvertingPipelines())
106+
if err != nil {
107+
return nil, err
115108
}
116109

117-
/*
118-
if err := ph.service.ValidateConvertingPipelines(convertingPipelines); err != nil {
119-
return nil, fmt.Errorf("validating pipelines: %w")
120-
}
121-
*/
122-
123110
// create catalog
124111
dbData, err := ph.service.Repository().CreateKnowledgeBase(
125112
ctx,
@@ -296,18 +283,20 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
296283
return nil, fmt.Errorf("%w: no permission over catalog", errorsx.ErrUnauthorized)
297284
}
298285

286+
convertingPipelines, err := sanitizeConvertingPipelines(req.GetConvertingPipelines())
287+
if err != nil {
288+
return nil, err
289+
}
290+
299291
// update catalog
300292
kb, err = ph.service.Repository().UpdateKnowledgeBase(
301293
ctx,
302294
req.GetCatalogId(),
303295
ns.NsUID.String(),
304296
repository.KnowledgeBase{
305-
Description: req.GetDescription(),
306-
Tags: req.GetTags(),
307-
308-
// TODO jvallesm: validate existence, permissions & recipe of provided
309-
// pipelines.
310-
ConvertingPipelines: req.GetConvertingPipelines(),
297+
Description: req.GetDescription(),
298+
Tags: req.GetTags(),
299+
ConvertingPipelines: convertingPipelines,
311300
},
312301
)
313302
if err != nil {
@@ -349,12 +338,9 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
349338
UsedStorage: uint64(kb.Usage),
350339
}
351340

352-
if len(kb.ConvertingPipelines) == 0 {
353-
catalog.ConvertingPipelines = defaultConvertingPipelines()
354-
}
355-
356341
return &artifactpb.UpdateCatalogResponse{Catalog: catalog}, nil
357342
}
343+
358344
func (ph *PublicHandler) DeleteCatalog(ctx context.Context, req *artifactpb.DeleteCatalogRequest) (*artifactpb.DeleteCatalogResponse, error) {
359345
logger, _ := logx.GetZapLogger(ctx)
360346
authUID, err := getUserUIDFromContext(ctx)
@@ -518,3 +504,41 @@ func defaultConvertingPipelines() []string {
518504
service.ConvertDocToMDPipeline.Name(),
519505
}
520506
}
507+
508+
// sanitizeConvertingPipelines validates an input array of strings that
509+
// represent the conversion pipelines of a catalog. It checks the string format
510+
// is correct.
511+
// TODO we also want to validate the existence of the pipelines, permissions of
512+
// the requester over that pipeline and the validity of its recipe.
513+
func sanitizeConvertingPipelines(pipelines []string) ([]string, error) {
514+
validPipelines := make([]string, 0, len(pipelines))
515+
for _, pipelineName := range pipelines {
516+
// Console passes an empty string to reset the catalog conversion
517+
// pipeline to the default one.
518+
if pipelineName == "" {
519+
continue
520+
}
521+
522+
// Remove duplicates.
523+
if slices.Contains(validPipelines, pipelineName) {
524+
continue
525+
}
526+
527+
if _, err := service.PipelineReleaseFromName(pipelineName); err != nil {
528+
err = fmt.Errorf("%w: invalid conversion pipeline format: %w", errorsx.ErrInvalidArgument, err)
529+
return nil, errorsx.AddMessage(
530+
err,
531+
`Conversion pipeline must have the format "{namespaceID}/{pipelineID}@{version}"`,
532+
)
533+
}
534+
535+
validPipelines = append(validPipelines, pipelineName)
536+
537+
}
538+
539+
if len(validPipelines) == 0 {
540+
validPipelines = defaultConvertingPipelines()
541+
}
542+
543+
return validPipelines, nil
544+
}

0 commit comments

Comments
 (0)