Skip to content

feat(xtest): Add missing attribute value test.#380

Open
c-r33d wants to merge 2 commits intomainfrom
feat/get-attr-values-def
Open

feat(xtest): Add missing attribute value test.#380
c-r33d wants to merge 2 commits intomainfrom
feat/get-attr-values-def

Conversation

@c-r33d
Copy link
Contributor

@c-r33d c-r33d commented Jan 15, 2026

  1. Add E2E encryption/decryption test to ensure that when a user passes in an attribute value with autoconfigure that we give back the attribute definition and it's key to perform the encryption.

Links to:

@c-r33d c-r33d requested review from a team as code owners January 15, 2026 16:36
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @c-r33d, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new end-to-end test case to validate the behavior of the key management system when handling attribute values during encryption and decryption. The core objective is to confirm that if an attribute value is provided via autoconfigure but its fully qualified name is not explicitly present, the system correctly falls back to using the key defined at the attribute level. This ensures robust key resolution and enhances the reliability of the encryption process under various attribute configuration scenarios.

Highlights

  • New Pytest Fixture: A new pytest fixture, attribute_missing_value_key_mapping, has been added. This fixture sets up an attribute with an attribute-level managed key mapping, specifically designed to simulate a scenario where an attribute value FQN is 'missing' but the attribute definition itself has a key.
  • End-to-End Test for Attribute Value Handling: A new end-to-end test, test_encrypt_with_missing_value_uses_definition_key, has been introduced. This test verifies that when an attribute value is passed with autoconfigure and the specific value FQN is not explicitly defined (i.e., 'missing'), the system correctly uses the key associated with the attribute definition for encryption and decryption.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@c-r33d c-r33d changed the title feat(core): Add missing attribute value test. feat(xtest): Add missing attribute value test. Jan 15, 2026
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds a new end-to-end test to verify that when encrypting with an attribute value that doesn't exist, the key mapping from the attribute definition is correctly used. The changes are logical and the test case is a valuable addition. I've provided a couple of suggestions to refactor the new fixture and test function for improved clarity and robustness.

Comment on lines +391 to +420
def attribute_missing_value_key_mapping(
otdfctl: OpentdfCommandLineTool,
kas_entry_attr: abac.KasEntry,
temporary_namespace: abac.Namespace,
root_key: str,
) -> tuple[abac.Attribute, str, str]:
"""Attribute with attribute-level managed key mapping and a missing value FQN."""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management not supported by platform")

attr = otdfctl.attribute_create(
temporary_namespace, "missingval", abac.AttributeRule.ANY_OF, ["present"]
)
assert attr.values
value_fqn = attr.values[0].fqn
assert value_fqn

kas_key = otdfctl.kas_registry_create_key(
kas_entry_attr,
key_id="missing-value-def",
mode="local",
algorithm="rsa:2048",
wrapping_key=root_key,
wrapping_key_id="root",
)
otdfctl.key_assign_attr(kas_key, attr)

missing_value_fqn = value_fqn.rsplit("/", 1)[0] + "/missing"
return attr, missing_value_fqn, kas_key.key.key_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fixture attribute_missing_value_key_mapping can be improved for clarity and robustness.

  1. It returns an attr object that is unused by the test, which can be removed.
  2. The missing_value_fqn is constructed by parsing another FQN. It's more direct and robust to build it from the attribute's FQN.

This refactoring will also require an update in test_encrypt_with_missing_value_uses_definition_key to adjust to the new return signature. I've added a separate comment for that.

def attribute_missing_value_key_mapping(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_attr: abac.KasEntry,
    temporary_namespace: abac.Namespace,
    root_key: str,
) -> tuple[str, str]:
    """Attribute with attribute-level managed key mapping and a missing value FQN."""
    pfs = tdfs.PlatformFeatureSet()
    if "key_management" not in pfs.features:
        pytest.skip("Key management not supported by platform")

    attr = otdfctl.attribute_create(
        temporary_namespace, "missingval", abac.AttributeRule.ANY_OF, ["present"]
    )
    assert attr.fqn, "Attribute FQN is missing"

    kas_key = otdfctl.kas_registry_create_key(
        kas_entry_attr,
        key_id="missing-value-def",
        mode="local",
        algorithm="rsa:2048",
        wrapping_key=root_key,
        wrapping_key_id="root",
    )
    otdfctl.key_assign_attr(kas_key, attr)

    missing_value_fqn = f"{attr.fqn}/value/missing"
    return missing_value_fqn, kas_key.key.key_id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied: fixture now returns (missing_value_fqn, key_id), builds missing FQN from attribute FQN, and removes unused attr from the return. Thanks for the suggestion.

Comment on lines +829 to +847
attribute_missing_value_key_mapping: tuple[Attribute, str, str],
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with a missing value FQN and verifies definition-level key mapping."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
tdfs.skip_if_unsupported(encrypt_sdk, "key_management")
tdfs.skip_if_unsupported(decrypt_sdk, "key_management")
tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")
pfs = tdfs.PlatformFeatureSet()
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)

_, missing_value_fqn, key_id = attribute_missing_value_key_mapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test can be improved in a couple of ways:

  1. To align with the suggested changes in the attribute_missing_value_key_mapping fixture, the type hint for the fixture argument and the unpacking of the returned tuple should be updated.
  2. The multiple calls to tdfs.skip_if_unsupported for the encrypt_sdk can be combined into a single call for better readability.
Suggested change
attribute_missing_value_key_mapping: tuple[Attribute, str, str],
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with a missing value FQN and verifies definition-level key mapping."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
tdfs.skip_if_unsupported(encrypt_sdk, "key_management")
tdfs.skip_if_unsupported(decrypt_sdk, "key_management")
tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")
pfs = tdfs.PlatformFeatureSet()
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)
_, missing_value_fqn, key_id = attribute_missing_value_key_mapping
attribute_missing_value_key_mapping: tuple[str, str],
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with a missing value FQN and verifies definition-level key mapping."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
tdfs.skip_if_unsupported(encrypt_sdk, "key_management", "autoconfigure")
tdfs.skip_if_unsupported(decrypt_sdk, "key_management")
pfs = tdfs.PlatformFeatureSet()
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)
missing_value_fqn, key_id = attribute_missing_value_key_mapping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated test to match new fixture return type and combined skip_if_unsupported for encrypt SDK in the parametrized test.

@c-r33d
Copy link
Contributor Author

c-r33d commented Jan 15, 2026

/gemini review

@sonarqubecloud
Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
34.3% Duplication on New Code (required ≤ 8%)

See analysis details on SonarQube Cloud

@github-actions
Copy link

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds two new end-to-end tests to verify that when encrypting with a non-existent attribute value, the key mapping from the parent attribute definition is correctly used. This is tested for scenarios with and without key management enabled. The changes are well-implemented and the tests cover the intended functionality. I have a couple of suggestions to improve the test code by reducing duplication, which would enhance maintainability.

Comment on lines +828 to +931
def test_encrypt_with_missing_value_uses_definition_key(
attribute_missing_value_key_mapping: tuple[Attribute, str, str],
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with a missing value FQN and verifies definition-level key mapping."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
tdfs.skip_if_unsupported(encrypt_sdk, "key_management")
tdfs.skip_if_unsupported(decrypt_sdk, "key_management")
tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")
pfs = tdfs.PlatformFeatureSet()
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)

_, missing_value_fqn, key_id = attribute_missing_value_key_mapping

sample_name = f"missing-value-def-{encrypt_sdk}"
if sample_name in cipherTexts:
ct_file = cipherTexts[sample_name]
else:
ct_file = tmp_dir / f"{sample_name}.tdf"
encrypt_sdk.encrypt(
pt_file,
ct_file,
mime_type="text/plain",
container="ztdf",
attr_values=[missing_value_fqn],
target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk),
)
cipherTexts[sample_name] = ct_file

manifest = tdfs.manifest(ct_file)
policy = manifest.encryptionInformation.policy_object
assert policy.body.dataAttributes is not None
assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes]

assert len(manifest.encryptionInformation.keyAccess) == 1
kao = manifest.encryptionInformation.keyAccess[0]
assert kao.url == kas_url_attr
assert kao.kid == key_id

if kao.type == "ec-wrapped":
tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap")
rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf"
decrypt_sdk.decrypt(ct_file, rt_file, "ztdf")
assert filecmp.cmp(pt_file, rt_file)


def test_encrypt_with_missing_attr_value_public_key_mapping_no_km(
attribute_missing_value_public_key_mapping_no_km: tuple[Attribute, str, str],
encrypt_sdk: tdfs.SDK,
decrypt_sdk: tdfs.SDK,
tmp_dir: Path,
pt_file: Path,
kas_url_attr: str,
in_focus: set[tdfs.SDK],
):
"""Encrypts with missing value FQN using public-key-only mapping when KM is off."""
if not in_focus & {encrypt_sdk, decrypt_sdk}:
pytest.skip("Not in focus")
tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")
pfs = tdfs.PlatformFeatureSet()
if "key_management" in pfs.features:
pytest.skip("Key management enabled; skipping public-key-only mapping test")
tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)

_, missing_value_fqn, key_id = attribute_missing_value_public_key_mapping_no_km

sample_name = f"missing-value-def-pub-{encrypt_sdk}"
if sample_name in cipherTexts:
ct_file = cipherTexts[sample_name]
else:
ct_file = tmp_dir / f"{sample_name}.tdf"
encrypt_sdk.encrypt(
pt_file,
ct_file,
mime_type="text/plain",
container="ztdf",
attr_values=[missing_value_fqn],
target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk),
)
cipherTexts[sample_name] = ct_file

manifest = tdfs.manifest(ct_file)
policy = manifest.encryptionInformation.policy_object
assert policy.body.dataAttributes is not None
assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes]

assert len(manifest.encryptionInformation.keyAccess) == 1
kao = manifest.encryptionInformation.keyAccess[0]
assert kao.url == kas_url_attr
assert kao.kid == key_id

if kao.type == "ec-wrapped":
tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap")
rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf"
decrypt_sdk.decrypt(ct_file, rt_file, "ztdf")
assert filecmp.cmp(pt_file, rt_file)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These two test functions, test_encrypt_with_missing_value_uses_definition_key and test_encrypt_with_missing_attr_value_public_key_mapping_no_km, are nearly identical. To improve maintainability and reduce code duplication, consider combining them into a single parametrized test using @pytest.mark.parametrize. This would abstract the differences (the fixture used and the key management feature check) into parameters.

@pytest.mark.parametrize(
    "fixture_name, sample_name_prefix, check_km_support",
    [
        ("attribute_missing_value_key_mapping", "missing-value-def", True),
        (
            "attribute_missing_value_public_key_mapping_no_km",
            "missing-value-def-pub",
            False,
        ),
    ],
    ids=["key-management-enabled", "key-management-disabled"],
)
def test_encrypt_with_missing_value_uses_definition_key(
    fixture_name: str,
    sample_name_prefix: str,
    check_km_support: bool,
    request: pytest.FixtureRequest,
    encrypt_sdk: tdfs.SDK,
    decrypt_sdk: tdfs.SDK,
    tmp_dir: Path,
    pt_file: Path,
    kas_url_attr: str,
    in_focus: set[tdfs.SDK],
):
    """Encrypts with a missing value FQN and verifies definition-level key mapping."""
    if not in_focus & {encrypt_sdk, decrypt_sdk}:
        pytest.skip("Not in focus")

    # The fixtures handle skipping based on platform-level key management support.
    fixture_data = request.getfixturevalue(fixture_name)

    if check_km_support:
        # Check for SDK-level key management support.
        tdfs.skip_if_unsupported(encrypt_sdk, "key_management")
        tdfs.skip_if_unsupported(decrypt_sdk, "key_management")

    tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure")

    pfs = tdfs.PlatformFeatureSet()
    tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs)
    tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk)

    _, missing_value_fqn, key_id = fixture_data

    sample_name = f"{sample_name_prefix}-{encrypt_sdk}"
    if sample_name in cipherTexts:
        ct_file = cipherTexts[sample_name]
    else:
        ct_file = tmp_dir / f"{sample_name}.tdf"
        encrypt_sdk.encrypt(
            pt_file,
            ct_file,
            mime_type="text/plain",
            container="ztdf",
            attr_values=[missing_value_fqn],
            target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk),
        )
        cipherTexts[sample_name] = ct_file

    manifest = tdfs.manifest(ct_file)
    policy = manifest.encryptionInformation.policy_object
    assert policy.body.dataAttributes is not None
    assert missing_value_fqn in [v.attribute for v in policy.body.dataAttributes]

    assert len(manifest.encryptionInformation.keyAccess) == 1
    kao = manifest.encryptionInformation.keyAccess[0]
    assert kao.url == kas_url_attr
    assert kao.kid == key_id

    if kao.type == "ec-wrapped":
        tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap")
    rt_file = tmp_dir / f"{sample_name}-{decrypt_sdk}.untdf"
    decrypt_sdk.decrypt(ct_file, rt_file, "ztdf")
    assert filecmp.cmp(pt_file, rt_file)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combined both cases into a single parametrized test to remove duplication and share the common assertions.

Comment on lines +895 to +896
if "key_management" in pfs.features:
pytest.skip("Key management enabled; skipping public-key-only mapping test")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This if "key_management" in pfs.features: check is redundant. The attribute_missing_value_public_key_mapping_no_km fixture, which is used by this test, already contains the exact same skip logic. Removing this duplicate check will make the test cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the redundant key_management feature check in the test; the fixture now owns that skip logic.

@github-actions
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant