6
6
import datetime
7
7
from typing import List , Optional , Type
8
8
9
+ import json
10
+
9
11
from .connector import Kibana
12
+ from . import defenitions
10
13
11
14
DEFAULT_PAGE_SIZE = 10
12
15
@@ -20,10 +23,12 @@ def id(self):
20
23
return self .get (self .ID_FIELD )
21
24
22
25
@classmethod
23
- def bulk_create (cls , resources : list ):
26
+ def bulk_create_legacy (cls , resources : list ):
24
27
for r in resources :
25
28
assert isinstance (r , cls )
26
29
30
+ # _bulk_create is being deprecated. Leave for backwards compat only
31
+ # the new API would be import with multiple rules within an ndjson request
27
32
responses = Kibana .current ().post (cls .BASE_URI + "/_bulk_create" , data = resources )
28
33
return [cls (r ) for r in responses ]
29
34
@@ -127,6 +132,64 @@ def find_elastic(cls, **params):
127
132
params = cls ._add_internal_filter (True , params )
128
133
return cls .find (** params )
129
134
135
+ @classmethod
136
+ def bulk_action (cls , action : defenitions .RuleBulkActions , rule_ids : Optional [List [str ]] = None ,
137
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ,
138
+ edit : Optional [defenitions .RuleBulkEditAction ] = None ,
139
+ duplicate : Optional [defenitions .RuleBulkDuplicateAction ] = None ) -> (dict , List ['RuleResource' ]):
140
+ assert not (rule_ids and query ), 'Cannot provide both rule_ids and query'
141
+
142
+ if action == 'edit' :
143
+ assert edit , 'edit action requires edit object'
144
+
145
+ params = dict (dry_run = dry_run )
146
+ data = dict (query = query , ids = rule_ids , action = action , edit = edit , duplicate = duplicate )
147
+ response = Kibana .current ().post (cls .BASE_URI + "/_bulk_action" , params = params , data = data )
148
+
149
+ results = response ['attributes' ]['results' ]
150
+ result_ids = [r ['rule_id' ] for r in results ['updated' ]]
151
+ result_ids .extend ([r ['rule_id' ] for r in results ['created' ]])
152
+ rule_resources = cls .export_rules (result_ids )
153
+ return response , rule_resources
154
+
155
+ @classmethod
156
+ def bulk_enable (cls , rule_ids : Optional [List [str ]] = None ,
157
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ) -> (dict , List ['RuleResource' ]):
158
+ """Bulk enable rules using _bulk_action."""
159
+ return cls .bulk_action ("enable" , rule_ids = rule_ids , query = query , dry_run = dry_run )
160
+
161
+ @classmethod
162
+ def bulk_disable (cls , rule_ids : Optional [List [str ]] = None ,
163
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ) -> (dict , List ['RuleResource' ]):
164
+ """Bulk disable rules using _bulk_action."""
165
+ return cls .bulk_action ("disable" , rule_ids = rule_ids , query = query , dry_run = dry_run )
166
+
167
+ @classmethod
168
+ def bulk_delete (cls ,rule_ids : Optional [List [str ]] = None ,
169
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ) -> (dict , List ['RuleResource' ]):
170
+ """Bulk delete rules using _bulk_action."""
171
+ return cls .bulk_action ("delete" , rule_ids = rule_ids , query = query , dry_run = dry_run )
172
+
173
+ @classmethod
174
+ def bulk_duplicate (cls , rule_ids : Optional [List [str ]] = None ,
175
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ,
176
+ duplicate : Optional [defenitions .RuleBulkDuplicateAction ] = None ) -> (dict , List ['RuleResource' ]):
177
+ """Bulk duplicate rules using _bulk_action."""
178
+ return cls .bulk_action ("duplicate" , rule_ids = rule_ids , query = query , dry_run = dry_run , duplicate = duplicate )
179
+
180
+ @classmethod
181
+ def bulk_export (cls , rule_ids : Optional [List [str ]] = None ,
182
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ) -> (dict , List ['RuleResource' ]):
183
+ """Bulk export rules using _bulk_action."""
184
+ return cls .bulk_action ("export" , rule_ids = rule_ids , query = query , dry_run = dry_run )
185
+
186
+ @classmethod
187
+ def bulk_edit (cls , rule_ids : Optional [List [str ]] = None ,
188
+ query : Optional [str ] = None , dry_run : Optional [bool ] = False ,
189
+ edit : Optional [defenitions .RuleBulkEditAction ] = None ) -> (dict , List ['RuleResource' ]):
190
+ """Bulk edit rules using _bulk_action."""
191
+ return cls .bulk_action ("edit" , rule_ids = rule_ids , query = query , dry_run = dry_run , edit = edit )
192
+
130
193
def put (self ):
131
194
# id and rule_id are mutually exclusive
132
195
rule_id = self .get ("rule_id" )
@@ -142,6 +205,41 @@ def put(self):
142
205
143
206
raise
144
207
208
+ @classmethod
209
+ def import_rules (cls , rules : List [dict ], overwrite : bool = False , overwrite_exceptions : bool = False ,
210
+ overwrite_action_connectors : bool = False ) -> (dict , List ['RuleResource' ]):
211
+ """Import a list of rules into Kibana using the _import API and return the response and successful imports."""
212
+ url = f'{ cls .BASE_URI } /_import'
213
+ params = dict (
214
+ overwrite = overwrite ,
215
+ overwrite_exceptions = overwrite_exceptions ,
216
+ overwrite_action_connectors = overwrite_action_connectors
217
+ )
218
+ rule_ids = [r ['rule_id' ] for r in rules ]
219
+ headers , raw_data = Kibana .ndjson_file_data_prep (rules , "import.ndjson" )
220
+ response = Kibana .current ().post (url , headers = headers , params = params , raw_data = raw_data )
221
+ errors = response .get ("errors" , [])
222
+ error_rule_ids = [e ['rule_id' ] for e in errors ]
223
+
224
+ # successful rule_ids are not returned, so they must be implicitly inferred from errored rule_ids
225
+ successful_rule_ids = [r for r in rule_ids if r not in error_rule_ids ]
226
+ rule_resources = cls .export_rules (successful_rule_ids )
227
+ return response , rule_resources
228
+
229
+ @classmethod
230
+ def export_rules (cls , rule_ids : Optional [List [str ]] = None ,
231
+ exclude_export_details : bool = False ) -> List ['RuleResource' ]:
232
+ """Export a list of rules from Kibana using the _export API."""
233
+ url = f'{ cls .BASE_URI } /_export'
234
+
235
+ if rule_ids :
236
+ rule_ids = {'objects' : [{'rule_id' : r } for r in rule_ids ]}
237
+
238
+ params = dict (exclude_export_details = exclude_export_details )
239
+ response = Kibana .current ().post (url , params = params , data = rule_ids , raw = True )
240
+ data = [json .loads (r ) for r in response .text .splitlines ()]
241
+ return [cls (r ) for r in data ]
242
+
145
243
146
244
class Signal (BaseResource ):
147
245
BASE_URI = "/api/detection_engine/signals"
0 commit comments