2 changes: 2 additions & 0 deletions docs/src/operations/ddl/create-index.md
@@ -69,6 +69,8 @@ For the `fts` method, the following options are required:

For advanced tokenizer configuration, refer to the [Lance FTS documentation](https://lance.org/format/table/index/scalar/fts/#tokenizers).

To query the index once it's built, see [Full-Text Search](../dql/fts.md).

### FTS Format Version

Lance FTS index format v2 is selected by the Lance runtime environment variable `LANCE_FTS_FORMAT_VERSION=2`. Configure it on both the Spark driver and executors before creating the index.
72 changes: 72 additions & 0 deletions docs/src/operations/dql/fts.md
@@ -0,0 +1,72 @@
# Full-Text Search

Query Lance tables with full-text search (FTS) using the `lance_match` SQL function.

!!! warning "Spark Extension Required"
    This feature requires the Lance Spark SQL extension to be enabled. See [Spark SQL Extensions](../../config.md#spark-sql-extensions) for configuration details.

!!! note "FTS Index Required for Acceleration"
    `lance_match` uses the Lance inverted index when the queried column has one. Without an index, it falls back to plain substring matching, which is correct but slow. See [CREATE INDEX: FTS Options](../ddl/create-index.md#fts-options) for how to build one.

## Overview

`lance_match(column, 'query')` is a scalar predicate function that is translated into a native Lance full-text query at scan time. The optimizer rewrites matching filter expressions into `ScanOptions.fullTextQuery(...)`, so BM25 relevance ranking and the tokenizer rules from the index definition (language, stemming, lower-casing, etc.) apply during the scan rather than as a post-scan filter in Spark.

## Basic Usage

=== "SQL"
    ```sql
    SELECT id, content
    FROM lance.db.docs
    WHERE lance_match(content, 'apache spark');
    ```

Query terms are combined with OR by default: a document matches if it contains at least one of the terms.
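
To make the OR semantics concrete, here is a toy model in plain Java. It is only a sketch: `OrMatchSketch`, `tokenize`, and `matchesAny` are hypothetical names, and whitespace splitting plus lower-casing stand in for whatever tokenizer the index was actually built with. It does not call Lance or Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of OR-style term matching. This is NOT Lance's tokenizer or scorer;
// whitespace splitting and lower-casing are simplifying assumptions.
class OrMatchSketch {

  // Split text into lower-cased terms on runs of whitespace.
  static Set<String> tokenize(String text) {
    return Arrays.stream(text.toLowerCase(Locale.ROOT).trim().split("\\s+"))
        .filter(term -> !term.isEmpty())
        .collect(Collectors.toSet());
  }

  // A document matches when it shares at least one term with the query.
  static boolean matchesAny(String document, String query) {
    Set<String> docTerms = tokenize(document);
    return tokenize(query).stream().anyMatch(docTerms::contains);
  }

  public static void main(String[] args) {
    List<String> docs =
        List.of("Apache Spark internals", "Spark SQL tuning", "Lance file format");
    for (String doc : docs) {
      System.out.println(doc + " -> " + matchesAny(doc, "apache spark"));
    }
  }
}
```

Under these assumptions, the first two documents match the query `'apache spark'` because each contains at least one of the two terms; the third contains neither.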

## Combining with Scalar Filters

FTS and scalar predicates combine naturally with `AND`. The scalar part stays as a regular Spark filter; the FTS part goes into the index scan.

=== "SQL"
    ```sql
    SELECT id, content
    FROM lance.db.docs
    WHERE lance_match(content, 'python') AND year = 2026;
    ```

## The `_score` Column

When an FTS query is active, Lance's native scanner materializes a virtual `_score` column of type `Float` (BM25 relevance). Reference it explicitly in the projection to retrieve scores.

=== "SQL"
    ```sql
    SELECT id, _score
    FROM lance.db.docs
    WHERE lance_match(content, 'python programming');
    ```

Referencing `_score` outside a query that uses `lance_match` raises an error:

```
_score can only be selected in queries using lance_match(); no FTS predicate found
```
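
The guard behind this error can be sketched in plain Java. The class and method names (`ScoreGuardSketch`, `checkScoreProjection`) are hypothetical, and a list of projected column names stands in for the Spark schema; only the condition and the message text follow the check added to `LanceFragmentScanner` in this PR.

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-in for the _score guard; it does not use Spark or Lance types.
class ScoreGuardSketch {
  static final String FTS_SCORE = "_score";

  // Throws when _score is projected but no FTS query accompanies the scan.
  static void checkScoreProjection(List<String> projectedColumns, Optional<String> ftsQuery) {
    boolean scoreRequested = projectedColumns.contains(FTS_SCORE);
    if (scoreRequested && !ftsQuery.isPresent()) {
      throw new IllegalArgumentException(
          FTS_SCORE + " can only be selected in queries using lance_match(); no FTS predicate found");
    }
  }

  public static void main(String[] args) {
    // Projection with an FTS query: accepted.
    checkScoreProjection(List.of("id", "_score"), Optional.of("python programming"));

    // Projection without an FTS query: rejected.
    try {
      checkScoreProjection(List.of("id", "_score"), Optional.empty());
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```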

## Requirements and Fallback Behavior

- The queried column must have an FTS index for the query to use the index. See [CREATE INDEX](../ddl/create-index.md) for the `USING fts` options (`base_tokenizer`, `language`, `stem`, etc.).
- Without an index, `lance_match` is evaluated in Spark as a plain substring match: the results are correct, but there is no inverted-index acceleration and no BM25 `_score`.
- `lance_match` on a non-string column is rejected at analysis time.

## How It Works

1. `lance_match` is registered as a Spark SQL function via `SparkSessionExtensions.injectFunction`, producing a Catalyst `LanceMatch` expression at analysis time.
2. `LanceFtsPushdownRule` (an optimizer rule) detects `Filter(..., DataSourceV2Relation(LanceTable, ...))` patterns containing `LanceMatch` and moves the column/query text into the table's scan options before the V2 pushdown batch builds the physical scan.
3. `LanceDataset.newScanBuilder` reads those options and configures the `LanceScanBuilder` with an `FtsQuerySpec`.
4. `LanceFragmentScanner` calls `ScanOptions.Builder.fullTextQuery(FullTextQuery.match(queryText, column))`, so each fragment scan executes an FTS query natively.
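
Steps 2 and 3 amount to a handoff through string-keyed options. The sketch below models only that handoff, under stated assumptions: `FtsOptionHandoffSketch`, `injectFts`, and `readFts` are hypothetical names, a plain `HashMap` stands in for Spark's `CaseInsensitiveStringMap`, and a `Map.Entry` stands in for `FtsQuerySpec`. The two option keys are the ones defined in `LanceConstant`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Models how FTS info could travel from the optimizer rule to the scan builder
// via table options. Not the actual rule or builder code.
class FtsOptionHandoffSketch {
  static final String LANCE_FTS_COLUMN_OPT = "_lance_fts_column";
  static final String LANCE_FTS_QUERY_OPT = "_lance_fts_query";

  // Stand-in for the optimizer rule: copy the table options and add the FTS keys.
  static Map<String, String> injectFts(
      Map<String, String> tableOptions, String column, String query) {
    Map<String, String> merged = new HashMap<>(tableOptions);
    merged.put(LANCE_FTS_COLUMN_OPT, column);
    merged.put(LANCE_FTS_QUERY_OPT, query);
    return merged;
  }

  // Stand-in for the scan builder side: an FTS spec exists only if both keys are set.
  static Optional<Map.Entry<String, String>> readFts(Map<String, String> options) {
    String column = options.get(LANCE_FTS_COLUMN_OPT);
    String query = options.get(LANCE_FTS_QUERY_OPT);
    if (column != null && query != null) {
      return Optional.of(Map.entry(column, query));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> tableOptions = Map.of("batch_size", "512");
    Map<String, String> withFts = injectFts(tableOptions, "content", "apache spark");

    System.out.println("plain options -> " + readFts(tableOptions).isPresent());
    readFts(withFts)
        .ifPresent(spec -> System.out.println(spec.getKey() + " / " + spec.getValue()));
  }
}
```

Passing the query through options keeps the scan builder unaware of Catalyst expressions: it only sees two strings, and their absence means an ordinary scan.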

## Notes and Limitations

- **Single FTS predicate per query**: `lance_match(col, 'x') AND lance_match(col, 'y')`, `OR` combinations, and nested `NOT lance_match(...)` fall back to Catalyst evaluation of each `LanceMatch` separately (correct but no index use). Multi-term / phrase / boolean variants are on the roadmap.
- **Default operator is `OR`**: the query `'apache spark'` matches documents containing either term. An explicit `AND` operator, fuzziness, and boost are not yet exposed through the SQL function.
- **`ORDER BY _score DESC` in a single query**: Spark's `RangePartitioner.sketch` may run a sampling scan to which the optimizer rule is not reapplied, which leads to a runtime error about `_score` without an FTS predicate. Workaround: collect the results first and sort on the client, or use a subquery/CTE to isolate the ranked scan.
- **Statistics**: FTS selectivity is not currently reported to Spark's cost-based optimizer, so `JoinSelection` may overestimate the post-FTS row count when joining FTS-filtered tables.
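
The "collect first, then sort on the client" workaround for `ORDER BY _score DESC` can be sketched as follows. `ClientSideRankSketch` and `ScoredRow` are hypothetical; the rows are assumed to have been fetched already (for example with `Dataset.collectAsList()`) from a query that selects `id` and `_score` without an `ORDER BY`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Client-side ranking of already-collected FTS results; no Spark types involved.
class ClientSideRankSketch {

  // Minimal holder for one collected result row (illustrative only).
  static final class ScoredRow {
    final long id;
    final float score;

    ScoredRow(long id, float score) {
      this.id = id;
      this.score = score;
    }
  }

  // Return a new list ordered by descending score; the input list is left untouched.
  static List<ScoredRow> rankByScoreDesc(List<ScoredRow> collected) {
    List<ScoredRow> ranked = new ArrayList<>(collected);
    ranked.sort(Comparator.comparingDouble((ScoredRow row) -> row.score).reversed());
    return ranked;
  }

  public static void main(String[] args) {
    List<ScoredRow> collected =
        List.of(new ScoredRow(1L, 0.4f), new ScoredRow(2L, 1.7f), new ScoredRow(3L, 0.9f));
    for (ScoredRow row : rankByScoreDesc(collected)) {
      System.out.println(row.id + " " + row.score);
    }
  }
}
```

This only suits result sets small enough to hold on the driver; for larger ones, the subquery/CTE variant keeps the sort in Spark.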
@@ -14,7 +14,7 @@
package org.lance.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
- import org.apache.spark.sql.catalyst.optimizer.{LanceBlobSourceContextRule, LanceFragmentAwareJoinRule}
+ import org.apache.spark.sql.catalyst.optimizer.{LanceBlobSourceContextRule, LanceFragmentAwareJoinRule, LanceFtsPushdownRule}
import org.apache.spark.sql.catalyst.parser.extensions.LanceSparkSqlExtensionsParser
import org.apache.spark.sql.execution.datasources.v2.LanceDataSourceV2Strategy

@@ -24,11 +24,16 @@ class LanceSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
// parser extensions
extensions.injectParser { case (_, parser) => new LanceSparkSqlExtensionsParser(parser) }

// SQL functions (lance_match, ...)
LanceFunctions.register(extensions)

// optimizer rules for fragment-aware joins
extensions.injectOptimizerRule(_ => LanceFragmentAwareJoinRule())

// propagate blob source credentials from read scans to the write side
extensions.injectOptimizerRule(_ => LanceBlobSourceContextRule())
// optimizer rule that pushes lance_match(...) predicates into Lance scans as FTS queries
extensions.injectOptimizerRule(_ => LanceFtsPushdownRule())

extensions.injectPlannerStrategy(LanceDataSourceV2Strategy(_))
}
@@ -0,0 +1,16 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.read;

public class FtsQueryTest extends BaseFtsQueryTest {}
@@ -18,6 +18,22 @@ public class LanceConstant {
public static final String ROW_ID = "_rowid";
public static final String ROW_ADDRESS = "_rowaddr";

/**
* Virtual FTS relevance column. Populated by the Lance native scanner automatically when a
* full-text query is active (see {@code lance_index::scalar::inverted::SCORE_COL} in Rust).
* Referencing it outside an FTS query is rejected at scan-build time.
*/
public static final String FTS_SCORE = "_score";

/**
* Internal scan options carrying FTS info from {@code LanceFtsPushdownRule} (logical plan) down
* to {@code LanceScanBuilder}. They are injected into the table options by the optimizer rule and
* read back by {@code LanceDataset.newScanBuilder}.
*/
public static final String LANCE_FTS_COLUMN_OPT = "_lance_fts_column";

public static final String LANCE_FTS_QUERY_OPT = "_lance_fts_query";

// CDF (Change Data Feed) version tracking columns
public static final String ROW_CREATED_AT_VERSION = "_row_created_at_version";
public static final String ROW_LAST_UPDATED_AT_VERSION = "_row_last_updated_at_version";
@@ -149,13 +149,30 @@ public DataType dataType() {
}
};

// Virtual BM25 relevance column surfaced by the native scanner when an FTS query is active.
// Exposed unconditionally to Spark's analyzer; referencing it without lance_match() is rejected
// at scan-build time in LanceFragmentScanner.
public static final MetadataColumn FTS_SCORE_COLUMN =
new MetadataColumn() {
@Override
public String name() {
return LanceConstant.FTS_SCORE;
}

@Override
public DataType dataType() {
return DataTypes.FloatType;
}
};

public static final MetadataColumn[] METADATA_COLUMNS =
new MetadataColumn[] {
ROW_ID_COLUMN,
ROW_ADDRESS_COLUMN,
ROW_LAST_UPDATED_AT_VERSION_COLUMN,
ROW_CREATED_AT_VERSION_COLUMN,
- FRAGMENT_ID_COLUMN
+ FRAGMENT_ID_COLUMN,
+ FTS_SCORE_COLUMN
};

protected final LanceSparkReadOptions readOptions;
@@ -296,13 +313,22 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveString
.fromOptions(mergedOptions)
.build();
}
- return new LanceScanBuilder(
- sparkSchema,
- scanOptions,
- initialStorageOptions,
- namespaceImpl,
- namespaceProperties,
- shardingSpec);
+ LanceScanBuilder builder =
+ new LanceScanBuilder(
+ sparkSchema,
+ scanOptions,
+ initialStorageOptions,
+ namespaceImpl,
+ namespaceProperties,
+ shardingSpec);
+
+ // Apply FTS spec injected by LanceFtsPushdownRule via scan options.
+ String ftsColumn = caseInsensitiveStringMap.get(LanceConstant.LANCE_FTS_COLUMN_OPT);
+ String ftsQuery = caseInsensitiveStringMap.get(LanceConstant.LANCE_FTS_QUERY_OPT);
+ if (ftsColumn != null && ftsQuery != null) {
+ builder.setFtsQuery(new org.lance.spark.read.FtsQuerySpec(ftsColumn, ftsQuery));
+ }
+ return builder;
}

@Override
@@ -15,11 +15,13 @@

import org.lance.Dataset;
import org.lance.Fragment;
import org.lance.ipc.FullTextQuery;
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
import org.lance.spark.LanceConstant;
import org.lance.spark.LanceRuntime;
import org.lance.spark.LanceSparkReadOptions;
import org.lance.spark.read.FtsQuerySpec;
import org.lance.spark.read.LanceInputPartition;
import org.lance.spark.utils.BlobUtils;
import org.lance.spark.utils.Utils;
@@ -127,6 +129,17 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
if (inputPartition.getWhereCondition().isPresent()) {
scanOptions.filter(inputPartition.getWhereCondition().get());
}
boolean scoreRequested =
inputPartition.getSchema().getFieldIndex(LanceConstant.FTS_SCORE).nonEmpty();
if (scoreRequested && !inputPartition.getFtsQuery().isPresent()) {
throw new IllegalArgumentException(
LanceConstant.FTS_SCORE
+ " can only be selected in queries using lance_match(); no FTS predicate found");
}
if (inputPartition.getFtsQuery().isPresent()) {
FtsQuerySpec spec = inputPartition.getFtsQuery().get();
scanOptions.fullTextQuery(FullTextQuery.match(spec.query(), spec.column()));
}
scanOptions.batchSize(readOptions.getBatchSize());
if (readOptions.getNearest() != null) {
scanOptions.nearest(readOptions.getNearest());
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.read;

import java.io.Serializable;
import java.util.Objects;

public final class FtsQuerySpec implements Serializable {
private static final long serialVersionUID = 1L;

private final String column;
private final String query;

public FtsQuerySpec(String column, String query) {
this.column = Objects.requireNonNull(column, "column");
this.query = Objects.requireNonNull(query, "query");
}

public String column() {
return column;
}

public String query() {
return query;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FtsQuerySpec)) {
return false;
}
FtsQuerySpec that = (FtsQuerySpec) o;
return column.equals(that.column) && query.equals(that.query);
}

@Override
public int hashCode() {
return Objects.hash(column, query);
}

@Override
public String toString() {
return "FtsQuerySpec{column='" + column + "', query='" + query + "'}";
}
}