-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtest_clusterManage.py
1565 lines (1484 loc) · 74.1 KB
/
test_clusterManage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -- coding: utf-8 --
import allure
import pytest
import sys
import random
import numpy
import time
from common.getData import DoexcleByPandas
from common import commonFunction
from step import cluster_steps, platform_steps
from datetime import datetime
sys.path.append('../') # 将项目路径加到搜索路径中,使得自定义模块可以引用
@allure.feature('集群管理')
@pytest.mark.skipif(commonFunction.check_multi_cluster() is True, reason='多集群环境下不执行')
class TestCluster(object):
if commonFunction.check_multi_cluster() is True:
# 如果为单集群环境,则不会collect该class的所有用例。 __test__ = False
__test__ = False
else:
__test__ = True
# 从文件中读取用例信息
parametrize = DoexcleByPandas().get_data_from_yaml(filename='../data/cluster.yaml')
@allure.title('{title}') # 设置用例标题
# 将用例信息以参数化的方式传入测试方法
@pytest.mark.parametrize('id,url,params,data, story, title,method,severity,condition,except_result', parametrize)
def test_cluster(self, id, url, params, data, story, title, method, severity, condition, except_result):
'''
:param id: 用例编号
:param url: 用例请求的URL地址
:param data: 用例使用的请求数据
:param title: 用例标题
:param method: 用例的请求方式
:param severity: 用例优先级
:param condition: 用例的校验条件
:param except_result: 用例的预期结果
'''
allure.dynamic.story(story) # 动态生成模块
allure.dynamic.severity(severity) # 动态生成用例等级
commonFunction.request_resource(url, params, data, story, title, method, severity, condition, except_result)
@pytest.mark.run(order=1)
@allure.story("节点/集群节点")
@allure.title('为节点设置污点')
@allure.severity(allure.severity_level.CRITICAL)
def test_set_taints(self, node_name):
# 污点信息
taints = [{"key": "tester1", "value": "wx", "effect": "NoSchedule"}]
try:
# 获取节点的污点信息
r = cluster_steps.step_get_node_detail_info(node_name)
taints_old = r.json()['spec']['taints']
except Exception as e:
print(e)
print('节点:' + node_name + ' 无污点')
taints_old = []
# 为节点设置污点
cluster_steps.step_set_taints(node_name, taints)
# 获取节点的污点信息
r = cluster_steps.step_get_node_detail_info(node_name)
taints_actual = r.json()['spec']['taints']
# 验证污点设置成功
with pytest.assume:
assert taints == taints_actual
# 清空设置的污点
cluster_steps.step_set_taints(node_name=node_name, taints=taints_old)
@allure.story("节点/集群节点")
@allure.title('为节点添加标签')
@allure.severity(allure.severity_level.CRITICAL)
def test_add_labels(self, node_name):
# 获取节点的标签信息
re = cluster_steps.step_get_node_detail_info(node_name)
labels_old = re.json()['metadata']['labels'] # 用于删除添加的标签
labels = re.json()['metadata']['labels'] # 用于添加标签
# 添加标签的内容
labels['tester/label'] = 'wxx'
# 添加标签
cluster_steps.step_add_labels_for_node(node_name, labels)
# 获取编辑后节点的标签信息
r = cluster_steps.step_get_node_detail_info(node_name)
labels_actual = r.json()['metadata']['labels']
# 验证标签添加成功
with pytest.assume:
assert labels == labels_actual
# 删除添加的标签
labels_old['tester/label'] = None
cluster_steps.step_add_labels_for_node(node_name, labels_old)
@allure.story('节点/集群节点')
@allure.title('设置节点为停止调度, 然后设置为启用调度')
@allure.severity(allure.severity_level.CRITICAL)
def test_set_uncordon_node(self, node_name):
# 设置节点为停止调度
cluster_steps.step_cordon_node(node_name, True)
# 获取节点的调度状态
response = cluster_steps.step_get_node_detail_info(node_name)
cordon_status = response.json()['spec']['unschedulable']
# 验证节点调度状态为停止调度
with pytest.assume:
assert cordon_status == True
# 设置节点为启用调度
cluster_steps.step_cordon_node(node_name, False)
@allure.story('节点/集群节点')
@allure.title('查看节点的pod信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_pods(self, node_name):
# 查看节点的pod信息
response = cluster_steps.step_get_pod_info(node_name)
# 验证pod信息查询成功
assert response.status_code == 200
@allure.story('节点/集群节点')
@allure.title('查看节点的event信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_events(self, node_name):
# 查看节点的event信息
response = cluster_steps.step_get_event_of_node(node_name)
# 获取资源类型
kind = response.json()['kind']
# 验证event信息查询成功
assert kind == 'EventList'
@allure.story('节点/集群节点')
@allure.title('查看节点的监控信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_monitoring(self, node_name):
# 获取当前时间的10位时间戳
now_time = datetime.now()
now_timestamp = str(datetime.timestamp(now_time))[0:10]
# 获取10分钟之前的戳
before_timestamp = commonFunction.get_before_timestamp(now_time, 10)
# 查看最近十分钟的监控信息
r = cluster_steps.step_get_metrics_of_node(node_name=node_name, start_time=before_timestamp,
end_time=now_timestamp,
step='60s', times='10')
# 获取查询到的数据的结果类型
result_type = r.json()['results'][0]['data']['resultType']
# 验证查询到的数据的结果类型
assert result_type == 'matrix'
@allure.story('节点/集群节点')
@allure.title('查看节点的状态信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_status(self, node_name):
# 获取当前时间的10位时间戳
now_time = datetime.now()
now_timestamp = str(datetime.timestamp(now_time))[0:10]
# 获取20分钟之前的时间戳
before_timestamp = commonFunction.get_before_timestamp(now_time, 20)
# 查看节点的状态信息
response = cluster_steps.step_get_status_of_node(node_name=node_name, start_time=before_timestamp,
end_time=now_timestamp,
step='180s', times='20')
# 获取查询结果中的节点信息
node = response.json()['results'][0]['data']['result'][0]['metric']['node']
# 验证查询结果正确
assert node == node_name
@allure.story('节点/集群节点')
@allure.title('查询节点中不存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_query_non_existent_pod(self, node_name):
# 查询不存在的pod
pod_name = 'non-existent'
re = cluster_steps.step_query_pod(node_name, pod_name)
# 获取查询结果
total_items = re.json()['totalItems']
# 验证查询结果
assert total_items == 0
@allure.story('节点/集群节点')
@allure.title('按名称精确查询节点中存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_query_existent_pod_in_node(self, node_name):
# 获取节点的任意一个pod的名称
re = cluster_steps.step_query_pod(node_name, '')
pod_name = re.json()['items'][0]['metadata']['name']
# 按名称精确查询存在的pod
re = cluster_steps.step_query_pod(node_name, pod_name)
# 获取查询到的pod名称
name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
assert name_actual == pod_name
@allure.story('节点/集群节点')
@allure.title('按名称模糊查询节点中存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_fuzzy_query_existent_pod(self, node_name):
# 获取节点的任意一个pod的名称
re = cluster_steps.step_get_pod_info(node_name)
pod_name = re.json()['items'][0]['metadata']['name']
# 按名称模糊查询存在的pod
pod_fuzzy_name = pod_name[2:]
re = cluster_steps.step_query_pod(node_name, pod_fuzzy_name)
# 获取查询到的pod名称
name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
assert name_actual == pod_name
@allure.story('项目')
@allure.title('按名称精确查询集群中不存在的系统项目')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_non_existent_system_project(self):
project_name = 'non-existent-project'
# 查询指定的集群的系统项目
response = cluster_steps.step_query_system_project(project_name)
# 获取查询结果
items = response.json()['items']
# 验证查询结果为空
assert items == []
@allure.story('项目')
@allure.title('按名称精确查询集群中不存在的用户项目')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_non_existent_user_project(self):
project_name = 'non-existent-project'
# 查询指定的集群的用户项目
response = cluster_steps.step_query_user_system(project_name)
# 获取查询结果
items = response.json()['items']
# 验证查询结果为空
assert items == []
@allure.story('项目')
@allure.title('按名称精确查询集群中存在的用户项目')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_query_existent_user_project(self):
# 创建用户项目
project_name = 'pro-' + str(commonFunction.get_random())
alias_name = ''
description = ''
cluster_steps.step_create_user_project(project_name, alias_name, description)
# 按名称精确查询用户项目
re = cluster_steps.step_query_user_system(project_name)
# 获取查询结果
project_name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
assert project_name_actual == project_name
@allure.story('项目')
@allure.title('按名称精确查询集群中存在的系统项目')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_query_existent_system_project(self):
# 获取集群任意系统项目的名称
response = cluster_steps.step_query_system_project('')
project_name = response.json()['items'][0]['metadata']['name']
# 按名称精确查询系统项目
re = cluster_steps.step_query_system_project(project_name)
# 获取查询结果
project_name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
assert project_name_actual == project_name
@allure.story('项目')
@allure.title('按名称模糊查询集群中存在的系统项目')
@allure.severity(allure.severity_level.NORMAL)
def test_fuzzy_query_existent_system_project(self):
# 获取集群任意系统项目的名称
response = cluster_steps.step_query_system_project('')
project_name = response.json()['items'][0]['metadata']['name']
# 按名称精确查询系统项目
fuzzy_project_name = project_name[2:]
re = cluster_steps.step_query_system_project(fuzzy_project_name)
# 获取查询结果
project_name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
assert project_name_actual == project_name
@allure.story('项目')
@allure.title('按名称模糊查询集群中存在的用户项目')
@allure.severity(allure.severity_level.MINOR)
def test_precise_query_existent_user_project(self):
# 创建用户项目
project_name = 'pro-' + str(commonFunction.get_random())
alias_name = ''
description = ''
cluster_steps.step_create_user_project(project_name, alias_name, description)
# 按名称精确查询用户项目
re = cluster_steps.step_query_user_system(project_name[:-1])
# 获取查询结果
project_name_actual = re.json()['items'][0]['metadata']['name']
# 验证查询结果
with pytest.assume:
assert project_name_actual == project_name
# 删除项目
cluster_steps.step_delete_user_system(project_name)
@allure.story('项目')
@allure.title('查询集群中所有系统项目的详情信息,并验证其状态为活跃')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_system_project_state(self):
# 获取集群中系统项目的数量
response = cluster_steps.step_query_system_project('')
project_count = response.json()['totalItems']
# 获取集群任意系统项目的名称
for i in range(0, project_count):
project_name = response.json()['items'][i]['metadata']['name']
# 查询项目的详细信息
r = cluster_steps.step_get_project_detail(project_name)
# 获取项目的状态
status = r.json()['status']['phase']
# 验证项目运行状态为活跃
assert status == 'Active'
@allure.story('项目')
@allure.title('查询集群中任一系统项目的配额信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_one_system_project_quota(self):
# 获取集群中系统项目的数量
response = cluster_steps.step_query_system_project('')
project_count = response.json()['totalItems']
i = random.randint(0, project_count - 1)
# 获取集群任一系统项目的名称
project_name = response.json()['items'][i]['metadata']['name']
# 查询项目的配额信息
r = cluster_steps.step_get_project_quota(project_name)
# 获取项目的配额信息
used = r.json()['data']['used']
# 验证项目配额信息获取成功
assert 'count/pods' in used
@allure.story('项目')
@allure.title('查询集群中任一系统项目的LimitRanges信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_one_system_project_detail(self):
# 获取集群中系统项目的数量
response = cluster_steps.step_query_system_project('')
project_count = response.json()['totalItems']
i = numpy.random.randint(0, project_count)
# 获取集群任一系统项目的名称
project_name = response.json()['items'][i]['metadata']['name']
kind = ''
k = 0
while k < 60:
try:
# 查询项目的LimitRanges
r = cluster_steps.step_get_project_limit_ranges(project_name)
# 获取请求资源的kind
kind = r.json()['kind']
if kind:
break
except Exception as e:
print(e)
k += 1
time.sleep(1)
# 验证请求资源的kind为LimitRangeList
assert kind == 'LimitRangeList'
@allure.story('项目')
@allure.title('查询集群中任一系统项目的工作负载信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_system_project_workload(self):
# 获取集群中系统项目的数量
response = cluster_steps.step_query_system_project('')
project_count = response.json()['totalItems']
i = random.randint(0, project_count - 1)
# 获取集群任一系统项目的名称
res = cluster_steps.step_query_system_project('')
project_name = res.json()['items'][i]['metadata']['name']
# 查询项目的工作负载信息
re = cluster_steps.step_get_project_workload(project_name)
# 获取接口的响应数据
data = re.json()['data']
# 验证接口响应数据正确
data_except = ['daemonsets', 'deployments', 'jobs', 'persistentvolumeclaims', 'statefulsets']
assert data_except[random.randint(0, 4)] in data
@allure.story('项目')
@allure.title('验证集群中任一系统项目的pod运行正常')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_system_project_pods(self):
# 获取集群中系统项目的数量
response = cluster_steps.step_query_system_project('')
project_count = response.json()['totalItems']
# 获取集群中任一系统项目的名称
i = random.randint(0, project_count - 1)
project_name = response.json()['items'][i]['metadata']['name']
# 查询项目的pods信息
r = cluster_steps.step_get_pods_of_project(project_name=project_name)
# 获取pod的数量
pod_count = r.json()['totalItems']
# 获取所有pod的状态
for j in range(0, pod_count):
state = r.json()['items'][j]['status']['phase']
# 获取pod的名称
pod_name = r.json()['items'][j]['metadata']['name']
if state not in ['Running', 'Succeeded']:
print(pod_name)
# 验证pod的运行状态
with pytest.assume:
assert state in ['Running', 'Succeeded']
@allure.story('项目')
@allure.title('使用名称精确查询项目中存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_precise_query_existent_pod(self):
project_name = 'kubesphere-system'
# 用于存放pod的名称
pod_names = []
# 查询项目kubesphere-system的所有的pod信息
re = cluster_steps.step_get_pods_of_project(project_name=project_name)
# 获取pod的数量
pod_count = re.json()['totalItems']
# 获取项目的pod的名称
for i in range(0, pod_count):
name = re.json()['items'][i]['metadata']['name']
pod_names.append(name)
# 使用pod的名称,精确查询存在的pod
r = cluster_steps.step_get_pods_of_project(project_name, 'name=' + pod_names[0])
# 获取查询结果中pod名称
pod_name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert pod_name_actual == pod_names[0]
@allure.story('项目')
@allure.title('使用名称模糊查询项目中存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_fuzzy_query_existent_pod(self):
# 用于存放pod的名称
pod_names = []
# 获取集群中任一系统项目的名称
response = cluster_steps.step_query_system_project('')
project_name = response.json()['items'][0]['metadata']['name']
# 查询项目的所有的pod信息
re = cluster_steps.step_get_pods_of_project(project_name=project_name)
# 获取pod的数量
pod_count = re.json()['totalItems']
# 获取项目的pod的名称
for i in range(0, pod_count):
name = re.json()['items'][i]['metadata']['name']
pod_names.append(name)
# 使用pod的名称,模糊查询存在的pod
r = cluster_steps.step_get_pods_of_project(project_name, 'name=' + pod_names[0][2:])
# 获取查询结果中pod名称
pod_name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert pod_name_actual == pod_names[0]
@allure.story('项目')
@allure.title('使用名称查询项目中不存在的pod')
@allure.severity(allure.severity_level.NORMAL)
def test_fuzzy_query_existent_pod(self):
# 获取集群中任一系统项目的名称
response = cluster_steps.step_query_system_project('')
project_name = response.json()['items'][0]['metadata']['name']
# 使用pod的名称,模糊查询存在的pod
pod_name = str(commonFunction.get_random()) + 'test' + str(commonFunction.get_random())
r = cluster_steps.step_get_pods_of_project(project_name, 'name=' + pod_name)
# 获取查询结果中的pod数量
pod_count = r.json()['totalItems']
# 验证查询结果正确
assert pod_count == 0
@allure.story('项目')
@allure.title('创建用户项目,并验证项目创建成功')
@allure.severity(allure.severity_level.CRITICAL)
def test_create_user_system(self):
project_name = 'user-project' + str(commonFunction.get_random())
alias_name = 'test'
description = 'create user-system'
# 创建用户项目
cluster_steps.step_create_user_project(project_name, alias_name, description)
# 查询创建的项目,并获取其运行状态
r = cluster_steps.step_get_user_project_detail(project_name)
state = r.json()['status']['phase']
# 验证项目的状态为active
with pytest.assume:
assert state == 'Active'
# 删除创建的项目
cluster_steps.step_delete_user_system(project_name)
@allure.story('项目')
@allure.title('删除用户项目,并验证删除成功')
@allure.severity(allure.severity_level.CRITICAL)
def test_delete_user_system(self):
project_name = 'user-project' + str(commonFunction.get_random())
alias_name = 'test'
description = 'create user-system'
# 创建用户项目
cluster_steps.step_create_user_project(project_name, alias_name, description)
time.sleep(3)
# 查询项目
response = cluster_steps.step_get_user_project(project_name)
# 获取项目数量并验证项目创建成功
count = response.json()['totalItems']
with pytest.assume:
assert count == 1
# 删除用户项目
cluster_steps.step_delete_user_system(project_name)
# 查询被删除的项目
count_result = ''
i = 0
while i < 120:
r = cluster_steps.step_get_user_project(project_name)
count_result = r.json()['totalItems']
if count_result > 0:
time.sleep(10)
i += 10
else:
break
assert count_result == 0
@allure.story('应用负载')
@allure.title('查看集群任一系统项目的deployments,并验证其运行正常')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_deployments_of_cluster(self):
# 查询集群中所有的系统项目
response = cluster_steps.step_get_system_of_cluster()
system_count = response.json()['totalItems']
# 查询集群中任一系统项目的deployments
i = random.randint(0, system_count - 1)
system_name = response.json()['items'][i]['metadata']['name']
re = cluster_steps.step_get_resource_of_cluster_by_project('deployments', system_name)
# 获取集群deployments的数量
count = ''
try:
count = re.json()['totalItems']
except Exception as e:
print(e)
# 获取集群所有的deployments的状态
for j in range(0, count):
state = re.json()['items'][j]['status']['conditions'][0]['status']
# 验证deployment的状态为True
assert state == 'True'
@allure.story('应用负载')
@allure.title('查看集群任一的deployments的Revision Records')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_deployments_revision_records(self):
# 查询集群所有的deployments
response = cluster_steps.step_get_resource_of_cluster('deployments')
# 获取集群deployments的数量
count = response.json()['totalItems']
# 获取集群任一的deployments的labels
i = random.randint(0, count - 1)
label_selector = ''
project_name = response.json()['items'][i]['metadata']['namespace']
labels = response.json()['items'][i]['spec']['selector']['matchLabels']
# 将labels的key和value拼接为label_selector
keys = list(labels.keys())
values = list(labels.values())
for j in range(0, len(keys)):
label_selector += keys[j] + '=' + values[j] + ','
# 去掉最后的逗号
label_selector = label_selector[:-1]
# 查看deployments的revision records信息
r = cluster_steps.step_get_deployment_revision_records(project_name, label_selector)
# 获取请求的资源类型
kind = r.json()['kind']
# 验证请求的资源类型为ReplicaSetList
assert kind == 'ReplicaSetList'
@allure.story('应用负载')
@allure.title('查看集群任一系统项目的statefulSets,并验证其运行正常')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_statefulsets_of_cluster(self):
# 查询集群中所有的系统项目
response = cluster_steps.step_get_system_of_cluster()
system_count = response.json()['totalItems']
# 查询集群任一系统项目的statefulSets
i = random.randint(0, system_count - 1)
system_name = response.json()['items'][i]['metadata']['name']
re = cluster_steps.step_get_resource_of_cluster_by_project('statefulsets', system_name)
# 获取集群statefulSets的数量
count = re.json()['totalItems']
# 获取集群所有的statefulSets的副本数和ready的副本数
for j in range(0, count):
replica = re.json()['items'][j]['status']['replicas']
ready_replicas = re.json()['items'][j]['status']['readyReplicas']
# 验证每个statefulSets的ready的副本数=副本数
assert replica == ready_replicas
@allure.story('应用负载')
@allure.title('查看集群任一系统项目的daemonSets,并验证其运行正常')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_daemonsets_of_cluster(self):
# 查询集群中所有的系统项目
response = cluster_steps.step_get_system_of_cluster()
system_count = response.json()['totalItems']
# 查询集群任一系统项目的daemonSets
i = random.randint(0, system_count - 1)
system_name = response.json()['items'][i]['metadata']['name']
re = cluster_steps.step_get_resource_of_cluster_by_project('daemonsets', system_name)
# 获取集群daemonSets的数量
count = re.json()['totalItems']
# 获取集群所有的daemonSets的currentNumberScheduled和desiredNumberScheduled
for j in range(0, count):
current_number_scheduled = re.json()['items'][j]['status']['currentNumberScheduled']
desired_number_scheduled = re.json()['items'][j]['status']['desiredNumberScheduled']
# 验证每个daemonSets的currentNumberScheduled=desiredNumberScheduled
assert current_number_scheduled == desired_number_scheduled
@allure.story('应用负载')
@allure.title('查看集群任一的daemonSets的详情信息')
@allure.severity(allure.severity_level.CRITICAL)
def test_check_daemonsets_detail_of_cluster(self):
# 查询集群所有的daemonSets
response = cluster_steps.step_get_resource_of_cluster('daemonsets')
# 获取集群daemonSets的数量
count = response.json()['totalItems']
# 获取集群中任一的daemonSets的名称和所在的项目名称
i = random.randint(0, count - 1)
resource_name = response.json()['items'][i]['metadata']['name']
project_name = response.json()['items'][i]['metadata']['namespace']
# 查询daemonSets的详细信息
r = cluster_steps.step_get_app_workload_detail(project_name, 'daemonsets', resource_name)
# 验证信息查询成功
assert r.json()['kind'] == 'DaemonSet'
@allure.story('应用负载')
@allure.title('{title}')
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.parametrize('type, title',
[('statefulsets', '查看集群任一的statefulSets的Revision Records'),
('daemonsets', '查看集群任一的daemonSets的Revision Records')])
def test_check_app_workload_revision_records(self, type, title):
# 查询集群所有的daemonSets
response = cluster_steps.step_get_resource_of_cluster(type)
# 获取集群daemonSets的数量
count = response.json()['totalItems']
# 获取任一的daemonSets的label、project_name和daemonSets的名称
i = random.randint(0, count - 1)
labels = response.json()['items'][i]['metadata']['labels']
# 将labels的key和value拼接为label_selector
label_selector = ''
keys = list(labels.keys())
values = list(labels.values())
for j in range(0, len(keys)):
label_selector = ''
label_selector += keys[j] + '=' + values[j] + ','
# 去掉最后的逗号
label_selector = label_selector[:-1]
# 获取daemonSets的名称和所在的项目名称
project_name = response.json()['items'][i]['metadata']['namespace']
# 查看daemonSets的revision Records 信息
r = cluster_steps.step_get_app_workload_revision_records(project_name, label_selector)
# 获取请求的资源类型
kind = r.json()['kind']
# 验证请求的资源类型为ControllerRevisionList
assert kind == 'ControllerRevisionList'
@allure.story('应用负载')
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.parametrize('type, title',
[('jobs', '查看集群任一系统项目的jobs,并验证运行状态正常')])
def test_check_jobs_of_cluster(self, type, title):
# 查询集群中所有的系统项目
response = cluster_steps.step_get_system_of_cluster()
system_count = response.json()['totalItems']
# 查询集群任一系统项目的Jobs
i = random.randint(0, system_count - 1)
system_name = response.json()['items'][i]['metadata']['name']
re = cluster_steps.step_get_resource_of_cluster_by_project(type, system_name)
# 获取集群Jobs的数量
try:
count = re.json()['totalItems']
# 获取集群所有的Jobs的type
for j in range(0, count):
state = re.json()['items'][j]['status']['conditions'][0]['status']
# 验证运行状态正常
assert state == 'True'
except Exception as e:
print(e)
@allure.story('应用负载')
@allure.severity(allure.severity_level.CRITICAL)
@allure.title('{title}')
@pytest.mark.parametrize('type, title',
[('deployments', '查看集群任一的deployments的Monitoring'),
('statefulsets', '查看集群任一的statefulSets的Monitoring'),
('daemonsets', '查看集群任一的daemonSets的Monitoring'),
('pods', '查看集群任一pod的Monitoring'),
('services', '查看集群任一service的Monitoring')])
def test_check_app_workload_monitoring(self, type, title):
# 查询集群所有的daemonSets
response = cluster_steps.step_get_resource_of_cluster(type)
# 获取集群daemonSets的数量
count = response.json()['totalItems']
# 获取任一的daemonSets的project_name和daemonSets的名称
i = random.randint(0, count - 1)
project_name = response.json()['items'][i]['metadata']['namespace']
resource_name = response.json()['items'][i]['metadata']['name']
# 查看监控信息
r = cluster_steps.step_get_app_workload_monitoring(project_name, type, resource_name)
# 获取请求结果中监控数据的类型
result_type = r.json()['results'][0]['data']['resultType']
# 验证请求结果中监控数据的类型为vector
assert result_type == 'vector'
@allure.title('{title}')
@allure.severity(allure.severity_level.CRITICAL)
@pytest.mark.parametrize('story, type, title',
[('应用负载', 'deployments', '查看集群任一的deployments的event'),
('应用负载', 'statefulsets', '查看集群任一的statefulSets的event'),
('应用负载', 'daemonsets', '查看集群任一的daemonSets的event'),
('应用负载', 'jobs', '查看集群任一jobs的event'),
('应用负载', 'cronjobs', '查看集群任一cronjobs的event'),
('应用负载', 'pods', '查看集群任一pod的event'),
('应用负载', 'services', '查看集群任一service的event'),
('存储', 'persistentvolumeclaims', '查看集群任一pvc的event')
])
def test_check_app_workload_event(self, story, type, title):
allure.dynamic.story(story)
# 查询集群所有的资源
response = cluster_steps.step_get_resource_of_cluster(type)
try:
# 获取集群某一资源的数量
count = response.json()['totalItems']
# 获取任一资源的project_name,资源的名称和uid
i = random.randint(0, count - 1)
project_name = response.json()['items'][i]['metadata']['namespace']
resource_name = response.json()['items'][i]['metadata']['name']
resource_uid = response.json()['items'][i]['metadata']['uid']
# 查询daemonSets的event信息
r = cluster_steps.step_get_resource_event(project_name, type, resource_name, resource_uid)
# 获取请求结果的类型
kind = r.json()['kind']
# 验证请求结果的类型为EventList
assert kind == 'EventList'
except Exception as e:
print(type + ':' + str(e))
@allure.story('应用负载')
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('type, title',
[('deployments', '按名称精确查询存在的deployments'),
('statefulsets', '按名称精确查询存在的statefulSets'),
('daemonsets', '按名称精确查询存在的daemonSets'),
('jobs', '按名称精确查询存在的job'),
('cronjobs', '按名称精确查询存在的cronjob'),
('pods', '按名称精确查询存在的pod'),
('services', '按名称精确查询存在的service')])
def test_precise_query_app_workload_by_name(self, type, title):
# 获取集群中存在的资源的名称
response = cluster_steps.step_get_resource_of_cluster(type)
# 获取工作负载的数量
count = response.json()['totalItems']
# 如果数量大于0
if count > 0:
# 获取第一个资源的名称
name = response.json()['items'][0]['metadata']['name']
r = cluster_steps.step_get_resource_of_cluster(type, 'name=' + name)
# 获取查询结果的名称
name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert name == name_actual
else:
assert response.status_code == 200
@allure.story('应用负载')
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('type, title',
[('deployments', '按名称模糊查询存在的deployments'),
('statefulsets', '按名称模糊查询存在的statefulSets'),
('daemonsets', '按名称模糊查询存在的daemonSets'),
('jobs', '按名称模糊查询存在的job'),
('cronjobs', '按名称模糊查询存在的cronjob'),
('pods', '按名称模糊查询存在的pod'),
('services', '按名称模糊查询存在的service')])
def test_fuzzy_query_app_workload_by_name(self, type, title):
# 获取集群中存在的资源的名称
response = cluster_steps.step_get_resource_of_cluster(type)
status = response.status_code
# 获取工作负载的数量
count = response.json()['totalItems']
# 如果数量大于0
if count > 0:
# 获取第一个资源的名称
name = response.json()['items'][0]['metadata']['name']
r = cluster_steps.step_get_resource_of_cluster(type, 'name=' + name[1:])
# 获取查询结果的名称
name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert name == name_actual
else:
assert status == 200
@allure.story('应用负载')
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('type, title',
[('deployments', '按状态查询存在的deployments'),
('statefulsets', '按状态查询存在的statefulSets'),
('daemonsets', '按状态查询存在的daemonSets')])
def test_query_app_workload_by_status(self, type, title):
# 查询状态为running的资源
r = cluster_steps.step_get_resource_of_cluster(type, 'status=running')
# 获取资源的数量
count = r.json()['totalItems']
if count > 0:
# 获取资源的readyReplicas和replicas
for i in range(0, count):
if type == 'daemonsets':
ready_replicas = r.json()['items'][i]['status']['numberReady']
replicas = r.json()['items'][i]['status']['numberAvailable']
else:
ready_replicas = r.json()['items'][i]['status']['readyReplicas']
replicas = r.json()['items'][i]['status']['replicas']
# 验证ready_replicas=replicas,从而判断资源的状态为running
assert ready_replicas == replicas
else:
print('无状态为running的' + type)
@allure.story('应用负载')
@allure.title('按状态查询存在的jobs')
@allure.severity(allure.severity_level.NORMAL)
def test_query_app_jobs_by_status(self):
# 查询状态为completed的jobs
r = cluster_steps.step_get_resource_of_cluster('jobs', 'status=completed')
# 获取资源的数量
count = r.json()['totalItems']
# 获取资源的status
for i in range(0, count):
status = r.json()['items'][i]['status']
# 验证状态里面有完成时间,判断job状态为已完成
assert 'completionTime' in status
@allure.story('应用负载')
@allure.title('按状态查询存在的cronjobs')
@allure.severity(allure.severity_level.NORMAL)
def test_query_app_cronjobs_by_status(self):
# 查询状态为completed的cronjobs
r = cluster_steps.step_get_resource_of_cluster('cronjobs', 'status=running')
# 获取资源的数量
count = r.json()['totalItems']
# 获取资源的status
for i in range(0, count):
status = r.json()['items'][i]['spec']['suspend']
# false 为 Running ,true 为 Paused
assert status == False
@allure.story('应用负载')
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('type, title',
[('deployments', '按状态为运行中和名称查询存在的deployments'),
('statefulsets', '按状态为运行中和名称查询存在的statefulSets'),
('daemonsets', '按状态为运行中和名称查询存在的daemonSets')])
def test_query_app_workload_by_status_and_name(self, type, title):
# 查询集群中所有的资源
response = cluster_steps.step_get_resource_of_cluster(type)
# 获取资源的数量
count = response.json()['totalItems']
running_resource = []
ready_replicas = 0
# 获取任一状态为运行中的资源名称
i = random.randint(0, count - 1)
if type == 'daemonsets':
ready_replicas = response.json()['items'][i]['status']['numberReady']
replicas = response.json()['items'][i]['status']['numberAvailable']
else:
try:
ready_replicas = response.json()['items'][i]['status']['readyReplicas']
except Exception as e:
print(e)
replicas = response.json()['items'][i]['status']['replicas']
if ready_replicas == replicas:
running_resource.append(response.json()['items'][i]['metadata']['name'])
# 使用名称和状态查询资源
for name in running_resource:
r = cluster_steps.step_get_resource_of_cluster(type, 'name=' + name, 'status=running')
# 获取查询结果中的name
name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询的结果正确
assert name in name_actual
@pytest.mark.run(order=2)
@allure.story('应用负载')
@allure.title('获取集群所有的容器组,并验证其数量与从项目管理中获取的数量一致')
@allure.severity(allure.severity_level.CRITICAL)
def test_get_pods_of_cluster(self):
# 获取集群的容器组的数量
response = cluster_steps.step_get_pods_of_cluster()
count = response.json()['totalItems']
# 从项目管理处获取所有项目的名称
project_name = []
re = cluster_steps.step_get_project_of_cluster()
project_count = re.json()['totalItems']
for i in range(0, project_count):
name = re.json()['items'][i]['metadata']['name']
project_name.append(name)
# 获取每个项目的pod数量,并将其加和
pod_counts = 0
for project in project_name:
r = cluster_steps.step_get_pods_of_project(project)
pod_count = r.json()['totalItems']
pod_counts += pod_count
# 验证集群的容器数量等于每个项目的容器数之和
assert count == pod_counts
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('story, type, title',
[('配置中心', 'secrets', '按名称精确查询存在的密钥'),
('配置中心', 'configmaps', '按名称精确查询存在的配置'),
('配置中心', 'serviceaccounts', '按名称精确查询存在的服务账号'),
('CRDs', 'customresourcedefinitions', '按名称精确查询存在的CRD'),
('存储', 'persistentvolumeclaims', '按名称精确查询存在的存储卷'),
('存储', 'storageclasses', '按名称精确查询存在的存储类型')
])
def test_precise_query_resource_by_name(self, story, type, title):
allure.dynamic.story(story)
# 获取集群中存在的任一资源的名称
response = cluster_steps.step_get_resource_of_cluster(resource_type=type)
# 获取工作负载的数量
count = response.json()['totalItems']
# 如果数量大于0
if count > 0:
name = response.json()['items'][random.randint(0, count - 1)]['metadata']['name']
# 按名称精确查询存在的资源
r = cluster_steps.step_get_resource_of_cluster(type, 'name=' + name)
name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert name in name_actual
else:
assert response.status_code == 200
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('story, type, title',
[('配置中心', 'secrets', '按名称模糊查询存在的密钥'),
('配置中心', 'configmaps', '按名称模糊查询存在的配置'),
('配置中心', 'serviceaccounts', '按名称模糊查询存在的服务账号'),
('CRDs', 'customresourcedefinitions', '按名称模糊查询存在的CRD'),
('存储', 'persistentvolumeclaims', '按名称模糊查询存在的存储卷'),
('存储', 'storageclasses', '按名称模糊查询存在的存储类型')
])
def test_fuzzy_query_resource_by_name(self, story, type, title):
allure.dynamic.story(story)
# 查看集群中的某一种资源
response = cluster_steps.step_get_resource_of_cluster(resource_type=type)
# 获取集群中某一种资源的数量
count = response.json()['totalItems']
# 获取集群中存在的任一资源的名称
name = response.json()['items'][random.randint(0, count - 1)]['metadata']['name']
fuzzy_name = name[1:]
# 按名称模糊查询存在的资源
r = cluster_steps.step_get_resource_of_cluster(type, 'name=' + fuzzy_name)
name_actual = r.json()['items'][0]['metadata']['name']
# 验证查询结果正确
assert name in name_actual
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('story, type, title',
[('配置中心', 'secrets', '按项目查询存在的密钥'),
('配置中心', 'configmaps', '按项目查询存在的配置'),
('配置中心', 'serviceaccounts', '按项目查询存在的服务账号'),
('存储', 'persistentvolumeclaims', '按项目查询存在的存储卷')
])
def test_query_configuration_by_project(self, story, type, title):
allure.dynamic.story(story)
# 查询项目为kube-system的所有资源
response = cluster_steps.step_get_resource_of_cluster_by_project(type=type, project_name='kubesphere-system')
# 获取资源数量
count = response.json()['totalItems']
# 遍历所有资源,验证资源的项目为kubesphere-system
if count > 0:
for i in range(0, count):
project = response.json()['items'][i]['metadata']['namespace']
assert project == 'kubesphere-system'
else:
print('项目kubesphere-system无' + type)
@allure.title('{title}')
@allure.severity(allure.severity_level.NORMAL)
@pytest.mark.parametrize('story, type, title',
[('配置中心', 'secrets', '按项目和名称查询存在的密钥'),
('配置中心', 'configmaps', '按项目和名称查询存在的配置'),
('配置中心', 'serviceaccounts', '按项目和名称查询存在的服务账号'),
('存储', 'persistentvolumeclaims', '按项目和名称查询存在的存储卷')
])
def test_query_configuration_by_project_and_name(self, story, type, title):
allure.dynamic.story(story)
# 查询项目为kube-system的所有资源
response = cluster_steps.step_get_resource_of_cluster_by_project(type=type, project_name='kubesphere-system')
# 获取资源数量
count = response.json()['totalItems']
if count > 0:
i = numpy.random.randint(0, count)
# 获取任一资源的名称
name = response.json()['items'][i]['metadata']['name']
# 按项目和名称查询资源
r = cluster_steps.step_get_resource_of_cluster_by_project(type, 'kubesphere-system', 'name=' + name)
# 在查询结果中获取资源名称和数量
names_actual = []
count_res = r.json()['totalItems']
if count_res > 0:
for k in range(count_res):