Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/rows/arrowbased/arrowRecordIterator.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package arrowbased

import (
"bytes"
"context"
"fmt"
"io"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/config"
dbsqlerr "github.com/databricks/databricks-sql-go/internal/errors"
Expand Down Expand Up @@ -34,6 +37,7 @@ type arrowRecordIterator struct {
currentBatch SparkArrowBatch
isFinished bool
arrowSchemaBytes []byte
arrowSchema *arrow.Schema
}

var _ rows.ArrowBatchIterator = (*arrowRecordIterator)(nil)
Expand Down Expand Up @@ -170,3 +174,36 @@ func (ri *arrowRecordIterator) newBatchIterator(fr *cli_service.TFetchResultsRes
return NewLocalBatchIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
}
}

// Schema returns the Arrow schema describing the records produced by this
// iterator, decoding it lazily from the IPC-serialized schema bytes and
// caching the result for subsequent calls.
func (ri *arrowRecordIterator) Schema() (*arrow.Schema, error) {
	// Fast path: the schema has already been materialized and cached.
	if ri.arrowSchema != nil {
		return ri.arrowSchema, nil
	}

	// The serialized schema may not have arrived yet; loading the next
	// batch presumably populates ri.arrowSchemaBytes as a side effect —
	// the lazy path below relies on that.
	if ri.arrowSchemaBytes == nil && ri.HasNext() {
		if err := ri.getCurrentBatch(); err != nil {
			return nil, err
		}
	}

	// With no schema bytes there is nothing to decode.
	if ri.arrowSchemaBytes == nil {
		return nil, fmt.Errorf("no schema available")
	}

	// Deserialize the IPC-encoded schema.
	rdr, err := ipc.NewReader(bytes.NewReader(ri.arrowSchemaBytes))
	if err != nil {
		return nil, fmt.Errorf("failed to create Arrow IPC reader: %w", err)
	}
	defer rdr.Release()

	// Cache the decoded schema before handing it back.
	ri.arrowSchema = rdr.Schema()
	return ri.arrowSchema, nil
}
129 changes: 129 additions & 0 deletions internal/rows/arrowbased/arrowRecordIterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,135 @@ func TestArrowRecordIterator(t *testing.T) {
})
}

// TestArrowRecordIteratorSchema exercises the Schema() accessor across three
// scenarios: schema bytes supplied up front, schema bytes discovered by
// fetching a batch, and no schema obtainable at all.
func TestArrowRecordIteratorSchema(t *testing.T) {
	t.Run("schema with initial schema bytes", func(t *testing.T) {
		logger := dbsqllog.WithContext("connectionId", "correlationId", "")
		cfg := *config.WithDefaults()

		execResp := cli_service.TExecuteStatementResp{}
		loadTestData2(t, "directResultsMultipleFetch/ExecuteStatement.json", &execResp)

		fetchResp := cli_service.TFetchResultsResp{}
		loadTestData2(t, "directResultsMultipleFetch/FetchResults1.json", &fetchResp)

		var fetches []fetchResultsInfo
		pageIter := rowscanner.NewResultPageIterator(
			rowscanner.NewDelimiter(0, 0),
			5000,
			nil,
			false,
			getSimpleClient(&fetches, []cli_service.TFetchResultsResp{fetchResp}),
			"connectionId",
			"correlationId",
			logger,
		)

		batchIter, err := NewLocalBatchIterator(
			context.Background(),
			execResp.DirectResults.ResultSet.Results.ArrowBatches,
			0,
			execResp.DirectResults.ResultSetMetadata.ArrowSchema,
			&cfg,
		)
		assert.Nil(t, err)

		// Schema bytes are handed to the iterator directly, so no fetch
		// should be required to resolve the schema.
		ri := NewArrowRecordIterator(
			context.Background(),
			pageIter,
			batchIter,
			execResp.DirectResults.ResultSetMetadata.ArrowSchema,
			cfg,
		)
		defer ri.Close()

		first, firstErr := ri.Schema()
		assert.NoError(t, firstErr)
		assert.NotNil(t, first)

		// A repeat call must hit the cache and return the identical object.
		second, secondErr := ri.Schema()
		assert.NoError(t, secondErr)
		assert.Same(t, first, second)
	})

	t.Run("schema with lazy loading", func(t *testing.T) {
		logger := dbsqllog.WithContext("connectionId", "correlationId", "")
		cfg := *config.WithDefaults()

		fetchResp := cli_service.TFetchResultsResp{}
		loadTestData2(t, "multipleFetch/FetchResults1.json", &fetchResp)

		var fetches []fetchResultsInfo
		pageIter := rowscanner.NewResultPageIterator(
			rowscanner.NewDelimiter(0, 0),
			5000,
			nil,
			false,
			getSimpleClient(&fetches, []cli_service.TFetchResultsResp{fetchResp}),
			"connectionId",
			"correlationId",
			logger,
		)

		// No schema bytes up front: Schema() must load a batch to find them.
		ri := NewArrowRecordIterator(context.Background(), pageIter, nil, nil, cfg)
		defer ri.Close()

		first, firstErr := ri.Schema()
		assert.NoError(t, firstErr)
		assert.NotNil(t, first)

		// Caching still applies on the lazily resolved schema.
		second, secondErr := ri.Schema()
		assert.NoError(t, secondErr)
		assert.Same(t, first, second)
	})

	t.Run("schema with no data available", func(t *testing.T) {
		logger := dbsqllog.WithContext("connectionId", "correlationId", "")
		cfg := *config.WithDefaults()

		// A client whose fetch always fails guarantees that no schema bytes
		// can ever be obtained.
		failingClient := &client.TestClient{
			FnFetchResults: func(ctx context.Context, req *cli_service.TFetchResultsReq) (*cli_service.TFetchResultsResp, error) {
				return nil, fmt.Errorf("no data available")
			},
		}

		pageIter := rowscanner.NewResultPageIterator(
			rowscanner.NewDelimiter(0, 0),
			5000,
			nil,
			false,
			failingClient,
			"connectionId",
			"correlationId",
			logger,
		)

		ri := NewArrowRecordIterator(context.Background(), pageIter, nil, nil, cfg)
		defer ri.Close()

		// With neither schema bytes nor fetchable data, Schema() must error.
		schema, schemaErr := ri.Schema()
		assert.Error(t, schemaErr)
		assert.Nil(t, schema)
	})
}

type fetchResultsInfo struct {
direction cli_service.TFetchOrientation
resultStartRec int
Expand Down
3 changes: 3 additions & 0 deletions rows/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ type ArrowBatchIterator interface {

// Release any resources in use by the iterator.
Close()

// Return the schema of the records.
Schema() (*arrow.Schema, error)
}
Loading