From df5a8d1ee58e36252b4f22d1298f9f2d1fb778c0 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 22 Jul 2024 14:51:03 +0200 Subject: [PATCH 1/4] Make sqliteparserutils.CreateStatementIterator public Signed-off-by: Piotr Jastrzebski --- sqliteparserutils/utils.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sqliteparserutils/utils.go b/sqliteparserutils/utils.go index b6192c5..eab4e1a 100644 --- a/sqliteparserutils/utils.go +++ b/sqliteparserutils/utils.go @@ -19,8 +19,7 @@ type StatementIterator struct { currentToken antlr.Token } -// keep createStatementIterator here for the future uses but do not expose it for now as we will not use it immediately -func createStatementIterator(statement string) *StatementIterator { +func CreateStatementIterator(statement string) *StatementIterator { return &StatementIterator{tokenizer: createStringTokenizer(statement)} } @@ -77,7 +76,7 @@ func (iterator *StatementIterator) Next() (statement string, extraInfo SplitStat } func SplitStatement(statement string) (statements []string, extraInfo SplitStatementExtraInfo) { - iterator := createStatementIterator(statement) + iterator := CreateStatementIterator(statement) statements = make([]string, 0) for { From b05527b882135854b005bdf0bca351e7b639864f Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 22 Jul 2024 14:52:11 +0200 Subject: [PATCH 2/4] Extract hranaV2Conn.closeStream Signed-off-by: Piotr Jastrzebski --- libsql/internal/http/hranaV2/hranaV2.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libsql/internal/http/hranaV2/hranaV2.go b/libsql/internal/http/hranaV2/hranaV2.go index 6c79014..105e6bb 100644 --- a/libsql/internal/http/hranaV2/hranaV2.go +++ b/libsql/internal/http/hranaV2/hranaV2.go @@ -477,7 +477,7 @@ func (h *hranaV2Conn) QueryContext(ctx context.Context, query string, args []dri } } -func (h *hranaV2Conn) ResetSession(ctx context.Context) error { +func (h *hranaV2Conn) closeStream() { if h.baton != "" { go func(baton, url, jwt, host string) { msg := hrana.PipelineRequest{Baton: baton} @@ -486,5 +486,9 @@ func (h *hranaV2Conn) ResetSession(ctx context.Context) error { }(h.baton, h.url, h.jwt, h.host) h.baton = "" } +} + +func (h *hranaV2Conn) ResetSession(ctx context.Context) error { + h.closeStream() return nil } From b0e77e616eb5aa88afa8880bae9f7179c8efe5da Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 22 Jul 2024 14:55:06 +0200 Subject: [PATCH 3/4] Exctract hranaV2Conn.executeMsg Signed-off-by: Piotr Jastrzebski --- libsql/internal/http/hranaV2/hranaV2.go | 28 +++++++++++++++++-------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/libsql/internal/http/hranaV2/hranaV2.go b/libsql/internal/http/hranaV2/hranaV2.go index 105e6bb..e6219c0 100644 --- a/libsql/internal/http/hranaV2/hranaV2.go +++ b/libsql/internal/http/hranaV2/hranaV2.go @@ -286,6 +286,23 @@ func sendPipelineRequest(ctx context.Context, msg *hrana.PipelineRequest, url st return result, false, nil } +func (h *hranaV2Conn) executeMsg(ctx context.Context, msg *hrana.PipelineRequest) (*hrana.PipelineResponse, error) { + result, err := h.sendPipelineRequest(ctx, msg, false) + if err != nil { + return nil, err + } + + for _, r := range result.Results { + if r.Error != nil { + return nil, errors.New(r.Error.Message) + } + if r.Response == nil { + return nil, errors.New("no response received") + } + } + return result, nil +} + func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driver.NamedValue, wantRows bool) (*hrana.PipelineResponse, error) { stmts, params, err := shared.ParseStatementAndArgs(query, args) if err != nil { @@ -310,18 +327,11 @@ func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driv msg.Add(*batchStream) } - result, err := h.sendPipelineRequest(ctx, msg, false) + resp, err := h.executeMsg(ctx, msg) if err != nil { return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err) } - - if result.Results[0].Error != nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%s", query, result.Results[0].Error.Message) - } - if result.Results[0].Response == nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%s", query, "no response received") - } - return result, nil + return resp, nil } func (h *hranaV2Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { From f3e03d5f0e9d72f91ffc14aa2176ed3fb6ced76f Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 22 Jul 2024 14:55:56 +0200 Subject: [PATCH 4/4] Execute big batches in chunks Fixes https://github.com/tursodatabase/turso-cli/issues/839 Refs https://github.com/tursodatabase/turso-cli/issues/735 Signed-off-by: Piotr Jastrzebski --- libsql/internal/http/hranaV2/hranaV2.go | 113 +++++++++++++++++++++++- 1 file changed, 109 insertions(+), 4 deletions(-) diff --git a/libsql/internal/http/hranaV2/hranaV2.go b/libsql/internal/http/hranaV2/hranaV2.go index e6219c0..7e6ef05 100644 --- a/libsql/internal/http/hranaV2/hranaV2.go +++ b/libsql/internal/http/hranaV2/hranaV2.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/tursodatabase/libsql-client-go/sqliteparserutils" "io" "net/http" net_url "net/url" @@ -303,10 +304,114 @@ func (h *hranaV2Conn) executeMsg(ctx context.Context, msg *hrana.PipelineRequest return result, nil } +type chunker struct { + chunk []string + iterator *sqliteparserutils.StatementIterator + limit int +} + +func newChunker(iterator *sqliteparserutils.StatementIterator, limit int) *chunker { + return &chunker{iterator: iterator, chunk: make([]string, 0, limit), limit: limit} +} + +func isTransactionStatement(stmt string) bool { + patterns := [][]byte{[]byte("begin"), []byte("commit"), []byte("end"), []byte("rollback")} + for _, p := range patterns { + if len(stmt) >= len(p) && bytes.Equal(bytes.ToLower([]byte(stmt[0:len(p)])), p) { + return true + } + } + return false +} + +func (c *chunker) Next() (chunk []string, isEOF bool) { + c.chunk = c.chunk[:0] + var stmt string + for !isEOF && len(c.chunk) < c.limit { + stmt, _, isEOF = c.iterator.Next() + // We need to skip transaction statements. Chunks run in a transaction by default. + if stmt != "" && !isTransactionStatement(stmt) { + c.chunk = append(c.chunk, stmt) + } + } + return c.chunk, isEOF +} + +func (h *hranaV2Conn) executeSingleStmt(ctx context.Context, stmt string, wantRows bool) (*hrana.PipelineResponse, error) { + msg := &hrana.PipelineRequest{} + executeStream, err := hrana.ExecuteStream(stmt, nil, wantRows) + if err != nil { + return nil, fmt.Errorf("failed to execute SQL: %s\n%w", stmt, err) + } + msg.Add(*executeStream) + res, err := h.executeMsg(ctx, msg) + if err != nil { + return nil, fmt.Errorf("failed to execute SQL: %s\n%w", stmt, err) + } + return res, nil +} + +func (h *hranaV2Conn) executeInChunks(ctx context.Context, query string, wantRows bool) (*hrana.PipelineResponse, error) { + const chunkSize = 4096 + iterator := sqliteparserutils.CreateStatementIterator(query) + chunker := newChunker(iterator, chunkSize) + + chunk, isEOF := chunker.Next() + if isEOF && len(chunk) == 1 { + return h.executeSingleStmt(ctx, chunk[0], wantRows) + } + + _, err := h.executeSingleStmt(ctx, "BEGIN", false) + if err != nil { + return nil, err + } + + batch := &hrana.Batch{Steps: make([]hrana.BatchStep, chunkSize)} + msg := &hrana.PipelineRequest{} + msg.Add(hrana.StreamRequest{Type: "batch", Batch: batch}) + for idx := range batch.Steps { + batch.Steps[idx].Stmt.WantRows = wantRows + } + + result := &hrana.PipelineResponse{} + for { + for idx := range chunk { + batch.Steps[idx].Stmt.Sql = &chunk[idx] + } + if len(chunk) < chunkSize { + // We can trim batch.Steps because this is the last chunk anyway. + // isEOF has to be true at this point. + batch.Steps = batch.Steps[:len(chunk)] + } + res, err := h.executeMsg(ctx, msg) + if err != nil { + h.closeStream() + return nil, fmt.Errorf("failed to execute SQL:\n%w", err) + } + result.Baton = res.Baton + result.BaseUrl = res.BaseUrl + result.Results = append(result.Results, res.Results...) + if isEOF { + break + } + chunk, isEOF = chunker.Next() + } + _, err = h.executeSingleStmt(ctx, "COMMIT", false) + if err != nil { + h.closeStream() + return nil, err + } + return result, nil +} + func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driver.NamedValue, wantRows bool) (*hrana.PipelineResponse, error) { + const querySizeLimitForChunking = 20 * 1024 * 1024 + if len(args) == 0 && len(query) > querySizeLimitForChunking && !h.schemaDb { + return h.executeInChunks(ctx, query, wantRows) + } stmts, params, err := shared.ParseStatementAndArgs(query, args) if err != nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err) + return nil, fmt.Errorf("failed to execute SQL:\n%w", err) } msg := &hrana.PipelineRequest{} if len(stmts) == 1 { @@ -316,20 +421,20 @@ func (h *hranaV2Conn) executeStmt(ctx context.Context, query string, args []driv } executeStream, err := hrana.ExecuteStream(stmts[0], p, wantRows) if err != nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err) + return nil, fmt.Errorf("failed to execute SQL:\n%w", err) } msg.Add(*executeStream) } else { batchStream, err := hrana.BatchStream(stmts, params, wantRows, !h.schemaDb) if err != nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err) + return nil, fmt.Errorf("failed to execute SQL:\n%w", err) } msg.Add(*batchStream) } resp, err := h.executeMsg(ctx, msg) if err != nil { - return nil, fmt.Errorf("failed to execute SQL: %s\n%w", query, err) + return nil, fmt.Errorf("failed to execute SQL:\n%w", err) } return resp, nil }