@@ -14,9 +14,10 @@ import { Datatable } from 'src/plugins/expressions/server';
1414import { ReportingConfig } from '../../..' ;
1515import {
1616 cellHasFormulas ,
17+ ES_SEARCH_STRATEGY ,
18+ IndexPattern ,
1719 ISearchSource ,
1820 ISearchStartSearchSource ,
19- ES_SEARCH_STRATEGY ,
2021 SearchFieldValue ,
2122 SearchSourceFields ,
2223 tabifyDocs ,
@@ -79,38 +80,44 @@ export class CsvGenerator {
7980 private stream : Writable
8081 ) { }
8182
82- private async scroll (
83- pitId : string ,
84- settings : CsvExportSettings ,
83+ private async scan (
84+ index : IndexPattern ,
8585 searchSource : ISearchSource ,
86- searchAfter ?: estypes . SearchSortResults
86+ settings : CsvExportSettings
8787 ) {
88- this . logger . debug ( `executing scroll request` ) ;
8988 const { scroll : scrollSettings , includeFrozen } = settings ;
9089 const searchBody = searchSource . getSearchRequestBody ( ) ;
91- const searchParams : estypes . SearchRequest = {
92- body : {
93- ... searchBody ,
94- pit : {
95- id : pitId ,
96- keep_alive : scrollSettings . duration ,
97- } ,
98- search_after : searchAfter ,
90+ this . logger . debug ( `executing search request` ) ;
91+ const searchParams = {
92+ params : {
93+ body : searchBody ,
94+ index : index . title ,
95+ scroll : scrollSettings . duration ,
96+ size : scrollSettings . size ,
97+ ignore_throttled : ! includeFrozen ,
9998 } ,
100- size : scrollSettings . size ,
101- ignore_throttled : ! includeFrozen ,
102- ignore_unavailable : undefined ,
10399 } ;
104100
105101 const results = (
106- await this . clients . data
107- . search ( { params : searchParams } , { strategy : ES_SEARCH_STRATEGY } )
108- . toPromise ( )
102+ await this . clients . data . search ( searchParams , { strategy : ES_SEARCH_STRATEGY } ) . toPromise ( )
109103 ) . rawResponse as estypes . SearchResponse < unknown > ;
110104
111105 return results ;
112106 }
113107
108+ private async scroll ( scrollId : string , scrollSettings : CsvExportSettings [ 'scroll' ] ) {
109+ this . logger . debug ( `executing scroll request` ) ;
110+ const results = (
111+ await this . clients . es . asCurrentUser . scroll ( {
112+ body : {
113+ scroll : scrollSettings . duration ,
114+ scroll_id : scrollId ,
115+ } ,
116+ } )
117+ ) . body ;
118+ return results ;
119+ }
120+
114121 /*
115122 * Load field formats for each field in the list
116123 */
@@ -287,15 +294,14 @@ export class CsvGenerator {
287294 throw new Error ( `The search must have a reference to an index pattern!` ) ;
288295 }
289296
290- const { maxSizeBytes, bom, escapeFormulaValues } = settings ;
297+ const { maxSizeBytes, bom, escapeFormulaValues, scroll : scrollSettings } = settings ;
291298
292299 const builder = new MaxSizeStringBuilder ( this . stream , byteSizeValueToNumber ( maxSizeBytes ) , bom ) ;
293300 const warnings : string [ ] = [ ] ;
294301 let first = true ;
295302 let currentRecord = - 1 ;
296303 let totalRecords = 0 ;
297- let pitId : string | undefined ;
298- let searchAfter : undefined | estypes . SearchSortResults ;
304+ let scrollId : string | undefined ;
299305
300306 // apply timezone from the job to all date field formatters
301307 try {
@@ -320,34 +326,25 @@ export class CsvGenerator {
320326 if ( this . cancellationToken . isCancelled ( ) ) {
321327 break ;
322328 }
323- if ( pitId == null ) {
324- pitId = (
325- await this . clients . es . asCurrentUser . openPointInTime ( {
326- index : index . title ,
327- keep_alive : settings . scroll . duration ,
328- } )
329- ) . body . id ;
330- }
331-
332- const results = await this . scroll ( pitId , settings , searchSource , searchAfter ) ;
333- if ( results . hits ?. total != null && ! totalRecords ) {
334- totalRecords =
335- typeof results . hits . total === 'number' ? results . hits . total : results . hits . total . value ;
336- this . logger . debug ( `Total search results: ${ totalRecords } ` ) ;
329+ let results : estypes . SearchResponse < unknown > | undefined ;
330+ if ( scrollId == null ) {
331+ // open a scroll cursor in Elasticsearch
332+ results = await this . scan ( index , searchSource , settings ) ;
333+ scrollId = results ?. _scroll_id ;
334+ if ( results . hits ?. total != null ) {
335+ totalRecords = results . hits . total as number ;
336+ this . logger . debug ( `Total search results: ${ totalRecords } ` ) ;
337+ }
338+ } else {
339+ // use the scroll cursor in Elasticsearch
340+ results = await this . scroll ( scrollId , scrollSettings ) ;
337341 }
338342
339- pitId = results . pit_id ? results . pit_id : pitId ;
340-
341343 if ( ! results ) {
342344 this . logger . warning ( `Search results are undefined!` ) ;
343345 break ;
344346 }
345347
346- const totalHits = results . hits . hits . length ;
347- if ( totalHits ) {
348- searchAfter = results . hits . hits [ totalHits - 1 ] . sort ;
349- }
350-
351348 let table : Datatable | undefined ;
352349 try {
353350 table = tabifyDocs ( results , index , { shallow : true , meta : true } ) ;
@@ -393,16 +390,16 @@ export class CsvGenerator {
393390 throw JSON . stringify ( err . errBody . error ) ;
394391 }
395392 } finally {
396- // close PIT
397- if ( pitId ) {
398- this . logger . debug ( `executing close point-in-time request` ) ;
393+ // clear scrollID
394+ if ( scrollId ) {
395+ this . logger . debug ( `executing clearScroll request` ) ;
399396 try {
400- await this . clients . es . asCurrentUser . closePointInTime ( { body : { id : pitId } } ) ;
397+ await this . clients . es . asCurrentUser . clearScroll ( { body : { scroll_id : [ scrollId ] } } ) ;
401398 } catch ( err ) {
402399 this . logger . error ( err ) ;
403400 }
404401 } else {
405- this . logger . warn ( `No point-in-time to close !` ) ;
402+ this . logger . warn ( `No scrollId to clear !` ) ;
406403 }
407404 }
408405
0 commit comments