Skip to content

Commit b8efac9

Browse files
committed
SK-1018 Bulk support for Detokenize in Python SDK
- SK-1024 Test Driven Development of Bulk support for Detokenize
- SK-1028 Fixed: Token key not added in error response for detokenize individual requests
1 parent 503c6e8 commit b8efac9

File tree

7 files changed

+165
-39
lines changed

7 files changed

+165
-39
lines changed

skyflow/errors/_skyflow_errors.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ class SkyflowErrorMessages(Enum):
8585
INVALID_UPSERT_COLUMN_TYPE = "upsert object column key has value of type %s, expected string"
8686
EMPTY_UPSERT_OPTION_TABLE = "upsert object table value is empty string at index %s, expected non-empty string"
8787
EMPTY_UPSERT_OPTION_COLUMN = "upsert object column value is empty string at index %s, expected non-empty string"
88-
88+
SERVER_ERROR = "Server returned errors, check SkyflowError.data for more"
89+
90+
BATCH_INSERT_PARTIAL_SUCCESS = "Insert Operation is partially successful"
91+
BATCH_INSERT_FAILURE = "Insert Operation is unsuccessful"
92+
8993
class SkyflowError(Exception):
9094
def __init__(self, code, message="An Error occured", data={}, interface: str = 'Unknown') -> None:
9195
if type(code) is SkyflowErrorCodes:

skyflow/vault/_client.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
from ._insert import getInsertRequestBody, processResponse, convertResponse
1010
from ._update import sendUpdateRequests, createUpdateResponseBody
1111
from ._config import Configuration, DeleteOptions
12-
from ._config import InsertOptions, ConnectionConfig, UpdateOptions
12+
from ._config import DetokenizeOptions, InsertOptions, ConnectionConfig, UpdateOptions
1313
from ._connection import createRequest
1414
from ._detokenize import sendDetokenizeRequests, createDetokenizeResponseBody
1515
from ._get_by_id import sendGetByIdRequests, createGetResponseBody
1616
from ._get import sendGetRequests
1717
import asyncio
1818
from skyflow.errors._skyflow_errors import SkyflowError, SkyflowErrorCodes, SkyflowErrorMessages
19-
from skyflow._utils import log_info, InfoMessages, InterfaceName, getMetrics
19+
from skyflow._utils import log_info, log_error, InfoMessages, InterfaceName, getMetrics
2020
from ._token import tokenProviderWrapper
2121

2222

@@ -61,12 +61,17 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()):
6161

6262
response = requests.post(requestURL, data=jsonBody, headers=headers)
6363
processedResponse = processResponse(response)
64-
result = convertResponse(records, processedResponse, options)
65-
66-
log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
64+
result, partial = convertResponse(records, processedResponse, options)
65+
# these statements will be covered in Integration Tests
66+
if partial:
67+
log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface)
68+
elif 'records' not in result:
69+
log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface)
70+
else:
71+
log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
6772
return result
6873

69-
def detokenize(self, records):
74+
def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()):
7075
interface = InterfaceName.DETOKENIZE.value
7176
log_info(InfoMessages.DETOKENIZE_TRIGGERED.value, interface)
7277

@@ -75,11 +80,12 @@ def detokenize(self, records):
7580
self.storedToken, self.tokenProvider, interface)
7681
url = self._get_complete_vault_url() + '/detokenize'
7782
responses = asyncio.run(sendDetokenizeRequests(
78-
records, url, self.storedToken))
79-
result, partial = createDetokenizeResponseBody(responses)
83+
records, url, self.storedToken, options))
84+
result, partial = createDetokenizeResponseBody(records, responses, options)
8085
if partial:
81-
raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS,
82-
SkyflowErrorMessages.PARTIAL_SUCCESS, result, interface=interface)
86+
raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.PARTIAL_SUCCESS, result, interface=interface)
87+
elif 'records' not in result:
88+
raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.SERVER_ERROR, result, interface=interface)
8389
else:
8490
log_info(InfoMessages.DETOKENIZE_SUCCESS.value, interface)
8591
return result

skyflow/vault/_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ class DeleteOptions:
4343
def __init__(self, tokens: bool=False):
4444
self.tokens = tokens
4545

46+
class DetokenizeOptions:
47+
def __init__(self, continueOnError: bool=True):
48+
self.continueOnError = continueOnError
49+
4650
class RequestMethod(Enum):
4751
GET = 'GET'
4852
POST = 'POST'

skyflow/vault/_detokenize.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
from ._config import RedactionType
99
from skyflow._utils import InterfaceName, getMetrics
10+
from skyflow.vault._config import DetokenizeOptions
1011

1112
interface = InterfaceName.DETOKENIZE.value
1213

@@ -39,8 +40,14 @@ def getDetokenizeRequestBody(data):
3940
})
4041
return requestBody
4142

43+
def getBulkDetokenizeRequestBody(records):
44+
bulkRequestBody = {"detokenizationParameters": []}
45+
for record in records:
46+
requestBody = getDetokenizeRequestBody(record)
47+
bulkRequestBody["detokenizationParameters"].append(requestBody["detokenizationParameters"][0])
48+
return bulkRequestBody
4249

43-
async def sendDetokenizeRequests(data, url, token):
50+
async def sendDetokenizeRequests(data, url, token, options: DetokenizeOptions):
4451

4552
tasks = []
4653

@@ -53,11 +60,17 @@ async def sendDetokenizeRequests(data, url, token):
5360
recordsType = str(type(records))
5461
raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.INVALID_RECORDS_TYPE.value % (
5562
recordsType), interface=interface)
63+
5664
validatedRecords = []
57-
for record in records:
58-
requestBody = getDetokenizeRequestBody(record)
65+
if not options.continueOnError:
66+
requestBody = getBulkDetokenizeRequestBody(records)
5967
jsonBody = json.dumps(requestBody)
6068
validatedRecords.append(jsonBody)
69+
else:
70+
for record in records:
71+
requestBody = getDetokenizeRequestBody(record)
72+
jsonBody = json.dumps(requestBody)
73+
validatedRecords.append(jsonBody)
6174
async with ClientSession() as session:
6275
for record in validatedRecords:
6376
headers = {
@@ -80,13 +93,13 @@ async def post(url, data, headers, session):
8093
return (await response.read(), response.status)
8194

8295

83-
def createDetokenizeResponseBody(responses):
96+
def createDetokenizeResponseBody(records, responses, options: DetokenizeOptions):
8497
result = {
8598
"records": [],
8699
"errors": []
87100
}
88101
partial = False
89-
for response in responses:
102+
for index, response in enumerate(responses):
90103
r = response.result()
91104
status = r[1]
92105
try:
@@ -96,16 +109,26 @@ def createDetokenizeResponseBody(responses):
96109
SkyflowErrorMessages.RESPONSE_NOT_JSON.value % r[0].decode('utf-8'), interface=interface)
97110

98111
if status == 200:
99-
temp = {}
100-
temp["token"] = jsonRes["records"][0]["token"]
101-
temp["value"] = jsonRes["records"][0]["value"]
102-
result["records"].append(temp)
112+
for record in jsonRes["records"]:
113+
temp = {}
114+
temp["token"] = record["token"]
115+
temp["value"] = record["value"]
116+
result["records"].append(temp)
103117
else:
104118
temp = {"error": {}}
119+
120+
if options.continueOnError:
121+
temp["token"] = records["records"][index]["token"]
122+
105123
temp["error"]["code"] = jsonRes["error"]["http_code"]
106124
temp["error"]["description"] = jsonRes["error"]["message"]
107125
if len(r) > 2 and r[2] != None:
108126
temp["error"]["description"] += ' - Request ID: ' + str(r[2])
109127
result["errors"].append(temp)
110128
partial = True
129+
if len(result["records"]) == 0:
130+
partial = False
131+
result.pop("records")
132+
elif len(result["errors"]) == 0:
133+
result.pop("errors")
111134
return result, partial

skyflow/vault/_insert.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ def convertResponse(request: dict, response: dict, options: InsertOptions):
145145
return buildResponseWithoutContinueOnError(responseArray, records, options.tokens)
146146

147147
def buildResponseWithContinueOnError(responseArray, records, tokens: bool, requestId):
148+
partial = False
148149
errors = []
149150
result = []
150151
for idx, response in enumerate(responseArray):
@@ -161,16 +162,18 @@ def buildResponseWithContinueOnError(responseArray, records, tokens: bool, reque
161162
else:
162163
result.append({'table': table, 'skyflow_id': skyflow_id})
163164
elif 'error' in body:
165+
partial = True
164166
message = body['error']
165167
message += ' - request id: ' + requestId
166168
error = {"code": status, "description": message}
167169
errors.append({"error": error})
168170
finalResponse = {"records": result, "errors": errors}
169171
if len(result) == 0:
172+
partial = False
170173
finalResponse.pop('records')
171174
elif len(errors) == 0:
172175
finalResponse.pop('errors')
173-
return finalResponse
176+
return finalResponse, partial
174177

175178
def buildResponseWithoutContinueOnError(responseArray, records, tokens: bool):
176179
# recordsSize = len(records)
@@ -184,7 +187,7 @@ def buildResponseWithoutContinueOnError(responseArray, records, tokens: bool):
184187
result.append({'table': table, 'fields': fieldsDict})
185188
else:
186189
result.append({'table': table, 'skyflow_id': skyflow_id})
187-
return {'records': result}
190+
return {'records': result}, False
188191

189192
def getUpsertColumn(tableName, upsertOptions):
190193
uniqueColumn:str = ''

tests/vault/test_detokenize.py

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
'''
44
import unittest
55
import os
6-
from skyflow.vault._detokenize import getDetokenizeRequestBody, createDetokenizeResponseBody
6+
from skyflow.vault._detokenize import getDetokenizeRequestBody, createDetokenizeResponseBody, getBulkDetokenizeRequestBody
77
from skyflow.errors._skyflow_errors import SkyflowError, SkyflowErrorCodes, SkyflowErrorMessages
88
from skyflow.vault._client import Client, Configuration
99
from skyflow.service_account import generate_bearer_token
10-
from skyflow.vault._config import RedactionType
10+
from skyflow.vault._config import DetokenizeOptions, RedactionType
1111
from dotenv import dotenv_values
1212
import warnings
1313

@@ -115,32 +115,87 @@ def testDetokenizeRedactionInvalidType(self):
115115
def testResponseBodySuccess(self):
116116
response = {"records": [{"token": "abc", "value": "secret"}]}
117117
self.add_mock_response(response, 200)
118-
res, partial = createDetokenizeResponseBody(self.mocked_futures)
118+
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
119119
self.assertEqual(partial, False)
120-
self.assertEqual(res, {"records": response["records"], "errors": []})
120+
self.assertIn("records", res)
121+
self.assertNotIn("errors", res)
122+
self.assertEqual(len(res["records"]), 1)
123+
self.assertEqual(res, {"records": response["records"]})
121124

122125
def testResponseBodyPartialSuccess(self):
123126
success_response = {"records": [{"token": "abc", "value": "secret"}]}
124127
error_response = {"error": {"http_code": 404, "message": "not found"}}
125128
self.add_mock_response(success_response, 200)
126129
self.add_mock_response(error_response, 404)
127-
res, partial = createDetokenizeResponseBody(self.mocked_futures)
130+
131+
detokenizeRecords = {"records": [self.tokenField, self.tokenField]}
132+
133+
res, partial = createDetokenizeResponseBody(detokenizeRecords, self.mocked_futures, DetokenizeOptions())
128134
self.assertTrue(partial)
129-
self.assertEqual(res["records"], success_response["records"])
135+
136+
records = res["records"]
137+
self.assertIsNotNone(records)
138+
self.assertEqual(len(records), 1)
139+
self.assertEqual(records, success_response["records"])
140+
130141
errors = res["errors"]
131-
132142
self.assertIsNotNone(errors)
133143
self.assertEqual(len(errors), 1)
134144
self.assertEqual(errors[0]["error"]["code"],
135145
error_response["error"]["http_code"])
136146
self.assertEqual(
137147
errors[0]["error"]["description"], error_response["error"]["message"])
148+
149+
def testResponseBodyFailure(self):
150+
error_response = {"error": {"http_code": 404, "message": "not found"}}
151+
self.add_mock_response(error_response, 404)
152+
153+
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
154+
self.assertFalse(partial)
155+
156+
self.assertNotIn("records", res)
157+
errors = res["errors"]
158+
self.assertIsNotNone(errors)
159+
self.assertEqual(len(errors), 1)
160+
self.assertEqual(errors[0]["error"]["code"],
161+
error_response["error"]["http_code"])
162+
self.assertEqual(
163+
errors[0]["error"]["description"], error_response["error"]["message"])
164+
165+
def testResponseBodySuccessWithContinueOnErrorAsFalse(self):
166+
response = {
167+
"records": [
168+
{"token": "abc", "value": "secret1"},
169+
{"token": "def", "value": "secret2"}
170+
]
171+
}
172+
self.add_mock_response(response, 200)
173+
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions(False))
174+
self.assertEqual(partial, False)
175+
self.assertIn("records", res)
176+
self.assertNotIn("errors", res)
177+
self.assertEqual(len(res["records"]), 2)
178+
self.assertEqual(res, {"records": response["records"]})
179+
180+
def testResponseBodyFailureWithContinueOnErrorAsFalse(self):
181+
error_response = {"error": {"http_code": 404, "message": "not found"}}
182+
self.add_mock_response(error_response, 404)
183+
184+
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions(False))
185+
self.assertFalse(partial)
186+
187+
self.assertNotIn("records", res)
188+
errors = res["errors"]
189+
self.assertIsNotNone(errors)
190+
self.assertEqual(len(errors), 1)
191+
self.assertEqual(errors[0]["error"]["code"], error_response["error"]["http_code"])
192+
self.assertEqual(errors[0]["error"]["description"], error_response["error"]["message"])
138193

139194
def testResponseNotJson(self):
140195
response = "not a valid json".encode()
141196
self.add_mock_response(response, 200, encode=False)
142197
try:
143-
createDetokenizeResponseBody(self.mocked_futures)
198+
createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
144199
except SkyflowError as error:
145200
expectedError = SkyflowErrorMessages.RESPONSE_NOT_JSON
146201
self.assertEqual(error.code, 200)
@@ -181,4 +236,30 @@ def testRequestBodyWithInValidRedaction(self):
181236
except SkyflowError as error:
182237
self.assertTrue(error)
183238
self.assertEqual(error.code, SkyflowErrorCodes.INVALID_INPUT.value)
184-
self.assertEqual(error.message, SkyflowErrorMessages.INVALID_REDACTION_TYPE.value % str(type(data["redaction"])))
239+
self.assertEqual(error.message, SkyflowErrorMessages.INVALID_REDACTION_TYPE.value % str(type(data["redaction"])))
240+
241+
def testGetBulkDetokenizeRequestBody(self):
242+
expectedOutput = {
243+
"detokenizationParameters": [
244+
{
245+
"token": self.testToken,
246+
"redaction": "REDACTED"
247+
},
248+
{
249+
"token": self.testToken,
250+
"redaction": "REDACTED"
251+
},
252+
]
253+
}
254+
data = {
255+
"token": self.testToken,
256+
"redaction": RedactionType.REDACTED
257+
}
258+
try:
259+
requestBody = getBulkDetokenizeRequestBody([data, data])
260+
self.assertIn("detokenizationParameters", requestBody)
261+
self.assertEqual(len(requestBody["detokenizationParameters"]), 2)
262+
self.assertEqual(expectedOutput, requestBody)
263+
except SkyflowError as e:
264+
self.fail('Should not have thrown an error')
265+

0 commit comments

Comments
 (0)