
Core: Remove unnecessary row filtering in deleted manifest file #4316

Closed
wants to merge 4 commits into from

Conversation

@hililiwei (Contributor) commented Mar 12, 2022

After we update data via upsert, querying on a field that is not in identifierFieldIds may return inaccurate results.

This is because the metric values (such as upper_bounds / lower_bounds) of non-identifier fields in the manifest file are new, and these new values may be hit by the row filter. As a result, the delete of the old data does not take effect: even though the row has been upserted, the old data is still read and put into the result set.

This PR proposes a solution: when filtering the delete manifest file, if identifierFieldIds is not empty, keep only the predicates on fields in identifierFieldIds in the row filter, and always treat predicates on non-identifier fields as true.
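As a rough sketch of the idea (this is illustrative, not the exact patch), the trimming can be expressed as a recursive rewrite over Iceberg's Expression tree; the referencedFieldId helper below is hypothetical and stands in for binding a leaf predicate against the table schema to find its field id:

import java.util.Set;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;

class DeleteFilterTrimmer {
  // Keep predicates on identifier fields; widen everything else to alwaysTrue()
  // so a delete manifest entry is never pruned by non-key column bounds.
  static Expression trim(Expression expr, Set<Integer> identifierFieldIds) {
    if (expr instanceof And) {
      And and = (And) expr;
      return Expressions.and(trim(and.left(), identifierFieldIds), trim(and.right(), identifierFieldIds));
    } else if (expr instanceof Or) {
      Or or = (Or) expr;
      return Expressions.or(trim(or.left(), identifierFieldIds), trim(or.right(), identifierFieldIds));
    } else if (expr instanceof Not) {
      // Widening a child under NOT would flip it toward alwaysFalse(), which could
      // wrongly prune files, so conservatively drop the whole negated subtree.
      return Expressions.alwaysTrue();
    } else {
      Integer fieldId = referencedFieldId(expr); // hypothetical helper, see above
      return fieldId != null && identifierFieldIds.contains(fieldId) ? expr : Expressions.alwaysTrue();
    }
  }

  private static Integer referencedFieldId(Expression expr) {
    // Placeholder: the real change would resolve the leaf predicate's column id
    // against the table schema before deciding whether to keep it.
    return null;
  }
}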

@kbendick (Contributor) left a comment

Thank you so much for the patch @hililiwei!

And thank you for expanding on the issue in the other PR from @xloya. Can you please link that PR and possibly restate some of the facts from over there so reviewers have more of the facts in one place? I’ll DM you to not add too much unneeded stuff to the PR.

Given the situation doesn’t occur with Avro files, I’d like to get input from others on the best way to resolve this.

This seems like the solution to go with, but I’d like to get input from others on whether there is possibly a better way to compose this with existing Predicate and related interfaces.

But overall this is really great work. And thank you to @xloya for opening the original PR to bring attention to this issue.

cc @szehon-ho @stevenzwu (when you get a chance)

@hililiwei (Contributor Author)

We seem to be hitting the same issue as @xloya. Here are some comments from #4311:


Of course, we have a scenario where data is written to an Iceberg v2 table through Flink CDC, and there are non-primary-key query scenarios. The current implementation in core adds a filter that may drop the latest-sequence-number equality delete files produced by Flink streaming writes. For example, with table schema (id int (primary key), date date): at seq num=1, Flink writes a record with id=1, date='2021-01-01', which produces a data record (id=1, date='2021-01-01') and an equality delete record (id=1, date='2021-01-01'); at seq num=2, Flink writes id=1, date='2022-01-01' to update the row, which produces a data record (id=1, date='2022-01-01') and an equality delete record (id=1, date='2022-01-01'). If we then run select * from xxx where date < '2022-01-01', the added filter prunes the equality delete file written at seq num=2.

This is currently the easiest way to fix the problem. If we want to optimize the Flink upsert path, I think we may need to read the latest records for the primary keys that already exist in the table and write those to the equality delete file during the write, instead of writing the newly inserted data to the equality delete file.

#4311 (comment)


We seem to be hitting the same issue.

It only happens on our Parquet table (it doesn't happen on our Avro table). After analysis, we found that the problem occurs in the metric filtering (such as upper_bounds / lower_bounds) of the manifest file (the Avro tables did not generate these metrics).

Our solution is different from this PR. We try to trim the row filter fields used for metric filtering: for the delete manifest file, only the metric field ids in equality_ids are used for filtering; please refer to #4316 for details.

I'm not sure which way is better, or whether there is another, better solution.

I'm sorry if your PR addresses a different issue.

Thx. 😄


@hililiwei force-pushed the core-remove-row-filter branch from 70d46f2 to 4889692 on March 14, 2022 06:14
@rdblue (Contributor) commented Mar 14, 2022

Because the metric values (such as upper_bounds / lower_bounds) of non-identifier fields in the manifest file are new, and these new values may be hit by the row filter

Can you help me understand what's happening? Are you saying that the lower/upper bounds in the delete file for other columns are not accurate for the deleted rows? That's what it sounds like when you say the bounds are "new" -- are these values for the upserted row columns rather than the deleted row columns?

@hililiwei (Contributor Author) commented Mar 15, 2022

I'm going to try to explain what happened; it may be a bit long. 😄

Prerequisite:

Table test_upsert_query:

	CREATE TABLE test_upsert_query (
	  id INT NOT NULL,
	  province STRING NOT NULL,
	  dt DATE,
	  PRIMARY KEY (id, province) NOT ENFORCED
	) PARTITIONED BY (province)
	WITH (
	  'write.format.default' = 'PARQUET',
	  'write.upsert.enabled' = 'true',
	  'format-version' = '2'
	)

Execute SQL:

INSERT INTO test_upsert_query 
VALUES 
(1, 'a', TO_DATE('2022-03-01')),
(2, 'b', TO_DATE('2022-03-01')),
(1, 'b', TO_DATE('2022-03-01'))

Two manifest files are generated:

  • 5385d1b0-82d6-4328-8115-eb9761f68aef-m0.avro
  • 5385d1b0-82d6-4328-8115-eb9761f68aef-m1.avro

m1.avro is the delete manifest file; let's view it:

java -jar ~/plat/iceberg-tools-1.0-SNAPSHOT.jar manifest2json 5385d1b0-82d6-4328-8115-eb9761f68aef-m1.avro

{
		"status": 1,
		"snapshot_id": {
			"long": 8012546586091949255
		},
		"sequence_number": null,
		"data_file": {
			"content": 2,
			"file_path": "file:/tmp/junit3813726988943192507/db.db/test_upsert_query/data/province=a/00000-0-5dc08ee2-d7e7-4348-a726-99ac0041c24f-00002.parquet",
			"file_format": "PARQUET",
			"partition": {
				"province": {
					"string": "a"
				}
			},
			…………
			……omitted non-critical information……
			…………
			"sort_order_id": {
				"int": 0
			}
		}
	}
	{
		"status": 1,
		"snapshot_id": {
			"long": 8012546586091949255
		},
		"sequence_number": null,
		"data_file": {
			"content": 2,
			"file_path": "file:/tmp/junit3813726988943192507/db.db/test_upsert_query/data/province=b/00000-0-5dc08ee2-d7e7-4348-a726-99ac0041c24f-00004.parquet",
			"file_format": "PARQUET",
			"partition": {
				"province": {
					"string": "b"
				}
			},
			"record_count": 2,
			"file_size_in_bytes": 1053,
			"column_sizes": {
				"array": [{
					"key": 1,
					"value": 49
				}, {
					"key": 2,
					"value": 89
				}, {
					"key": 3,
					"value": 94
				}]
			},
			"value_counts": {
				"array": [{
					"key": 1,
					"value": 2
				}, {
					"key": 2,
					"value": 2
				}, {
					"key": 3,
					"value": 2
				}]
			},
			"null_value_counts": {
				"array": [{
					"key": 1,
					"value": 0
				}, {
					"key": 2,
					"value": 0
				}, {
					"key": 3,
					"value": 0
				}]
			},
			"nan_value_counts": {
				"array": []
			},
			"lower_bounds": {
				"array": [{
					"key": 1,
					"value": "1"
				}, {
					"key": 2,
					"value": "b"
				}, {
					"key": 3,
					"value": "19052"
				}]
			},
			"upper_bounds": {
				"array": [{
					"key": 1,
					"value": "2"
				}, {
					"key": 2,
					"value": "b"
				}, {
					"key": 3,
					"value": "19052"
				}]
			},
			"key_metadata": null,
			"split_offsets": null,
			"equality_ids": {
				"array": [1, 2]
			},
			"sort_order_id": {
				"int": 0
			}
		}
	}

Notice upper_bounds / lower_bounds: the upper and lower bound of the dt field (key=3) is 19052.
Remember this 19052.
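For context, Iceberg stores DATE values as days since the Unix epoch, so the bounds above decode back to the inserted dates; a quick check:

import java.time.LocalDate;

public class EpochDayCheck {
  public static void main(String[] args) {
    // DATE columns are stored as days since 1970-01-01
    System.out.println(LocalDate.ofEpochDay(19052)); // 2022-03-01
    System.out.println(LocalDate.ofEpochDay(19053)); // 2022-03-02
  }
}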

Execute upsert:

INSERT INTO test_upsert_query 
VALUES 
(4, 'a', TO_DATE('2022-03-02')),
(5, 'b', TO_DATE('2022-03-02')),
(1, 'b', TO_DATE('2022-03-02'))

The dt of (1, 'b') is updated to '2022-03-02' (epoch day 19053).

Check again. The following two manifest files are generated:

  • c3fd1626-d26f-4067-b4b0-a245d59a0615-m0.avro
  • c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro

This time, we're still just looking at c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro:

java -jar ~/plat/iceberg-tools-1.0-SNAPSHOT.jar manifest2json c3fd1626-d26f-4067-b4b0-a245d59a0615-m1.avro
{
		"status": 1,
		"snapshot_id": {
			"long": 4408783096627784376
		},
		"sequence_number": null,
		"data_file": {
			"content": 2,
			"file_path": "file:/tmp/junit3813726988943192507/db.db/test_upsert_query/data/province=a/00000-0-9749ef4a-b579-43f5-acbb-ad41e32be3c8-00002.parquet",
			"file_format": "PARQUET",
			"partition": {
				"province": {
					"string": "a"
				}
			},
			"record_count": 1,
			…………
			……omitted non-critical information……
			…………
			"sort_order_id": {
				"int": 0
			}
		}
	}
	{
		"status": 1,
		"snapshot_id": {
			"long": 4408783096627784376
		},
		"sequence_number": null,
		"data_file": {
			"content": 2,
			"file_path": "file:/tmp/junit3813726988943192507/db.db/test_upsert_query/data/province=b/00000-0-9749ef4a-b579-43f5-acbb-ad41e32be3c8-00004.parquet",
			"file_format": "PARQUET",
			"partition": {
				"province": {
					"string": "b"
				}
			},
			"record_count": 2,
			"file_size_in_bytes": 1053,
			"column_sizes": {
				"array": [{
					"key": 1,
					"value": 49
				}, {
					"key": 2,
					"value": 89
				}, {
					"key": 3,
					"value": 94
				}]
			},
			"value_counts": {
				"array": [{
					"key": 1,
					"value": 2
				}, {
					"key": 2,
					"value": 2
				}, {
					"key": 3,
					"value": 2
				}]
			},
			"null_value_counts": {
				"array": [{
					"key": 1,
					"value": 0
				}, {
					"key": 2,
					"value": 0
				}, {
					"key": 3,
					"value": 0
				}]
			},
			"nan_value_counts": {
				"array": []
			},
			"lower_bounds": {
				"array": [{
					"key": 1,
					"value": "1"
				}, {
					"key": 2,
					"value": "b"
				}, {
					"key": 3,
					"value": "19053"
				}]
			},
			"upper_bounds": {
				"array": [{
					"key": 1,
					"value": "5"
				}, {
					"key": 2,
					"value": "b"
				}, {
					"key": 3,
					"value": "19053"
				}]
			},
			"key_metadata": null,
			"split_offsets": null,
			"equality_ids": {
				"array": [1, 2]
			},
			"sort_order_id": {
				"int": 0
			}
		}
	}

Now, the value of dt (key=3) in lower_bounds is 19053.

If we query data at this time:

SELECT * FROM test_upsert_query WHERE dt < '2022-03-02'

During the query, manifest entries are filtered based on the metric values (lower_bounds, upper_bounds, etc.).
In the second delete manifest file, lower_bounds tells us that the minimum dt is 19053, i.e. '2022-03-02'. No value in that file can satisfy dt < '2022-03-02', so the delete file is ignored.

So the result still contains (1, 'b', TO_DATE('2022-03-01')), even though that row was upserted.
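To make the pruning rule concrete, here is a minimal sketch (not the actual Iceberg metrics evaluator) of the inclusive lower-bound check for a col < literal predicate, using the epoch-day values above:

public class LowerBoundPruning {
  // Inclusive check for "col < literal": if every value in the file is >= literal,
  // no row can match, so the file may be skipped.
  static boolean mightContainMatch(long lowerBound, long literal) {
    return lowerBound < literal;
  }

  public static void main(String[] args) {
    long dtLowerBound = 19053L; // 2022-03-02, from the delete manifest above
    long queryLiteral = 19053L; // dt < '2022-03-02'
    // Prints false: the delete file is pruned, so the stale row
    // (1, 'b', 2022-03-01) is never removed from the scan result.
    System.out.println(mightContainMatch(dtLowerBound, queryLiteral));
  }
}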

In this PR, I tried to trim the filter predicate so that only the fields in equality_ids take effect.

If the process and result of our analysis are wrong, please do not hesitate to tell me.

@rdblue (Contributor) commented Mar 16, 2022

Thanks, @hililiwei! Great repro case. I just went through this with @kbendick and we have a slightly different fix to prevent this with future tables. He's also going to open a PR with a fix for existing tables that we'll ping you to review.

@hililiwei (Contributor Author)

Thanks, @hililiwei! Great repro case. I just went through this with @kbendick and we have a slightly different fix to prevent this with future tables. He's also going to open a PR with a fix for existing tables that we'll ping you to review.

It's the least I could do. Looking forward to your PR. Please feel free to let me know if there's anything I can do to help. 😄

…ert mode by updating equalityDeleteSchema

Co-authored-by: hililiwei <hililiwei@gmail.com>
@hililiwei (Contributor Author) commented Mar 18, 2022

@kbendick and I are trying to solve this, ref: kbendick#71

@hililiwei force-pushed the core-remove-row-filter branch from 4889692 to 3abe264 on March 18, 2022 15:26
@github-actions github-actions bot added the data label Mar 18, 2022
@hililiwei force-pushed the core-remove-row-filter branch from 3abe264 to 23836a0 on March 18, 2022 15:29
@hililiwei (Contributor Author)

Hi @kbendick, based on our previous discussion, I've pushed a preliminary solution that deletes data based on the key. Please take a look. Thx.

@@ -74,7 +82,8 @@ public void write(RowData row) throws IOException {
case INSERT:
case UPDATE_AFTER:
if (upsert) {
writer.delete(row);
RowData wrap = RowDataProjection.create(schema, deleteSchema).wrap(row);
Contributor

This shouldn't create a new projection every time. Instead it should create one and reuse it by calling wrap every time.

writer.deleteKey(wrap);
} else {
writer.delete(row);
}
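A minimal sketch of the reuse suggested above (create the projection once, then call wrap per record); everything beyond the names already shown in the diff is illustrative:

// Built once, e.g. in the task writer's constructor, instead of per record.
private final RowDataProjection keyProjection = RowDataProjection.create(schema, deleteSchema);

@Override
public void write(RowData row) throws IOException {
  switch (row.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
      if (upsert) {
        // Reuse the cached projection; wrap() only re-points it at the new row.
        writer.deleteKey(keyProjection.wrap(row));
      }
      writer.write(row);
      break;
    default:
      // other row kinds handled as before
      break;
  }
}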
Contributor

Why does this need to change? I think you just want to fix the upsert case.

Contributor

Additionally, in the upsert case, data doesn't come through as UPDATE_BEFORE, though this might be needed to keep CDC data in check.

We've been working on the PR in my fork, but I'll run some tests.

return fieldGetter[index].getFieldOrNull(struct);
if (struct.isNullAt(index)) {
return null;
}
Contributor

Why did this need to change? NPE in a test?

Contributor

There is an NPE in some test cases, yes.

Contributor

However, I'm going to investigate a bit further, as I do think it might be indicative of a bug.

I think if we use the correct deletion schema in all cases, the NPEs will go away. I am testing now.

Contributor

Or actually, I don't think this change is needed. If there's no fieldGetter for a given index, that's likely indicative of a bug.

Contributor

Yeah, this didn't need to be changed if we use the full schema as the deletion schema outside of the upsert case, as here: https://github.com/apache/iceberg/pull/4364/files#diff-bbdfbcbd83d2e6f53d402e804dcd0a6fd28ab39d1cc3f74a88641ae8763fde3bR75-R87

@@ -57,6 +60,7 @@
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
this.wrapperDelete = new RowDataWrapper(FlinkSchemaUtil.convert(deleteSchema), deleteSchema.asStruct());
Contributor

How about keyWrapper instead?

@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
return wrapper;
}

RowDataWrapper wrapperDelete() {
Contributor

Why is there an accessor for this?

this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec,
ArrayUtil.toIntArray(equalityFieldIds), schema, null);
ArrayUtil.toIntArray(equalityFieldIds), TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null);
Contributor

This is the correct schema for upsert but should not be used for delete when the row passed to the delete file is the deleted row.
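A short sketch of that distinction, reusing the names from the diff above and assuming an upsert flag is available where the appender factory is built (only the sixth constructor argument changes between the two cases):

// Upsert path: equality deletes carry only the key columns, so the
// eq-delete row schema is the projected key schema.
Schema keySchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));

// Delete-by-row path (e.g. CDC DELETE): the full deleted row is written,
// so the eq-delete row schema stays the full table schema.
Schema eqDeleteRowSchema = upsert ? keySchema : schema;

FlinkAppenderFactory appenderFactory = new FlinkAppenderFactory(
    schema, flinkSchema, table.properties(), spec,
    ArrayUtil.toIntArray(equalityFieldIds), eqDeleteRowSchema, null);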

@@ -66,6 +70,10 @@ RowDataWrapper wrapper() {
return wrapper;
}

RowDataWrapper wrapperDelete() {
return wrapperDelete;
}
Contributor

Can you revert the changes in all modules other than 1.14? That makes reviewing and updating for review comments much easier. Once this goes in we can backport to 1.12 and 1.13.

Contributor

Yeah agreed. I left a similar comment on another file because the changes were a bit much.

It keeps the discussion of a specific change all in one place.

Contributor

In this case, I believe that @hililiwei made the changes as existing tests might not pass in earlier versions of Flink.

But for something important, we should still keep the changes in one PR while reviewing. Otherwise it's difficult for others to review.

@rdblue (Contributor) left a comment

Overall this looks good, but we don't want to change how Flink handles CDC deletes or updates. This should be a narrower change that only applies to upserts.

if (struct.isNullAt(index)) {
return null;
}
return this.fieldGetter[index].getFieldOrNull(struct);
Contributor

Let's keep the work to just Flink 1.14 for now, so comments don't get duplicated on many things.

@hililiwei (Contributor Author)

@rdblue Sorry for the late reply, I just got up for work due to jet lag. Fortunately, @kbendick is already working on it and has opened a new PR, #4364; maybe we can continue there, so I'm going to close this PR. Thank you.
