@@ -45,7 +45,6 @@ def __init__(self, config: Configuration):
4545 def insert (self , records : dict , options : InsertOptions = InsertOptions ()):
4646 interface = InterfaceName .INSERT .value
4747 log_info (InfoMessages .INSERT_TRIGGERED .value , interface = interface )
48-
4948 self ._checkConfig (interface )
5049
5150 jsonBody = getInsertRequestBody (records , options )
@@ -57,16 +56,35 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()):
5756 "sky-metadata" : json .dumps (getMetrics ())
5857 }
5958
60- response = requests .post (requestURL , data = jsonBody , headers = headers )
61- processedResponse = processResponse (response )
62- result , partial = convertResponse (records , processedResponse , options )
63- if partial :
64- log_error (SkyflowErrorMessages .BATCH_INSERT_PARTIAL_SUCCESS .value , interface )
65- elif 'records' not in result :
66- log_error (SkyflowErrorMessages .BATCH_INSERT_FAILURE .value , interface )
67- else :
68- log_info (InfoMessages .INSERT_DATA_SUCCESS .value , interface )
69- return result
59+ # Use for-loop for retry logic, avoid code repetition
60+ for attempt in range (2 ):
61+ try :
62+ # If jsonBody is a dict, use json=, else use data=
63+ if isinstance (jsonBody , dict ):
64+ response = requests .post (requestURL , json = jsonBody , headers = headers )
65+ else :
66+ response = requests .post (requestURL , data = jsonBody , headers = headers )
67+ processedResponse = processResponse (response )
68+ result , partial = convertResponse (records , processedResponse , options )
69+ if partial :
70+ log_error (SkyflowErrorMessages .BATCH_INSERT_PARTIAL_SUCCESS .value , interface )
71+ raise SkyflowError (SkyflowErrorCodes .PARTIAL_SUCCESS , SkyflowErrorMessages .BATCH_INSERT_PARTIAL_SUCCESS .value , result , interface = interface )
72+ if 'records' not in result :
73+ log_error (SkyflowErrorMessages .BATCH_INSERT_FAILURE .value , interface )
74+ raise SkyflowError (SkyflowErrorCodes .SERVER_ERROR , SkyflowErrorMessages .BATCH_INSERT_FAILURE .value , result , interface = interface )
75+ log_info (InfoMessages .INSERT_DATA_SUCCESS .value , interface )
76+ return result
77+ except requests .exceptions .ConnectionError as err :
78+ log_error (f'Connection error inserting record: { err } ' , interface )
79+ if attempt == 0 :
80+ log_info ("Retrying record..." , interface )
81+ continue
82+ else :
83+ raise SkyflowError (SkyflowErrorCodes .SERVER_ERROR , f"Connection error after retry: { err } " , interface = interface )
84+ except Exception as err :
85+ log_error (f'Unexpected error in insert: { err } ' , interface )
86+ raise
87+
7088
7189 def detokenize (self , records : dict , options : DetokenizeOptions = DetokenizeOptions ()):
7290 interface = InterfaceName .DETOKENIZE .value
0 commit comments