Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion skyflow/errors/_skyflow_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ class SkyflowErrorMessages(Enum):
INVALID_UPSERT_COLUMN_TYPE = "upsert object column key has value of type %s, expected string"
EMPTY_UPSERT_OPTION_TABLE = "upsert object table value is empty string at index %s, expected non-empty string"
EMPTY_UPSERT_OPTION_COLUMN = "upsert object column value is empty string at index %s, expected non-empty string"

SERVER_ERROR = "Server returned errors, check SkyflowError.data for more"

BATCH_INSERT_PARTIAL_SUCCESS = "Insert Operation is partially successful"
BATCH_INSERT_FAILURE = "Insert Operation is unsuccessful"

class SkyflowError(Exception):
def __init__(self, code, message="An Error occured", data={}, interface: str = 'Unknown') -> None:
if type(code) is SkyflowErrorCodes:
Expand Down
26 changes: 16 additions & 10 deletions skyflow/vault/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from ._insert import getInsertRequestBody, processResponse, convertResponse
from ._update import sendUpdateRequests, createUpdateResponseBody
from ._config import Configuration, DeleteOptions
from ._config import InsertOptions, ConnectionConfig, UpdateOptions
from ._config import DetokenizeOptions, InsertOptions, ConnectionConfig, UpdateOptions
from ._connection import createRequest
from ._detokenize import sendDetokenizeRequests, createDetokenizeResponseBody
from ._get_by_id import sendGetByIdRequests, createGetResponseBody
from ._get import sendGetRequests
import asyncio
from skyflow.errors._skyflow_errors import SkyflowError, SkyflowErrorCodes, SkyflowErrorMessages
from skyflow._utils import log_info, InfoMessages, InterfaceName, getMetrics
from skyflow._utils import log_info, log_error, InfoMessages, InterfaceName, getMetrics
from ._token import tokenProviderWrapper


Expand Down Expand Up @@ -61,12 +61,17 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()):

response = requests.post(requestURL, data=jsonBody, headers=headers)
processedResponse = processResponse(response)
result = convertResponse(records, processedResponse, options)

log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
result, partial = convertResponse(records, processedResponse, options)
# these statements will be covered in Integration Tests
if partial:
log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface)
elif 'records' not in result:
log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface)
else:
log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface)
return result

def detokenize(self, records):
def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()):
interface = InterfaceName.DETOKENIZE.value
log_info(InfoMessages.DETOKENIZE_TRIGGERED.value, interface)

Expand All @@ -75,11 +80,12 @@ def detokenize(self, records):
self.storedToken, self.tokenProvider, interface)
url = self._get_complete_vault_url() + '/detokenize'
responses = asyncio.run(sendDetokenizeRequests(
records, url, self.storedToken))
result, partial = createDetokenizeResponseBody(responses)
records, url, self.storedToken, options))
result, partial = createDetokenizeResponseBody(records, responses, options)
if partial:
raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS,
SkyflowErrorMessages.PARTIAL_SUCCESS, result, interface=interface)
raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.PARTIAL_SUCCESS, result, interface=interface)
elif 'records' not in result:
raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.SERVER_ERROR, result, interface=interface)
else:
log_info(InfoMessages.DETOKENIZE_SUCCESS.value, interface)
return result
Expand Down
4 changes: 4 additions & 0 deletions skyflow/vault/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class DeleteOptions:
def __init__(self, tokens: bool=False):
self.tokens = tokens

class DetokenizeOptions:
def __init__(self, continueOnError: bool=True):
self.continueOnError = continueOnError

class RequestMethod(Enum):
GET = 'GET'
POST = 'POST'
Expand Down
41 changes: 32 additions & 9 deletions skyflow/vault/_detokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
from ._config import RedactionType
from skyflow._utils import InterfaceName, getMetrics
from skyflow.vault._config import DetokenizeOptions

interface = InterfaceName.DETOKENIZE.value

Expand Down Expand Up @@ -39,8 +40,14 @@ def getDetokenizeRequestBody(data):
})
return requestBody

def getBulkDetokenizeRequestBody(records):
bulkRequestBody = {"detokenizationParameters": []}
for record in records:
requestBody = getDetokenizeRequestBody(record)
bulkRequestBody["detokenizationParameters"].append(requestBody["detokenizationParameters"][0])
return bulkRequestBody

async def sendDetokenizeRequests(data, url, token):
async def sendDetokenizeRequests(data, url, token, options: DetokenizeOptions):

tasks = []

Expand All @@ -53,11 +60,17 @@ async def sendDetokenizeRequests(data, url, token):
recordsType = str(type(records))
raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.INVALID_RECORDS_TYPE.value % (
recordsType), interface=interface)

validatedRecords = []
for record in records:
requestBody = getDetokenizeRequestBody(record)
if not options.continueOnError:
requestBody = getBulkDetokenizeRequestBody(records)
jsonBody = json.dumps(requestBody)
validatedRecords.append(jsonBody)
else:
for record in records:
requestBody = getDetokenizeRequestBody(record)
jsonBody = json.dumps(requestBody)
validatedRecords.append(jsonBody)
async with ClientSession() as session:
for record in validatedRecords:
headers = {
Expand All @@ -80,13 +93,13 @@ async def post(url, data, headers, session):
return (await response.read(), response.status)


def createDetokenizeResponseBody(responses):
def createDetokenizeResponseBody(records, responses, options: DetokenizeOptions):
result = {
"records": [],
"errors": []
}
partial = False
for response in responses:
for index, response in enumerate(responses):
r = response.result()
status = r[1]
try:
Expand All @@ -96,16 +109,26 @@ def createDetokenizeResponseBody(responses):
SkyflowErrorMessages.RESPONSE_NOT_JSON.value % r[0].decode('utf-8'), interface=interface)

if status == 200:
temp = {}
temp["token"] = jsonRes["records"][0]["token"]
temp["value"] = jsonRes["records"][0]["value"]
result["records"].append(temp)
for record in jsonRes["records"]:
temp = {}
temp["token"] = record["token"]
temp["value"] = record["value"]
result["records"].append(temp)
else:
temp = {"error": {}}

if options.continueOnError:
temp["token"] = records["records"][index]["token"]

temp["error"]["code"] = jsonRes["error"]["http_code"]
temp["error"]["description"] = jsonRes["error"]["message"]
if len(r) > 2 and r[2] != None:
temp["error"]["description"] += ' - Request ID: ' + str(r[2])
result["errors"].append(temp)
partial = True
if len(result["records"]) == 0:
partial = False
result.pop("records")
elif len(result["errors"]) == 0:
result.pop("errors")
return result, partial
7 changes: 5 additions & 2 deletions skyflow/vault/_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def convertResponse(request: dict, response: dict, options: InsertOptions):
return buildResponseWithoutContinueOnError(responseArray, records, options.tokens)

def buildResponseWithContinueOnError(responseArray, records, tokens: bool, requestId):
partial = False
errors = []
result = []
for idx, response in enumerate(responseArray):
Expand All @@ -161,16 +162,18 @@ def buildResponseWithContinueOnError(responseArray, records, tokens: bool, reque
else:
result.append({'table': table, 'skyflow_id': skyflow_id})
elif 'error' in body:
partial = True
message = body['error']
message += ' - request id: ' + requestId
error = {"code": status, "description": message}
errors.append({"error": error})
finalResponse = {"records": result, "errors": errors}
if len(result) == 0:
partial = False
finalResponse.pop('records')
elif len(errors) == 0:
finalResponse.pop('errors')
return finalResponse
return finalResponse, partial

def buildResponseWithoutContinueOnError(responseArray, records, tokens: bool):
# recordsSize = len(records)
Expand All @@ -184,7 +187,7 @@ def buildResponseWithoutContinueOnError(responseArray, records, tokens: bool):
result.append({'table': table, 'fields': fieldsDict})
else:
result.append({'table': table, 'skyflow_id': skyflow_id})
return {'records': result}
return {'records': result}, False

def getUpsertColumn(tableName, upsertOptions):
uniqueColumn:str = ''
Expand Down
99 changes: 90 additions & 9 deletions tests/vault/test_detokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
'''
import unittest
import os
from skyflow.vault._detokenize import getDetokenizeRequestBody, createDetokenizeResponseBody
from skyflow.vault._detokenize import getDetokenizeRequestBody, createDetokenizeResponseBody, getBulkDetokenizeRequestBody
from skyflow.errors._skyflow_errors import SkyflowError, SkyflowErrorCodes, SkyflowErrorMessages
from skyflow.vault._client import Client, Configuration
from skyflow.service_account import generate_bearer_token
from skyflow.vault._config import RedactionType
from skyflow.vault._config import DetokenizeOptions, RedactionType
from dotenv import dotenv_values
import warnings

Expand Down Expand Up @@ -115,32 +115,87 @@ def testDetokenizeRedactionInvalidType(self):
def testResponseBodySuccess(self):
response = {"records": [{"token": "abc", "value": "secret"}]}
self.add_mock_response(response, 200)
res, partial = createDetokenizeResponseBody(self.mocked_futures)
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
self.assertEqual(partial, False)
self.assertEqual(res, {"records": response["records"], "errors": []})
self.assertIn("records", res)
self.assertNotIn("errors", res)
self.assertEqual(len(res["records"]), 1)
self.assertEqual(res, {"records": response["records"]})

def testResponseBodyPartialSuccess(self):
success_response = {"records": [{"token": "abc", "value": "secret"}]}
error_response = {"error": {"http_code": 404, "message": "not found"}}
self.add_mock_response(success_response, 200)
self.add_mock_response(error_response, 404)
res, partial = createDetokenizeResponseBody(self.mocked_futures)

detokenizeRecords = {"records": [self.tokenField, self.tokenField]}

res, partial = createDetokenizeResponseBody(detokenizeRecords, self.mocked_futures, DetokenizeOptions())
self.assertTrue(partial)
self.assertEqual(res["records"], success_response["records"])

records = res["records"]
self.assertIsNotNone(records)
self.assertEqual(len(records), 1)
self.assertEqual(records, success_response["records"])

errors = res["errors"]

self.assertIsNotNone(errors)
self.assertEqual(len(errors), 1)
self.assertEqual(errors[0]["error"]["code"],
error_response["error"]["http_code"])
self.assertEqual(
errors[0]["error"]["description"], error_response["error"]["message"])

def testResponseBodyFailure(self):
error_response = {"error": {"http_code": 404, "message": "not found"}}
self.add_mock_response(error_response, 404)

res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
self.assertFalse(partial)

self.assertNotIn("records", res)
errors = res["errors"]
self.assertIsNotNone(errors)
self.assertEqual(len(errors), 1)
self.assertEqual(errors[0]["error"]["code"],
error_response["error"]["http_code"])
self.assertEqual(
errors[0]["error"]["description"], error_response["error"]["message"])

def testResponseBodySuccessWithContinueOnErrorAsFalse(self):
response = {
"records": [
{"token": "abc", "value": "secret1"},
{"token": "def", "value": "secret2"}
]
}
self.add_mock_response(response, 200)
res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions(False))
self.assertEqual(partial, False)
self.assertIn("records", res)
self.assertNotIn("errors", res)
self.assertEqual(len(res["records"]), 2)
self.assertEqual(res, {"records": response["records"]})

def testResponseBodyFailureWithContinueOnErrorAsFalse(self):
error_response = {"error": {"http_code": 404, "message": "not found"}}
self.add_mock_response(error_response, 404)

res, partial = createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions(False))
self.assertFalse(partial)

self.assertNotIn("records", res)
errors = res["errors"]
self.assertIsNotNone(errors)
self.assertEqual(len(errors), 1)
self.assertEqual(errors[0]["error"]["code"], error_response["error"]["http_code"])
self.assertEqual(errors[0]["error"]["description"], error_response["error"]["message"])

def testResponseNotJson(self):
response = "not a valid json".encode()
self.add_mock_response(response, 200, encode=False)
try:
createDetokenizeResponseBody(self.mocked_futures)
createDetokenizeResponseBody(self.data, self.mocked_futures, DetokenizeOptions())
except SkyflowError as error:
expectedError = SkyflowErrorMessages.RESPONSE_NOT_JSON
self.assertEqual(error.code, 200)
Expand Down Expand Up @@ -181,4 +236,30 @@ def testRequestBodyWithInValidRedaction(self):
except SkyflowError as error:
self.assertTrue(error)
self.assertEqual(error.code, SkyflowErrorCodes.INVALID_INPUT.value)
self.assertEqual(error.message, SkyflowErrorMessages.INVALID_REDACTION_TYPE.value % str(type(data["redaction"])))
self.assertEqual(error.message, SkyflowErrorMessages.INVALID_REDACTION_TYPE.value % str(type(data["redaction"])))

def testGetBulkDetokenizeRequestBody(self):
expectedOutput = {
"detokenizationParameters": [
{
"token": self.testToken,
"redaction": "REDACTED"
},
{
"token": self.testToken,
"redaction": "REDACTED"
},
]
}
data = {
"token": self.testToken,
"redaction": RedactionType.REDACTED
}
try:
requestBody = getBulkDetokenizeRequestBody([data, data])
self.assertIn("detokenizationParameters", requestBody)
self.assertEqual(len(requestBody["detokenizationParameters"]), 2)
self.assertEqual(expectedOutput, requestBody)
except SkyflowError as e:
self.fail('Should not have thrown an error')

Loading