Conversation
Summary of ChangesHello @c-r33d, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new end-to-end test case to validate the behavior of the key management system when handling attribute values during encryption and decryption. The core objective is to confirm that if an attribute value is provided via Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request adds a new end-to-end test to verify that when encrypting with an attribute value that doesn't exist, the key mapping from the attribute definition is correctly used. The changes are logical and the test case is a valuable addition. I've provided a couple of suggestions to refactor the new fixture and test function for improved clarity and robustness.
| def attribute_missing_value_key_mapping( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_attr: abac.KasEntry, | ||
| temporary_namespace: abac.Namespace, | ||
| root_key: str, | ||
| ) -> tuple[abac.Attribute, str, str]: | ||
| """Attribute with attribute-level managed key mapping and a missing value FQN.""" | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management not supported by platform") | ||
|
|
||
| attr = otdfctl.attribute_create( | ||
| temporary_namespace, "missingval", abac.AttributeRule.ANY_OF, ["present"] | ||
| ) | ||
| assert attr.values | ||
| value_fqn = attr.values[0].fqn | ||
| assert value_fqn | ||
|
|
||
| kas_key = otdfctl.kas_registry_create_key( | ||
| kas_entry_attr, | ||
| key_id="missing-value-def", | ||
| mode="local", | ||
| algorithm="rsa:2048", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| otdfctl.key_assign_attr(kas_key, attr) | ||
|
|
||
| missing_value_fqn = value_fqn.rsplit("/", 1)[0] + "/missing" | ||
| return attr, missing_value_fqn, kas_key.key.key_id |
There was a problem hiding this comment.
The fixture attribute_missing_value_key_mapping can be improved for clarity and robustness.
- It returns an
attrobject that is unused by the test, which can be removed. - The
missing_value_fqnis constructed by parsing another FQN. It's more direct and robust to build it from the attribute's FQN.
This refactoring will also require an update in test_encrypt_with_missing_value_uses_definition_key to adjust to the new return signature. I've added a separate comment for that.
def attribute_missing_value_key_mapping(
otdfctl: OpentdfCommandLineTool,
kas_entry_attr: abac.KasEntry,
temporary_namespace: abac.Namespace,
root_key: str,
) -> tuple[str, str]:
"""Attribute with attribute-level managed key mapping and a missing value FQN."""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management not supported by platform")
attr = otdfctl.attribute_create(
temporary_namespace, "missingval", abac.AttributeRule.ANY_OF, ["present"]
)
assert attr.fqn, "Attribute FQN is missing"
kas_key = otdfctl.kas_registry_create_key(
kas_entry_attr,
key_id="missing-value-def",
mode="local",
algorithm="rsa:2048",
wrapping_key=root_key,
wrapping_key_id="root",
)
otdfctl.key_assign_attr(kas_key, attr)
missing_value_fqn = f"{attr.fqn}/value/missing"
return missing_value_fqn, kas_key.key.key_idThere was a problem hiding this comment.
Applied: fixture now returns (missing_value_fqn, key_id), builds missing FQN from attribute FQN, and removes unused attr from the return. Thanks for the suggestion.
| attribute_missing_value_key_mapping: tuple[Attribute, str, str], | ||
| encrypt_sdk: tdfs.SDK, | ||
| decrypt_sdk: tdfs.SDK, | ||
| tmp_dir: Path, | ||
| pt_file: Path, | ||
| kas_url_attr: str, | ||
| in_focus: set[tdfs.SDK], | ||
| ): | ||
| """Encrypts with a missing value FQN and verifies definition-level key mapping.""" | ||
| if not in_focus & {encrypt_sdk, decrypt_sdk}: | ||
| pytest.skip("Not in focus") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "key_management") | ||
| tdfs.skip_if_unsupported(decrypt_sdk, "key_management") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | ||
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
|
|
||
| _, missing_value_fqn, key_id = attribute_missing_value_key_mapping |
There was a problem hiding this comment.
This test can be improved in a couple of ways:
- To align with the suggested changes in the
attribute_missing_value_key_mappingfixture, the type hint for the fixture argument and the unpacking of the returned tuple should be updated. - The multiple calls to
tdfs.skip_if_unsupportedfor theencrypt_sdkcan be combined into a single call for better readability.
| attribute_missing_value_key_mapping: tuple[Attribute, str, str], | |
| encrypt_sdk: tdfs.SDK, | |
| decrypt_sdk: tdfs.SDK, | |
| tmp_dir: Path, | |
| pt_file: Path, | |
| kas_url_attr: str, | |
| in_focus: set[tdfs.SDK], | |
| ): | |
| """Encrypts with a missing value FQN and verifies definition-level key mapping.""" | |
| if not in_focus & {encrypt_sdk, decrypt_sdk}: | |
| pytest.skip("Not in focus") | |
| tdfs.skip_if_unsupported(encrypt_sdk, "key_management") | |
| tdfs.skip_if_unsupported(decrypt_sdk, "key_management") | |
| tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") | |
| pfs = tdfs.PlatformFeatureSet() | |
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | |
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | |
| _, missing_value_fqn, key_id = attribute_missing_value_key_mapping | |
| attribute_missing_value_key_mapping: tuple[str, str], | |
| encrypt_sdk: tdfs.SDK, | |
| decrypt_sdk: tdfs.SDK, | |
| tmp_dir: Path, | |
| pt_file: Path, | |
| kas_url_attr: str, | |
| in_focus: set[tdfs.SDK], | |
| ): | |
| """Encrypts with a missing value FQN and verifies definition-level key mapping.""" | |
| if not in_focus & {encrypt_sdk, decrypt_sdk}: | |
| pytest.skip("Not in focus") | |
| tdfs.skip_if_unsupported(encrypt_sdk, "key_management", "autoconfigure") | |
| tdfs.skip_if_unsupported(decrypt_sdk, "key_management") | |
| pfs = tdfs.PlatformFeatureSet() | |
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | |
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | |
| missing_value_fqn, key_id = attribute_missing_value_key_mapping |
There was a problem hiding this comment.
Updated test to match new fixture return type and combined skip_if_unsupported for encrypt SDK in the parametrized test.
|
/gemini review |
|
X-Test Failure Report✅ js-v0.4.34 |
There was a problem hiding this comment.
Code Review
This pull request adds two new end-to-end tests to verify that when encrypting with a non-existent attribute value, the key mapping from the parent attribute definition is correctly used. This is tested for scenarios with and without key management enabled. The changes are well-implemented and the tests cover the intended functionality. I have a couple of suggestions to improve the test code by reducing duplication, which would enhance maintainability.
| def test_encrypt_with_missing_value_uses_definition_key( | ||
| attribute_missing_value_key_mapping: tuple[Attribute, str, str], | ||
| encrypt_sdk: tdfs.SDK, | ||
| decrypt_sdk: tdfs.SDK, | ||
| tmp_dir: Path, | ||
| pt_file: Path, | ||
| kas_url_attr: str, | ||
| in_focus: set[tdfs.SDK], | ||
| ): | ||
| """Encrypts with a missing value FQN and verifies definition-level key mapping.""" | ||
| if not in_focus & {encrypt_sdk, decrypt_sdk}: | ||
| pytest.skip("Not in focus") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "key_management") | ||
| tdfs.skip_if_unsupported(decrypt_sdk, "key_management") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | ||
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
|
|
||
| _, missing_value_fqn, key_id = attribute_missing_value_key_mapping | ||
|
|
||
| sample_name = f"missing-value-def-{encrypt_sdk}" | ||
| if sample_name in cipherTexts: | ||
| ct_file = cipherTexts[sample_name] | ||
| else: | ||
| ct_file = tmp_dir / f"{sample_name}.tdf" | ||
| encrypt_sdk.encrypt( | ||
| pt_file, | ||
| ct_file, | ||
| mime_type="text/plain", | ||
| container="ztdf", | ||
| attr_values=[missing_value_fqn], | ||
| target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), | ||
| ) | ||
| cipherTexts[sample_name] = ct_file | ||
|
|
||
| manifest = tdfs.manifest(ct_file) | ||
| policy = manifest.encryptionInformation.policy_object | ||
| assert policy.body.dataAttributes is not None | ||
| assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes] | ||
|
|
||
| assert len(manifest.encryptionInformation.keyAccess) == 1 | ||
| kao = manifest.encryptionInformation.keyAccess[0] | ||
| assert kao.url == kas_url_attr | ||
| assert kao.kid == key_id | ||
|
|
||
| if kao.type == "ec-wrapped": | ||
| tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") | ||
| rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf" | ||
| decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") | ||
| assert filecmp.cmp(pt_file, rt_file) | ||
|
|
||
|
|
||
| def test_encrypt_with_missing_attr_value_public_key_mapping_no_km( | ||
| attribute_missing_value_public_key_mapping_no_km: tuple[Attribute, str, str], | ||
| encrypt_sdk: tdfs.SDK, | ||
| decrypt_sdk: tdfs.SDK, | ||
| tmp_dir: Path, | ||
| pt_file: Path, | ||
| kas_url_attr: str, | ||
| in_focus: set[tdfs.SDK], | ||
| ): | ||
| """Encrypts with missing value FQN using public-key-only mapping when KM is off.""" | ||
| if not in_focus & {encrypt_sdk, decrypt_sdk}: | ||
| pytest.skip("Not in focus") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" in pfs.features: | ||
| pytest.skip("Key management enabled; skipping public-key-only mapping test") | ||
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | ||
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
|
|
||
| _, missing_value_fqn, key_id = attribute_missing_value_public_key_mapping_no_km | ||
|
|
||
| sample_name = f"missing-value-def-pub-{encrypt_sdk}" | ||
| if sample_name in cipherTexts: | ||
| ct_file = cipherTexts[sample_name] | ||
| else: | ||
| ct_file = tmp_dir / f"{sample_name}.tdf" | ||
| encrypt_sdk.encrypt( | ||
| pt_file, | ||
| ct_file, | ||
| mime_type="text/plain", | ||
| container="ztdf", | ||
| attr_values=[missing_value_fqn], | ||
| target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), | ||
| ) | ||
| cipherTexts[sample_name] = ct_file | ||
|
|
||
| manifest = tdfs.manifest(ct_file) | ||
| policy = manifest.encryptionInformation.policy_object | ||
| assert policy.body.dataAttributes is not None | ||
| assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes] | ||
|
|
||
| assert len(manifest.encryptionInformation.keyAccess) == 1 | ||
| kao = manifest.encryptionInformation.keyAccess[0] | ||
| assert kao.url == kas_url_attr | ||
| assert kao.kid == key_id | ||
|
|
||
| if kao.type == "ec-wrapped": | ||
| tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") | ||
| rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf" | ||
| decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") | ||
| assert filecmp.cmp(pt_file, rt_file) |
There was a problem hiding this comment.
These two test functions, test_encrypt_with_missing_value_uses_definition_key and test_encrypt_with_missing_attr_value_public_key_mapping_no_km, are nearly identical. To improve maintainability and reduce code duplication, consider combining them into a single parametrized test using @pytest.mark.parametrize. This would abstract the differences (the fixture used and the key management feature check) into parameters.
@pytest.mark.parametrize(
"fixture_name, sample_name_prefix, check_km_support",
[
("attribute_missing_value_key_mapping", "missing-value-def", True),
(
"attribute_missing_value_public_key_mapping_no_km",
"missing-value-def-pub",
False,
),
],
ids=["key-management-enabled", "key-management-disabled"],
)
def test_encrypt_with_missing_value_uses_definition_key(
fixture_name: str,
sample_name_prefix: str,
check_km_support: bool,
request: pytest.FixtureRequest,
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with a missing value FQN and verifies definition-level key mapping."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
# The fixtures handle skipping based on platform-level key management support.
fixture_data = request.getfixturevalue(fixture_name)
if check_km_support:
# Check for SDK-level key management support.
tdfs.skip_if_unsupported(encrypt_sdk, "key_management")
tdfs.skip_if_unsupported(decrypt_sdk, "key_management")
tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")
pfs = tdfs.PlatformFeatureSet()
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)
_, missing_value_fqn, key_id = fixture_data
sample_name = f"{sample_name_prefix}-{encrypt_sdk}"
if sample_name in cipherTexts:
ct_file = cipherTexts[sample_name]
else:
ct_file = tmp_dir / f"{sample_name}.tdf"
encrypt_sdk.encrypt(
pt_file,
ct_file,
mime_type="text/plain",
container="ztdf",
attr_values=[missing_value_fqn],
target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk),
)
cipherTexts[sample_name] = ct_file
manifest = tdfs.manifest(ct_file)
policy = manifest.encryptionInformation.policy_object
assert policy.body.dataAttributes is not None
assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes]
assert len(manifest.encryptionInformation.keyAccess) == 1
kao = manifest.encryptionInformation.keyAccess[0]
assert kao.url == kas_url_attr
assert kao.kid == key_id
if kao.type == "ec-wrapped":
tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap")
rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf"
decrypt_sdk.decrypt(ct_file, rt_file, "ztdf")
assert filecmp.cmp(pt_file, rt_file)There was a problem hiding this comment.
Combined both cases into a single parametrized test to remove duplication and share the common assertions.
| if "key_management" in pfs.features: | ||
| pytest.skip("Key management enabled; skipping public-key-only mapping test") |
There was a problem hiding this comment.
Removed the redundant key_management feature check in the test; the fixture now owns that skip logic.
X-Test Failure Report✅ js-v0.4.34 |


Links to: