-
Notifications
You must be signed in to change notification settings - Fork 186
/
Copy pathtest_index_management.py
117 lines (98 loc) · 3.77 KB
/
test_index_management.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
from opensearchpy.exceptions import NotFoundError
from .. import OpenSearchTestCase
class TestIndexManagementPlugin(OpenSearchTestCase):
    """Tests for the policy calls exposed on ``self.client.index_management``.

    Covers ``put_policy`` (create and update), ``get_policy`` and
    ``delete_policy``. ``self.client`` is not defined in this class; it is
    presumably supplied by ``OpenSearchTestCase`` -- confirm against the base
    class.
    """

    # Identifier under which every test stores and looks up the policy.
    POLICY_NAME = "example-policy"

    # Request body shared by all tests. It is a class attribute, so tests must
    # treat it as read-only: mutating it (or any nested dict) would leak the
    # change into every test that runs afterwards.
    POLICY_CONTENT = {
        "policy": {
            "description": "hot warm delete workflow",
            "default_state": "hot",
            "schema_version": 1,
            "states": [
                {
                    "name": "hot",
                    "actions": [
                        {
                            "rollover": {
                                "min_index_age": "1d",
                            }
                        }
                    ],
                    "transitions": [{"state_name": "warm"}],
                },
                {
                    "name": "warm",
                    "actions": [{"replica_count": {"number_of_replicas": 5}}],
                    "transitions": [
                        {
                            "state_name": "delete",
                            "conditions": {"min_index_age": "30d"},
                        }
                    ],
                },
                {
                    "name": "delete",
                    "actions": [
                        {
                            "notification": {
                                "destination": {"chime": {"url": "<URL>"}},
                                "message_template": {
                                    "source": "The index {{ctx.index}} is being deleted"
                                },
                            }
                        },
                        {"delete": {}},
                    ],
                },
            ],
            "ism_template": {"index_patterns": ["log*"], "priority": 100},
        }
    }

    def test_create_policy(self) -> None:
        """Create the example policy and check the response carries an ``_id``.

        Also called by the other tests as their setup step.
        """
        response = self.client.index_management.put_policy(
            policy=self.POLICY_NAME, body=self.POLICY_CONTENT
        )

        self.assertNotIn("errors", response)
        self.assertIn("_id", response)

    def test_get_policy(self) -> None:
        """Fetch a freshly created policy and check its ``_id`` matches."""
        # Create a policy
        self.test_create_policy()

        # Test to fetch the policy
        response = self.client.index_management.get_policy(self.POLICY_NAME)

        self.assertNotIn("errors", response)
        self.assertIn("_id", response)
        self.assertEqual(response["_id"], self.POLICY_NAME)

    def test_update_policy(self) -> None:
        """Update an existing policy's description via ``put_policy``."""
        # Create a policy
        self.test_create_policy()

        # Fetch the policy to obtain its current sequence number and primary
        # term, which are passed back as if_seq_no / if_primary_term.
        response = self.client.index_management.get_policy(self.POLICY_NAME)
        params = {
            "if_seq_no": response["_seq_no"],
            "if_primary_term": response["_primary_term"],
        }

        # Build a new outer dict AND a new "policy" dict. A shallow
        # ``dict.copy()`` would share the nested "policy" dict with
        # POLICY_CONTENT, so changing the description would silently alter
        # the class-level constant used by every other test.
        policy_content = {
            **self.POLICY_CONTENT,
            "policy": {
                **self.POLICY_CONTENT["policy"],
                "description": "example workflow",
            },
        }

        # Test to update policy
        response = self.client.index_management.put_policy(
            policy=self.POLICY_NAME, body=policy_content, params=params
        )

        self.assertNotIn("errors", response)
        self.assertIn("_id", response)

    def test_delete_policy(self) -> None:
        """Delete a policy and check that fetching it raises ``NotFoundError``."""
        # Create a policy
        self.test_create_policy()

        # Test to delete the policy
        response = self.client.index_management.delete_policy(self.POLICY_NAME)

        self.assertNotIn("errors", response)

        # Try fetching the policy; the call itself is expected to raise, so
        # its return value is not captured.
        with self.assertRaises(NotFoundError):
            self.client.index_management.get_policy(self.POLICY_NAME)