Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
{"name":"upload attachment to global", "function": "upload_attachment_to_global", "screenshot": "none" },
{"name":"download attachment from global", "function": "download_attachment_from_global", "screenshot": "none" },

# data-store actions
{"name": "data store: read", "function": "data_store_read", "screenshot": "none"},
{"name": "data store: write", "function": "data_store_write", "screenshot": "none"},

# Client specific actions
{"name": "write into text file", "function": "text_write", "screenshot": "none" },
{"name": "compare identifiers content", "function": "compare_identifiers_content", "screenshot": "none" },
Expand Down
136 changes: 136 additions & 0 deletions Framework/Built_In_Automation/Sequential_Actions/common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5515,3 +5515,139 @@ def authenticator_code_generator(data_set):

except Exception:
return CommonUtil.Exception_Handler(sys.exc_info())


def data_store_read(data_set):
"""
This function reads data from datastore

Args:
data_set:
------------------------------------------------------------------------------
table name | input parameter | xyz
where | input parameter | $col1 = 'Hello' AND name = 'Mini'
columns | optional parameter | name, age
data store: read | common action | variable_name_to_save_data_to
------------------------------------------------------------------------------
Return:
`list of datastore` if success
`zeuz_failed` if fails
"""

sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME

try:
table_name = columns = var_name = ""
params = {}
for left, mid, right in data_set:
if left.strip() == 'table name':
table_name = right.strip()
params['table_name'] = table_name
if left.strip() == 'where':
q = right.strip()
temp = q.lower().replace('and', ',').replace('or', ',').split(',')

t = temp[0].split('=')
params['and_' + t[0].strip()] = t[1].strip()
i = 1
for s in q.split():
if s.lower() == 'and':
t = temp[i].split('=')
params['and_' + t[0].strip()] = t[1].strip()
i+=1

if s.lower() == 'or':
t = temp[i].split('=')
params['or_'+t[0].strip()] = t[1].strip()

i += 1

headers = RequestFormatter.add_api_key_to_headers({})
headers['headers']['content-type'] = 'application/json'
headers['headers']['X-API-KEY'] = ConfigModule.get_config_value("Authentication", "api-key")
res = requests.get(
RequestFormatter.form_uri('data_store/data_store/custom_operation/'),
params=json.dumps(params),
verify=False,
**headers
)
#

# print(res.text)
CommonUtil.ExecLog(sModuleInfo, f"Captured following output:\n{res.text}", 1)

return sr.Set_Shared_Variables(var_name, json.loads(res.text))

except Exception:
return CommonUtil.Exception_Handler(sys.exc_info())

def data_store_write(data_set):
"""
This function reads data from datastore

Args:
data_set:
------------------------------------------------------------------------------
table name | input parameter | xyz
where | input parameter | $col1 = 'Hello' AND name = 'Mini'
columns | optional parameter | name, age
data | element parameter | column_name=Arifa
data store: write| common action | variable_name_to_save_data_to
------------------------------------------------------------------------------
Return:
`list of datastore` if success
`zeuz_failed` if fails
"""

sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME

try:
table_name = columns = var_name = ""
params = {}
data={}
for left, mid, right in data_set:
if left.strip() == 'table name':
table_name = right.strip()
params['table_name'] = table_name
if left.strip() == 'where':
q = right.strip()
temp = q.lower().replace('and', ',').replace('or', ',').split(',')

t = temp[0].split('=')
params['and_' + t[0].strip()] = t[1].strip()
i = 1
for s in q.split():
if s.lower() == 'and':
t = temp[i].split('=')
params['and_' + t[0].strip()] = t[1].strip()
i+=1

if s.lower() == 'or':
t = temp[i].split('=')
params['or_'+t[0].strip()] = t[1].strip()

i += 1
if left.strip() == 'data':
temp = right.strip().split(',')
for t in temp:
tt=t.split('=')
data[tt[0].strip()]=tt[1].strip()
headers = RequestFormatter.add_api_key_to_headers({})
headers['headers']['content-type'] = 'application/json'
headers['headers']['X-API-KEY'] = ConfigModule.get_config_value("Authentication", "api-key")
res = requests.patch(
RequestFormatter.form_uri('data_store/data_store/custom_operation/'),
params=json.dumps(params),
data=json.dumps(data),
verify=False,
**headers
)
#

# print(res.text)
CommonUtil.ExecLog(sModuleInfo, f"Captured following output:\n{res.text}", 1)

return sr.Set_Shared_Variables(var_name, json.loads(res.text))

except Exception:
return CommonUtil.Exception_Handler(sys.exc_info())
11 changes: 7 additions & 4 deletions Framework/Built_In_Automation/Web/REST/BuiltInFunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,14 +1600,16 @@ def Get_Oauth2_Access_Token_URl(step_data):
request_token_uri = ''
token_name = ''
var_name='access_token_url'

scope = ''
for row in step_data:
if row[0] == "client id":
client_id = str(row[2]).strip()
elif row[0] == "client secret":
client_secret = str(row[2]).strip()
elif row[0] == "access point":
access_point = str(row[2]).strip()
elif row[0] == "scope":
scope = str(row[2]).strip()
elif row[0] == "grant type":
grant_type = str(row[2]).strip()
elif row[0] == "base uri":
Expand Down Expand Up @@ -1662,10 +1664,11 @@ def Get_Oauth2_Access_Token_URl(step_data):
)

data = {
'code': 'test',
'state': 'test',
'response_type': 'code',
'grant_type': grant_type,
'redirect_uri': redirect_uri
# 'grant_type': grant_type,
'redirect_uri': redirect_uri,
'scope':scope,
}
params = data
url = service.get_authorize_url(**params)
Expand Down
35 changes: 19 additions & 16 deletions Framework/Built_In_Automation/Web/Selenium/BuiltInFunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ def get_performance_metrics(dataset):
return CommonUtil.Exception_Handler(sys.exc_info())


initial_download_folder = None
@logger
def Open_Browser(dependency, window_size_X=None, window_size_Y=None, capability=None, browser_options=None):
""" Launch browser and create instance """
Expand Down Expand Up @@ -513,12 +514,14 @@ def Open_Browser(dependency, window_size_X=None, window_size_Y=None, capability=
options.add_argument(
"--headless"
) # Enable headless operation if dependency set
download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
global initial_download_folder
initial_download_folder = download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
prefs = {
"profile.default_content_settings.popups": 0,
"download.default_directory": download_dir,
"download.prompt_for_download": False,
"download.directory_upgrade": True
"download.directory_upgrade": True,
'safebrowsing.enabled': 'false'
}
options.add_experimental_option('prefs', prefs)
if remote_host:
Expand Down Expand Up @@ -582,7 +585,7 @@ def Open_Browser(dependency, window_size_X=None, window_size_Y=None, capability=
capabilities = webdriver.DesiredCapabilities().FIREFOX
capabilities['acceptSslCerts'] = True
profile = webdriver.FirefoxProfile()
download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
initial_download_folder = download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
profile.set_preference("browser.download.folderList", 2)
profile.set_preference("browser.download.manager.showWhenStarting", False)
profile.set_preference("browser.download.dir", download_dir)
Expand Down Expand Up @@ -630,7 +633,7 @@ def Open_Browser(dependency, window_size_X=None, window_size_Y=None, capability=
Since this module inherits Selenium module so all updates will be inherited as well
"""
from Framework.edge_module.msedge.selenium_tools import EdgeOptions, Edge
download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
initial_download_folder = download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
options = webdriver.EdgeOptions()

if remote_browser_version:
Expand Down Expand Up @@ -679,7 +682,7 @@ def Open_Browser(dependency, window_size_X=None, window_size_Y=None, capability=
from selenium.webdriver.opera.options import Options
options = Options()
options.add_argument("--zeuz_pid_finder")
download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
initial_download_folder = download_dir = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
options.add_experimental_option("prefs", {"download.default_directory": download_dir}) # This does not work
# options.binary_location = r'C:\Users\ASUS\AppData\Local\Programs\Opera\launcher.exe' # This might be needed

Expand Down Expand Up @@ -1575,7 +1578,7 @@ def Click_and_Download(data_set):
else:
ext = ".crdownload"
while True:
ld = os.listdir(ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config))
ld = os.listdir(initial_download_folder)
if all([len(ld) > 0, all([not i.endswith(".tmp") and not i.endswith(ext) for i in ld])]):
CommonUtil.ExecLog(sModuleInfo, "Download Finished in %s seconds" % round(time.perf_counter()-s, 2), 1)
break
Expand All @@ -1588,7 +1591,7 @@ def Click_and_Download(data_set):

if filepath:
# filepath = Shared_Resources.Get_Shared_Variables("zeuz_download_folder")
source_folder = ConfigModule.get_config_value("sectionOne", "initial_download_folder", temp_config)
source_folder = initial_download_folder
all_source_dir = [os.path.join(source_folder, f) for f in os.listdir(source_folder) if os.path.isfile(os.path.join(source_folder, f))]
new_path = filepath
for file_to_be_moved in all_source_dir:
Expand Down Expand Up @@ -2301,9 +2304,9 @@ def Extract_Table_Data(step_data):
return "zeuz_failed"

variable_value = []
all_tr = Element.find_elements_by_tag_name("tr")
all_tr = Element.find_elements("tag name", "tr")
for row in all_tr:
all_td = row.find_elements_by_tag_name("td")
all_td = row.find_elements("tag name", "td")
td_data = []
for td in all_td:
text_data = td.get_property("textContent").strip()
Expand Down Expand Up @@ -3127,7 +3130,7 @@ def validate_table_row_size(data_set):

all_rows = []
if table_type == "html": # HTML type table
all_rows = table.find_elements_by_tag_name(
all_rows = table.find_elements("tag name",
"tr"
) # Get element list for all rows
elif table_type == "css": # CSS type table
Expand Down Expand Up @@ -3192,17 +3195,17 @@ def validate_table_column_size(data_set):
all_rows = []
all_cols = []
if table_type == "html": # HTML type table
all_rows = table.find_elements_by_tag_name(
all_rows = table.find_elements("tag name",
"tr"
) # Get element list for all rows
if len(all_rows) > 0:
all_cols = all_rows[0].find_elements_by_tag_name(
all_cols = all_rows[0].find_elements("tag name",
"td"
) # Get element list for all columns in this row
if (
len(all_cols) == 0
): # No <TD> type columns, so check if there were header type columns, and use those instead
all_cols = all_rows[0].find_elements_by_tag_name(
all_cols = all_rows[0].find_elements("tag name",
"th"
) # Get element list for all header columns in this row
elif table_type == "css": # CSS type table
Expand Down Expand Up @@ -3255,17 +3258,17 @@ def get_webpage_table_html(data_set, ignore_rows=[], ignore_cols=[], retain_case

master_text_table = {}
table_row = 0
tr_list = table.find_elements_by_tag_name("tr") # Get element list for all rows
tr_list = table.find_elements("tag name", "tr") # Get element list for all rows
for tr in tr_list: # For each row element
table_row += 1
table_col = 0
td_list = tr.find_elements_by_tag_name(
td_list = tr.find_elements("tag name",
"td"
) # Get element list for all columns in this row
if (
len(td_list) == 0
): # No <TD> type columns, so check if there were header type columns, and use those instead
td_list = tr.find_elements_by_tag_name(
td_list = tr.find_elements("tag name",
"th"
) # Get element list for all header columns in this row
for td in td_list: # For each column element
Expand Down