Conversation

@wojiaodoubao
Collaborator

No description provided.

@github-actions github-actions bot added the enhancement New feature or request label Jan 7, 2026
@jackye1995
Collaborator

Discussed a bit offline. My current biggest concern is that once we enter this mode, a user can never read a partition table directly and always has to go through the namespace. I thought maybe the implementation of the spec could take responsibility for keeping those datasets up to date asynchronously, but that is also a lot of complexity, so maybe this approach is inevitable.

Another direction I think we have not really explored is partition-level locking. What if we only add the locking mechanism in __manifest, so that writers essentially acquire a lock before writing and keep extending the lease until completion? Would that make things simpler and keep the dataset accessible? For normal cross-partition writes, I feel this could be an approach worth considering.

@jackye1995
Collaborator

jackye1995 commented Jan 7, 2026

Actually, I think there could be a better way.

Today, we already have the concept of an external manifest store in Lance dataset, because S3 writes were not atomic and we used DynamoDB. Here we are essentially using another Lance table as the external manifest store, and with that we also gain the ability to commit to multiple tables.

Now, the key idea is that, instead of using CDF, we will change __manifest to record all versions of the table, serving as an external manifest store. Every time a new version is committed to the table, it will add a new entry to the __manifest table. We already have a bitmap index on the object_id column, so we will leverage that, add a new object type table_version, and the object id would be just table_id$version.

Now we have achieved both goals and avoided using CDF: we can atomically write to multiple partitions by atomically updating __manifest to add new table versions for multiple tables, and we also have a way to automatically resolve the latest version of a Lance dataset through the external manifest store mechanism, treating __manifest as our source of truth for the ordered list of manifests.
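As a rough sketch of how resolving the latest version from such a store could look (a toy model in plain Python under the assumed `table_id$version` layout; none of this is Lance's actual API):

```python
# Hypothetical sketch: __manifest rows of type table_version have
# object_id = "table_id$version", so the newest committed version of a
# table is the max version among its rows. Note that table_id itself may
# contain '$', so we split on the LAST '$' only.
def latest_version(object_ids, table_id):
    versions = [
        int(oid.rpartition("$")[2])
        for oid in object_ids
        if oid.rpartition("$")[0] == table_id
    ]
    return max(versions) if versions else None

entries = [
    "v2$abc$dataset$1",
    "v2$abc$dataset$2",
    "v2$abc$other$1",
    "v2$abc$dataset$3",
]
print(latest_version(entries, "v2$abc$dataset"))  # -> 3
```

With the bitmap index on object_id, this per-table scan would presumably be an index lookup rather than a full scan.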

What do you think?

@wojiaodoubao
Collaborator Author

wojiaodoubao commented Jan 8, 2026

The idea is great; here is my understanding. Instead of storing a read_version column for each partition table, we will add multiple table-version rows per table. For instance:
* v2$e9f0g1h2i3j4k5l6$m7n8o9p0q1r2s3t4$dataset$1,
* v2$e9f0g1h2i3j4k5l6$m7n8o9p0q1r2s3t4$dataset$2,
* v2$e9f0g1h2i3j4k5l6$m7n8o9p0q1r2s3t4$dataset$3

These entries indicate that the table v2$e9f0g1h2i3j4k5l6$m7n8o9p0q1r2s3t4$dataset has 3 versions.

I have 2 questions I want to explore.

First Question

If table_3 is not updated in a transaction, do we still need to add a new table_3$new_version entry when committing the transaction? Here are the trade-offs:
* If we add a new entry for every table each time: the newest versions of all tables would be the same. We could record that newest version (or maybe call it the "partition namespace snapshot version") as a row in __manifest, allowing us to directly concatenate the object_id when querying for a table_version. However, this approach inserts redundant entries into __manifest for tables not involved in the transaction.
* If we only add a new table_id$version entry when the table changes: this avoids a large number of redundant records, but to retrieve the latest version of a table we would have to query all version records for that table_id and select the newest one.
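To make the first option concrete, here is a toy model (plain Python, all names hypothetical) of the direct-concatenation lookup it enables:

```python
# Sketch of option 1 (hypothetical layout): __manifest also records a
# namespace-level snapshot version, so a reader can build the object_id
# by direct concatenation instead of scanning per-table version rows.
manifest = {
    "__snapshot": 3,                  # hypothetical snapshot-version row
    "tbl_a$1": "v1", "tbl_a$2": "v2", "tbl_a$3": "v3",
    "tbl_b$3": "v3",                  # redundant entry: tbl_b unchanged
}

def lookup(table_id):
    # Direct key construction; this only works because every table gets
    # a new entry at every snapshot (the redundancy discussed above).
    return manifest[f"{table_id}${manifest['__snapshot']}"]

print(lookup("tbl_a"))  # -> v3
```

The trade-off is exactly the one described: O(1) key construction, paid for by a row per table per transaction.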

Second Question

The commit process of a partition namespace transaction is roughly as follows:
* The namespace version at transaction start is S0.
* The namespace version at transaction commit is expected to be S1.
* The transaction aims to update the namespace version to S2.

First, we check if there are any conflicts between the S0→S1 and S0→S2 changes. Since we already track all version records of each table at both S0 and S1, it is straightforward to identify the changes to Lance tables and their corresponding transactions without relying on CDF. This is good.

Next, we start committing S2. The S2 commit process must be a CAS operation: "Update the partition namespace to S2 if and only if its current version is S1; otherwise, fail."

Based on Lance’s current public interfaces, it seems impossible to implement the above CAS semantics (please correct me if my understanding is wrong).

Perhaps we could add a transaction commit interface to Lance with the following semantics: commit(txn: Transaction, expected_version: Version). The transaction will commit successfully if and only if the dataset’s latest version matches the specified expected_version and there are no commit conflicts. When committing, we can use the version of __manifest at S1 as the expected_version condition.
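A minimal sketch of the proposed semantics (the Dataset class and its fields here are purely illustrative models of the idea, not Lance's actual API):

```python
# Illustrative model of the proposed commit(txn, expected_version)
# semantics; this is not Lance's actual API.
class CommitConflict(Exception):
    pass

class Dataset:
    def __init__(self):
        self.version = 0
        self.transactions = []

    def commit(self, txn, expected_version):
        # CAS semantics: succeed only if the latest version matches the
        # caller's expectation; otherwise the caller must rebase/retry.
        if self.version != expected_version:
            raise CommitConflict(
                f"expected v{expected_version}, found v{self.version}"
            )
        self.version += 1
        self.transactions.append(txn)
        return self.version

ds = Dataset()
ds.commit("t1", expected_version=0)      # succeeds, version becomes 1
try:
    ds.commit("t2", expected_version=0)  # stale expectation: rejected
except CommitConflict:
    pass
```

Used for __manifest, `expected_version` would be the __manifest version observed at S1.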

@wojiaodoubao
Collaborator Author

Add serial commit to lance lance-format/lance#5662

@wojiaodoubao
Collaborator Author

Now, the key idea is that, instead of using CDF, we will change __manifest to record all versions of the table, serving as an external manifest store. Every time a new version is committed to the table, it will add a new entry to the __manifest table. We already have a bitmap index on the object_id column, so we will leverage that, add a new object type table_version, and the object id would be just table_id$version.

I updated the spec based on this design, with only one minor difference: changing "new entry for each partitioned table commit" to "make read_version record the version timeline". Instead of inserting a new entry, we append the latest version to read_version and then update the row.

@wojiaodoubao
Collaborator Author

The SPEC changes describe the process of cross-partition transaction commit. In the following, I will use an example to explain why serial commit is necessary for this cross-partition transaction commit process.

Assume the initial state of the Partition Table is as follows:

S0
* table0 read_version=[1]
* table1 read_version=[1]

At this point, 2 concurrent transactions (t1 and t2) are issued, both of which observe S0.

t1 modifies table0 via a detached commit, resulting in:

S1
* table0 read_version=[1,21]
* table1 read_version=[1]

t2 modifies both table0 and table1 via detached commits, resulting in:

S2
* table0 read_version=[1,31]
* table1 read_version=[1,32]

(Here, 21, 31, and 32 are all detached versions.)

Next, t1 and t2 are to commit, i.e., to insert records into the __manifest.

* For S1: insert <table0_1, read_version=21>;
* For S2: insert <table0_1, read_version=31>, <table1_1, read_version=32>;

Since the __manifest table does not prevent write skew, t1 and t2 can both commit to the __manifest successfully at the same time, leaving us with 2 table0_1 records.

If we instead make read_version a list recording the version timeline, the result remains the same.

* For S1: update to <table_0, read_version=[1, 21]>;
* For S2: update to <table_0, read_version=[1, 31]>, <table_1, read_version=[1, 32]>.

Depending on the commit order, the final possible states are the following 2 cases.

* <table_0, read_version=[1, 21]>, <table_1, read_version=[1, 32]>;
* <table_0, read_version=[1, 31]>, <table_1, read_version=[1, 32]>.

The root cause is that our commits to __manifest require Compare-And-Swap (CAS) semantics, i.e., "when the __manifest state is S0, modify it to S1". However, the Lance dataset currently does not support strict serializable isolation, so it is impossible to implement CAS semantics.
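The lost-update race described above can be sketched with a toy simulation (plain Python; both writers read S0 and blindly write back full read_version lists, with no CAS guard):

```python
# Toy simulation of the race: t1 and t2 both observe S0 and write back
# complete read_version lists without any compare-and-swap check.
s0 = {"table_0": [1], "table_1": [1]}

def commit_blind(state, updates):
    # No CAS: the last writer silently wins on a per-row basis.
    state.update(updates)

state = dict(s0)
# Both update sets were prepared independently against S0.
t1_updates = {"table_0": [1, 21]}
t2_updates = {"table_0": [1, 31], "table_1": [1, 32]}

commit_blind(state, t2_updates)
commit_blind(state, t1_updates)  # t1 lands second: t2's table_0 is lost
print(state)  # {'table_0': [1, 21], 'table_1': [1, 32]}
```

The final state mixes the two transactions, which is exactly the first of the two inconsistent outcomes listed above.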

@jackye1995
Collaborator

I think this should be solved by the primary key dedupe feature that Vino and I added previously.

* For S1: 
* insert <table0_1, read_version=21>
* For S2: 
* insert <table0_1, read_version=31>, <table1_1, read_version=32>;

In this case, the primary key is object_id, and table0_1 is inserted (using merge insert) in both cases, so they will conflict: one will fail and need to rebase against the other.

@wojiaodoubao
Collaborator Author

Even if duplicate rows are not inserted (using primary key merge_into to prevent this), there are still semantic issues. Returning to the previous example: assume the commit order is S0 -> t1 -> t2.

* First, t1 commits <table_0, read_version=[1, 21]>;
* Then t2 commits <table_0, read_version=[1, 31]> and <table_1, read_version=[1, 32]>;

Primary key merge_into ensures that t2 cannot insert the table_0 row during its commit, but the table_1 row can still be inserted successfully, resulting in the final state:

<table_0, read_version=[1, 21]>, <table_1, read_version=[1, 32]>;

But what we need is for t2 to fail completely and trigger a transaction retry, rather than having one insertion succeed and the other fail, because t2's commit must perform a conflict check against t1's commit (a conflict check on every modified partitioned table). Serial commit can guarantee that t2 fails entirely.

@jackye1995
Collaborator

but table_1 can still be inserted successfully

I don't think so. When you run merge-insert, your source is

table0_1, read_version=31
table1_1, read_version=32

as a whole.

If table0_1 detects duplicate and fails, table1_1 will not succeed. They are committed together.

@wojiaodoubao
Collaborator Author

@jackye1995 Thanks for your explanation, you are right. We can use a merge_insert to commit a cross-partition transaction, something like this:

    source = pa.Table.from_pydict(
        {
            "object_id": ['v1$k7m2n9p4q8r5s3t6$dataset$3'],
            "object_type": ['table_version'],
            "metadata": [None],
            "read_version": [5],
            "partition_field_event_date": [None],
        },
        schema=schema,
    )
    ds.merge_insert(['object_id', 'object_type']) \
        .when_matched_fail() \
        .when_not_matched_insert_all() \
        .execute(source)

The primary key dedupe feature makes sure concurrent merge_inserts on the same primary keys will conflict and trigger a retry. On retry, the merge_insert will find the row matched and fail.
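To illustrate the all-or-nothing behavior this relies on, here is a toy simulation (the merge-insert and primary-key dedupe are modeled with a plain dict; all names are illustrative, not Lance's API):

```python
# Toy simulation of merge_insert(...).when_matched_fail(): the source
# batch either inserts entirely or fails entirely on any duplicate key.
class CommitConflict(Exception):
    pass

def merge_insert_when_matched_fail(manifest, rows):
    keys = [(r["object_id"], r["object_type"]) for r in rows]
    if any(k in manifest for k in keys):
        # One matched row rejects the whole batch.
        raise CommitConflict("matched existing primary key")
    for k, r in zip(keys, rows):
        manifest[k] = r

manifest = {}
# t1 commits first.
merge_insert_when_matched_fail(manifest, [
    {"object_id": "table0_1", "object_type": "table_version",
     "read_version": 21},
])
# t2's whole batch fails: table0_1 conflicts, so table1_1 is NOT inserted.
try:
    merge_insert_when_matched_fail(manifest, [
        {"object_id": "table0_1", "object_type": "table_version",
         "read_version": 31},
        {"object_id": "table1_1", "object_type": "table_version",
         "read_version": 32},
    ])
except CommitConflict:
    print("t2 batch rejected as a whole")
```

This matches the requirement above: t2 fails entirely and can rebase and retry, instead of leaving a half-applied state.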

It is a smart solution; let me update the SPEC with it.
