Skip to content

Commit

Permalink
feat: crypto_deterministic_config (#108) (#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
hilliao authored Apr 1, 2021
1 parent ab0c1de commit c8cc8ba
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
170 changes: 170 additions & 0 deletions dlp/snippets/deid.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,92 @@ def deidentify_with_fpe(

# [END dlp_deidentify_fpe]

# [START dlp_deidentify_deterministic]
def deidentify_with_deterministic(
project,
input_str,
info_types,
surrogate_type=None,
key_name=None,
wrapped_key=None,
):
"""Deidentifies sensitive data in a string using deterministic encryption.
Args:
project: The Google Cloud project id to use as a parent resource.
input_str: The string to deidentify (will be treated as text).
surrogate_type: The name of the surrogate custom info type to use. Only
necessary if you want to reverse the deidentification process. Can
be essentially any arbitrary string, as long as it doesn't appear
in your dataset otherwise.
key_name: The name of the Cloud KMS key used to encrypt ('wrap') the
AES-256 key. Example:
key_name = 'projects/YOUR_GCLOUD_PROJECT/locations/YOUR_LOCATION/
keyRings/YOUR_KEYRING_NAME/cryptoKeys/YOUR_KEY_NAME'
wrapped_key: The encrypted ('wrapped') AES-256 key to use. This key
should be encrypted using the Cloud KMS key specified by key_name.
Returns:
None; the response from the API is printed to the terminal.
"""
import base64

# Import the client library
import google.cloud.dlp

# Instantiate a client
dlp = google.cloud.dlp_v2.DlpServiceClient()

# Convert the project id into a full resource id.
parent = f"projects/{project}"

# The wrapped key is base64-encoded, but the library expects a binary
# string, so decode it here.
wrapped_key = base64.b64decode(wrapped_key)

# Construct Deterministic encryption configuration dictionary
crypto_replace_deterministic_config = {
"crypto_key": {
"kms_wrapped": {"wrapped_key": wrapped_key, "crypto_key_name": key_name}
},
}

# Add surrogate type
if surrogate_type:
crypto_replace_deterministic_config["surrogate_info_type"] = {"name": surrogate_type}

# Construct inspect configuration dictionary
inspect_config = {"info_types": [{"name": info_type} for info_type in info_types]}

# Construct deidentify configuration dictionary
deidentify_config = {
"info_type_transformations": {
"transformations": [
{
"primitive_transformation": {
"crypto_deterministic_config": crypto_replace_deterministic_config
}
}
]
}
}

# Convert string to item
item = {"value": input_str}

# Call the API
response = dlp.deidentify_content(
request={
"parent": parent,
"deidentify_config": deidentify_config,
"inspect_config": inspect_config,
"item": item,
}
)

# Print results
print(response.item.value)

# [END dlp_deidentify_deterministic]


# [START dlp_reidentify_fpe]
def reidentify_with_fpe(
Expand Down Expand Up @@ -380,6 +466,90 @@ def reidentify_with_fpe(
# [END dlp_reidentify_fpe]


# [START dlp_reidentify_deterministic]
def reidentify_with_deterministic(
project,
input_str,
surrogate_type=None,
key_name=None,
wrapped_key=None,
):
"""Deidentifies sensitive data in a string using deterministic encryption.
Args:
project: The Google Cloud project id to use as a parent resource.
input_str: The string to deidentify (will be treated as text).
surrogate_type: The name of the surrogate custom info type to used
during the encryption process.
key_name: The name of the Cloud KMS key used to encrypt ('wrap') the
AES-256 key. Example:
keyName = 'projects/YOUR_GCLOUD_PROJECT/locations/YOUR_LOCATION/
keyRings/YOUR_KEYRING_NAME/cryptoKeys/YOUR_KEY_NAME'
wrapped_key: The encrypted ('wrapped') AES-256 key to use. This key
should be encrypted using the Cloud KMS key specified by key_name.
Returns:
None; the response from the API is printed to the terminal.
"""
import base64

# Import the client library
import google.cloud.dlp

# Instantiate a client
dlp = google.cloud.dlp_v2.DlpServiceClient()

# Convert the project id into a full resource id.
parent = f"projects/{project}"

# The wrapped key is base64-encoded, but the library expects a binary
# string, so decode it here.
wrapped_key = base64.b64decode(wrapped_key)

# Construct reidentify Configuration
reidentify_config = {
"info_type_transformations": {
"transformations": [
{
"primitive_transformation": {
"crypto_deterministic_config": {
"crypto_key": {
"kms_wrapped": {
"wrapped_key": wrapped_key,
"crypto_key_name": key_name,
}
},
"surrogate_info_type": {"name": surrogate_type},
}
}
}
]
}
}

inspect_config = {
"custom_info_types": [
{"info_type": {"name": surrogate_type}, "surrogate_type": {}}
]
}

# Convert string to item
item = {"value": input_str}

# Call the API
response = dlp.reidentify_content(
request={
"parent": parent,
"reidentify_config": reidentify_config,
"inspect_config": inspect_config,
"item": item,
}
)

# Print results
print(response.item.value)

# [END dlp_reidentify_deterministic]


# [START dlp_deidentify_free_text_with_fpe_using_surrogate]
def deidentify_free_text_with_fpe_using_surrogate(
project,
Expand Down
31 changes: 31 additions & 0 deletions dlp/snippets/deid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,21 @@ def test_deidentify_with_fpe(capsys):
assert "372819127" not in out


def test_deidentify_with_deterministic(capsys):
deid.deidentify_with_deterministic(
GCLOUD_PROJECT,
HARMFUL_STRING,
["US_SOCIAL_SECURITY_NUMBER"],
surrogate_type=SURROGATE_TYPE,
key_name=KEY_NAME,
wrapped_key=WRAPPED_KEY,
)

out, _ = capsys.readouterr()
assert "My SSN is" in out
assert "372819127" not in out


def test_deidentify_with_fpe_uses_surrogate_info_types(capsys):
deid.deidentify_with_fpe(
GCLOUD_PROJECT,
Expand Down Expand Up @@ -207,6 +222,22 @@ def test_reidentify_with_fpe(capsys):
assert "731997681" not in out


def test_reidentify_with_deterministic(capsys):
labeled_fpe_string = "My SSN is SSN_TOKEN(36):ATeRUd3WWnAHHFtjtl1bv+CT09FZ7hyqNas="

deid.reidentify_with_deterministic(
GCLOUD_PROJECT,
labeled_fpe_string,
surrogate_type=SURROGATE_TYPE,
key_name=KEY_NAME,
wrapped_key=WRAPPED_KEY,
)

out, _ = capsys.readouterr()

assert "SSN_TOKEN(" not in out


def test_deidentify_free_text_with_fpe_using_surrogate(capsys):
labeled_fpe_string = "My phone number is 4359916732"

Expand Down

0 comments on commit c8cc8ba

Please sign in to comment.