16
16
from infrahub .pools .tasks import validate_schema_number_pools
17
17
from infrahub .services import InfrahubServices
18
18
from infrahub .services .adapters .cache .redis import RedisCache
19
+ from tests .helpers .schema .snow import SNOW_INCIDENT , SNOW_REQUEST , SNOW_TASK
19
20
from tests .helpers .test_app import TestInfrahubApp
20
21
21
22
if TYPE_CHECKING :
43
44
}
44
45
45
46
46
- class TestMutationGenerator (TestInfrahubApp ):
47
+ class TestAttributeNumberPoolLifecycle (TestInfrahubApp ):
47
48
@pytest .fixture (scope = "class" )
48
49
async def initial_dataset (
49
50
self ,
@@ -56,18 +57,24 @@ async def initial_dataset(
56
57
) -> None :
57
58
bus_simulator .service ._cache = RedisCache ()
58
59
59
- schema = {"version" : "1.0" , "nodes" : [node_schema_definition ]}
60
+ schema = {
61
+ "version" : "1.0" ,
62
+ "generics" : [SNOW_TASK .to_dict ()],
63
+ "nodes" : [node_schema_definition , SNOW_INCIDENT .to_dict (), SNOW_REQUEST .to_dict ()],
64
+ }
60
65
schema_load_response = await client .schema .load (schemas = [schema ], wait_until_converged = True )
61
66
assert not schema_load_response .errors
62
67
63
- async def test_numberpool_assignment (
68
+ async def test_numberpool_assignment_direct_node (
64
69
self , db : InfrahubDatabase , initial_dataset : None , client : InfrahubClient , default_branch
65
70
) -> None :
66
- assert True
71
+ service = await InfrahubServices .new (database = db )
72
+ context = InfrahubContext .init (
73
+ branch = default_branch ,
74
+ account = AccountSession (auth_type = AuthType .NONE , authenticated = False , account_id = "" ),
75
+ )
67
76
68
- incident_1 = await Node .init (db = db , schema = "TestNumberAttribute" )
69
- await incident_1 .new (db = db , name = "The first thing" )
70
- await incident_1 .save (db = db )
77
+ await validate_schema_number_pools (branch_name = registry .default_branch , context = context , service = service )
71
78
72
79
test_schema = registry .schema .get_node_schema (name = "TestNumberAttribute" )
73
80
test_attribute = test_schema .get_attribute (name = "assigned_number" )
@@ -81,6 +88,10 @@ async def test_numberpool_assignment(
81
88
)
82
89
assert number_pool_pre .start_range .value == 10
83
90
91
+ incident_1 = await Node .init (db = db , schema = "TestNumberAttribute" )
92
+ await incident_1 .new (db = db , name = "The first thing" )
93
+ await incident_1 .save (db = db )
94
+
84
95
initial_branches = get_branches_with_schema_number_pool (
85
96
kind = "TestNumberAttribute" , attribute_name = "assigned_number"
86
97
)
@@ -94,6 +105,18 @@ async def test_numberpool_assignment(
94
105
after_purge = get_branches_with_schema_number_pool (kind = "TestNumberAttribute" , attribute_name = "assigned_number" )
95
106
assert after_purge == []
96
107
108
+ await validate_schema_number_pools (branch_name = registry .default_branch , context = context , service = service )
109
+
110
+ with pytest .raises (NodeNotFoundError ):
111
+ await NodeManager .find_object (
112
+ db = db ,
113
+ kind = CoreNumberPool ,
114
+ id = number_pool_id ,
115
+ )
116
+
117
+ async def test_numberpool_assignment_from_generic (
118
+ self , db : InfrahubDatabase , initial_dataset : None , client : InfrahubClient , default_branch
119
+ ) -> None :
97
120
service = await InfrahubServices .new (database = db )
98
121
context = InfrahubContext .init (
99
122
branch = default_branch ,
@@ -102,6 +125,40 @@ async def test_numberpool_assignment(
102
125
103
126
await validate_schema_number_pools (branch_name = registry .default_branch , context = context , service = service )
104
127
128
+ test_schema = registry .schema .get_node_schema (name = "SnowIncident" )
129
+ test_attribute = test_schema .get_attribute (name = "number" )
130
+ assert isinstance (test_attribute .parameters , NumberPoolParameters )
131
+ number_pool_id = test_attribute .parameters .number_pool_id
132
+
133
+ number_pool_pre = await NodeManager .find_object (
134
+ db = db ,
135
+ kind = CoreNumberPool ,
136
+ id = number_pool_id ,
137
+ )
138
+ assert number_pool_pre .start_range .value == 1
139
+
140
+ incident_1 = await Node .init (db = db , schema = "SnowIncident" )
141
+ await incident_1 .new (db = db , title = "The very first incident" )
142
+ await incident_1 .save (db = db )
143
+
144
+ initial_branches = get_branches_with_schema_number_pool (kind = "SnowTask" , attribute_name = "number" )
145
+
146
+ assert initial_branches == ["main" ]
147
+ snow_task = SNOW_TASK .to_dict ()
148
+ snow_task ["state" ] = "absent"
149
+ snow_request = SNOW_REQUEST .to_dict ()
150
+ snow_request ["state" ] = "absent"
151
+ snow_incident = SNOW_INCIDENT .to_dict ()
152
+ snow_incident ["state" ] = "absent"
153
+ schema = {"version" : "1.0" , "generics" : [snow_task ], "nodes" : [snow_request , snow_incident ]}
154
+ schema_load_response = await client .schema .load (schemas = [schema ], wait_until_converged = True )
155
+ assert not schema_load_response .errors
156
+
157
+ after_purge = get_branches_with_schema_number_pool (kind = "SnowTask" , attribute_name = "number" )
158
+ assert after_purge == []
159
+
160
+ await validate_schema_number_pools (branch_name = registry .default_branch , context = context , service = service )
161
+
105
162
with pytest .raises (NodeNotFoundError ):
106
163
await NodeManager .find_object (
107
164
db = db ,
0 commit comments