Skip to content

Commit 64d82ec

Browse files
committed
implement wait for a refresh to happen after deleting
1 parent 26d8e15 commit 64d82ec

File tree

11 files changed

+114
-52
lines changed

11 files changed

+114
-52
lines changed

core/src/main/scala/app/softnetwork/elastic/client/DeleteApi.scala

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.util.{Failure, Success}
2828

2929
/** Delete Management API
3030
*/
31-
trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
31+
trait DeleteApi extends ElasticClientHelpers { _: SettingsApi =>
3232

3333
// ========================================================================
3434
// PUBLIC METHODS
@@ -39,10 +39,12 @@ trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
3939
* - the id of the entity to delete
4040
* @param index
4141
* - the name of the index to delete the entity from
42+
* @param wait
43+
* - whether to wait for a refresh to happen after deleting (default is false)
4244
* @return
4345
* true if the entity was deleted successfully, false otherwise
4446
*/
45-
def delete(id: String, index: String): ElasticResult[Boolean] = {
47+
def delete(id: String, index: String, wait: Boolean): ElasticResult[Boolean] = {
4648
validateIndexName(index) match {
4749
case Some(error) =>
4850
return ElasticResult.failure(
@@ -58,10 +60,12 @@ trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
5860

5961
logger.debug(s"Deleting document with id '$id' from index '$index'")
6062

61-
executeDelete(index, id) match {
62-
case _ @ElasticSuccess(true) =>
63+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
64+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
65+
executeDelete(index, id, waitEnabled) match {
66+
case success @ ElasticSuccess(true) =>
6367
logger.info(s"✅ Successfully deleted document with id '$id' from index '$index'")
64-
this.refresh(index)
68+
success
6569
case success @ ElasticSuccess(_) =>
6670
logger.info(s"✅ Document with id '$id' not found in index '$index'")
6771
success
@@ -78,10 +82,12 @@ trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
7882
* - the id of the entity to delete
7983
* @param index
8084
* - the name of the index to delete the entity from
85+
* @param wait
86+
* - whether to wait for a refresh to happen after deleting (default is false)
8187
* @return
8288
* a Future that completes with true if the entity was deleted successfully, false otherwise
8389
*/
84-
def deleteAsync(id: String, index: String)(implicit
90+
def deleteAsync(id: String, index: String, wait: Boolean)(implicit
8591
ec: ExecutionContext
8692
): Future[ElasticResult[Boolean]] = {
8793
validateIndexName(index) match {
@@ -101,13 +107,15 @@ trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
101107

102108
logger.debug(s"Deleting asynchronously document with id '$id' from index '$index'")
103109

110+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
111+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
104112
val promise: Promise[ElasticResult[Boolean]] = Promise()
105-
executeDeleteAsync(index, id) onComplete {
113+
executeDeleteAsync(index, id, waitEnabled) onComplete {
106114
case Success(s) =>
107115
s match {
108-
case _ @ElasticSuccess(true) =>
116+
case success @ ElasticSuccess(true) =>
109117
logger.info(s"✅ Successfully deleted document with id '$id' from index '$index'")
110-
promise.success(this.refresh(index))
118+
promise.success(success)
111119
case success @ ElasticSuccess(_) =>
112120
logger.warn(s"❌ Document with id '$id' in index '$index' not deleted")
113121
promise.success(success)
@@ -132,9 +140,13 @@ trait DeleteApi extends ElasticClientHelpers { _: RefreshApi =>
132140
// METHODS TO IMPLEMENT
133141
// ========================================================================
134142

135-
private[client] def executeDelete(index: String, id: String): ElasticResult[Boolean]
143+
private[client] def executeDelete(
144+
index: String,
145+
id: String,
146+
wait: Boolean
147+
): ElasticResult[Boolean]
136148

137-
private[client] def executeDeleteAsync(index: String, id: String)(implicit
149+
private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
138150
ec: ExecutionContext
139151
): Future[ElasticResult[Boolean]]
140152
}

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -724,36 +724,41 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
724724
* - the id of the entity to delete
725725
* @param index
726726
* - the name of the index to delete the entity from
727+
* @param wait
728+
* - whether to wait for a refresh to happen after deleting (default is false)
727729
* @return
728730
* true if the entity was deleted successfully, false otherwise
729731
*/
730-
override def delete(id: String, index: String): ElasticResult[Boolean] =
731-
delegate.delete(id, index)
732+
override def delete(id: String, index: String, wait: Boolean): ElasticResult[Boolean] =
733+
delegate.delete(id, index, wait)
732734

733735
/** Delete an entity from the given index asynchronously.
734736
*
735737
* @param id
736738
* - the id of the entity to delete
737739
* @param index
738740
* - the name of the index to delete the entity from
741+
* @param wait
742+
* - whether to wait for a refresh to happen after deleting (default is false)
739743
* @return
740744
* a Future that completes with true if the entity was deleted successfully, false otherwise
741745
*/
742-
override def deleteAsync(id: String, index: String)(implicit
746+
override def deleteAsync(id: String, index: String, wait: Boolean)(implicit
743747
ec: ExecutionContext
744748
): Future[ElasticResult[Boolean]] =
745-
delegate.deleteAsync(id, index)
749+
delegate.deleteAsync(id, index, wait)
746750

747751
override private[client] def executeDelete(
748752
index: String,
749-
id: String
753+
id: String,
754+
wait: Boolean
750755
): ElasticResult[Boolean] =
751-
delegate.executeDelete(index, id)
756+
delegate.executeDelete(index, id, wait)
752757

753-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
758+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
754759
ec: ExecutionContext
755760
): Future[ElasticResult[Boolean]] =
756-
delegate.executeDeleteAsync(index, id)
761+
delegate.executeDeleteAsync(index, id, wait)
757762

758763
// ==================== GetApi ====================
759764

core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -507,17 +507,17 @@ class MetricsElasticClient(
507507

508508
// ==================== DeleteApi ====================
509509

510-
override def delete(id: String, index: String): ElasticResult[Boolean] = {
510+
override def delete(id: String, index: String, wait: Boolean): ElasticResult[Boolean] = {
511511
measureResult("delete", Some(index)) {
512-
delegate.delete(id, index)
512+
delegate.delete(id, index, wait)
513513
}
514514
}
515515

516-
override def deleteAsync(id: String, index: String)(implicit
516+
override def deleteAsync(id: String, index: String, wait: Boolean)(implicit
517517
ec: ExecutionContext
518518
): Future[ElasticResult[Boolean]] = {
519519
measureAsync("deleteAsync", Some(index)) {
520-
delegate.deleteAsync(id, index)
520+
delegate.deleteAsync(id, index, wait)
521521
}
522522
}
523523

es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestDeleteApi.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package app.softnetwork.elastic.client.jest
1818

19-
import app.softnetwork.elastic.client.{DeleteApi, RefreshApi}
19+
import app.softnetwork.elastic.client.{DeleteApi, SettingsApi}
2020
import app.softnetwork.elastic.client.result.ElasticResult
2121
import io.searchbox.core.Delete
2222

@@ -27,30 +27,42 @@ import scala.concurrent.{ExecutionContext, Future}
2727
* [[DeleteApi]] for generic API documentation
2828
*/
2929
trait JestDeleteApi extends DeleteApi with JestClientHelpers {
30-
_: RefreshApi with JestClientCompanion =>
30+
_: SettingsApi with JestClientCompanion =>
3131

3232
/** Delete an entity from the given index.
3333
* @see
3434
* [[DeleteApi.delete]]
3535
*/
36-
private[client] def executeDelete(index: String, id: String): ElasticResult[Boolean] =
36+
override private[client] def executeDelete(
37+
index: String,
38+
id: String,
39+
wait: Boolean
40+
): ElasticResult[Boolean] =
3741
executeJestBooleanAction(
3842
operation = "delete",
3943
index = Some(index),
4044
retryable = true
4145
) {
42-
new Delete.Builder(id).index(index).`type`("_doc").build()
46+
new Delete.Builder(id)
47+
.index(index)
48+
.`type`("_doc")
49+
.setParameter("refresh", if (wait) "wait_for" else "false")
50+
.build()
4351
}
4452

45-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
53+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
4654
ec: ExecutionContext
4755
): Future[ElasticResult[Boolean]] =
4856
executeAsyncJestAction(
4957
operation = "delete",
5058
index = Some(index),
5159
retryable = true
5260
) {
53-
new Delete.Builder(id).index(index).`type`("_doc").build()
61+
new Delete.Builder(id)
62+
.index(index)
63+
.`type`("_doc")
64+
.setParameter("refresh", if (wait) "wait_for" else "false")
65+
.build()
5466
}(result => result.isSucceeded)
5567

5668
}

es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -593,23 +593,27 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientHel
593593
* [[DeleteApi]] for generic API documentation
594594
*/
595595
trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientHelpers {
596-
_: RefreshApi with RestHighLevelClientCompanion =>
596+
_: SettingsApi with RestHighLevelClientCompanion =>
597597

598598
override private[client] def executeDelete(
599599
index: String,
600-
id: String
600+
id: String,
601+
wait: Boolean
601602
): result.ElasticResult[Boolean] =
602603
executeRestAction[DeleteRequest, DeleteResponse, Boolean](
603604
operation = "delete",
604605
index = Some(index),
605606
retryable = false
606607
)(
607608
request = new DeleteRequest(index, "_doc", id)
609+
.setRefreshPolicy(
610+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
611+
)
608612
)(
609613
executor = req => apply().delete(req, RequestOptions.DEFAULT)
610614
)(response => response.status().getStatus < 400)
611615

612-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
616+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
613617
ec: ExecutionContext
614618
): Future[result.ElasticResult[Boolean]] =
615619
executeAsyncRestAction[DeleteRequest, DeleteResponse, Boolean](
@@ -618,6 +622,9 @@ trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientHel
618622
retryable = false
619623
)(
620624
request = new DeleteRequest(index, "_doc", id)
625+
.setRefreshPolicy(
626+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
627+
)
621628
)(
622629
executor = (req, listener) => apply().deleteAsync(req, RequestOptions.DEFAULT, listener)
623630
)(response => response.status().getStatus < 400)

es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -599,23 +599,27 @@ trait RestHighLevelClientUpdateApi extends UpdateApi with RestHighLevelClientHel
599599
* [[DeleteApi]] for generic API documentation
600600
*/
601601
trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientHelpers {
602-
_: RefreshApi with RestHighLevelClientCompanion =>
602+
_: SettingsApi with RestHighLevelClientCompanion =>
603603

604604
override private[client] def executeDelete(
605605
index: String,
606-
id: String
606+
id: String,
607+
wait: Boolean
607608
): result.ElasticResult[Boolean] =
608609
executeRestAction[DeleteRequest, DeleteResponse, Boolean](
609610
operation = "delete",
610611
index = Some(index),
611612
retryable = false
612613
)(
613614
request = new DeleteRequest(index, id)
615+
.setRefreshPolicy(
616+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
617+
)
614618
)(
615619
executor = req => apply().delete(req, RequestOptions.DEFAULT)
616620
)(response => response.status().getStatus < 400)
617621

618-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
622+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
619623
ec: ExecutionContext
620624
): Future[result.ElasticResult[Boolean]] =
621625
executeAsyncRestAction[DeleteRequest, DeleteResponse, Boolean](
@@ -624,6 +628,9 @@ trait RestHighLevelClientDeleteApi extends DeleteApi with RestHighLevelClientHel
624628
retryable = false
625629
)(
626630
request = new DeleteRequest(index, id)
631+
.setRefreshPolicy(
632+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
633+
)
627634
)(
628635
executor = (req, listener) => apply().deleteAsync(req, RequestOptions.DEFAULT, listener)
629636
)(response => response.status().getStatus < 400)

es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -642,11 +642,12 @@ trait JavaClientUpdateApi extends UpdateApi with JavaClientHelpers {
642642
* [[DeleteApi]] for delete operations
643643
*/
644644
trait JavaClientDeleteApi extends DeleteApi with JavaClientHelpers {
645-
_: RefreshApi with JavaClientCompanion =>
645+
_: SettingsApi with JavaClientCompanion =>
646646

647647
override private[client] def executeDelete(
648648
index: String,
649-
id: String
649+
id: String,
650+
wait: Boolean
650651
): result.ElasticResult[Boolean] =
651652
executeJavaBooleanAction(
652653
operation = "delete",
@@ -655,21 +656,29 @@ trait JavaClientDeleteApi extends DeleteApi with JavaClientHelpers {
655656
)(
656657
apply()
657658
.delete(
658-
new DeleteRequest.Builder().index(index).id(id).build()
659+
new DeleteRequest.Builder()
660+
.index(index)
661+
.id(id)
662+
.refresh(if (wait) Refresh.WaitFor else Refresh.False)
663+
.build()
659664
)
660665
)(
661666
_.shards()
662667
.failed()
663668
.intValue() == 0
664669
)
665670

666-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
671+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
667672
ec: ExecutionContext
668673
): Future[result.ElasticResult[Boolean]] =
669674
fromCompletableFuture(
670675
async()
671676
.delete(
672-
new DeleteRequest.Builder().index(index).id(id).build()
677+
new DeleteRequest.Builder()
678+
.index(index)
679+
.id(id)
680+
.refresh(if (wait) Refresh.WaitFor else Refresh.False)
681+
.build()
673682
)
674683
).map { response =>
675684
if (response.shards().failed().intValue() == 0) {

es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -639,11 +639,12 @@ trait JavaClientUpdateApi extends UpdateApi with JavaClientHelpers {
639639
* [[DeleteApi]] for delete operations
640640
*/
641641
trait JavaClientDeleteApi extends DeleteApi with JavaClientHelpers {
642-
_: RefreshApi with JavaClientCompanion =>
642+
_: SettingsApi with JavaClientCompanion =>
643643

644644
override private[client] def executeDelete(
645645
index: String,
646-
id: String
646+
id: String,
647+
wait: Boolean
647648
): result.ElasticResult[Boolean] =
648649
executeJavaBooleanAction(
649650
operation = "delete",
@@ -652,21 +653,29 @@ trait JavaClientDeleteApi extends DeleteApi with JavaClientHelpers {
652653
)(
653654
apply()
654655
.delete(
655-
new DeleteRequest.Builder().index(index).id(id).build()
656+
new DeleteRequest.Builder()
657+
.index(index)
658+
.id(id)
659+
.refresh(if (wait) Refresh.WaitFor else Refresh.False)
660+
.build()
656661
)
657662
)(
658663
_.shards()
659664
.failed()
660665
.intValue() == 0
661666
)
662667

663-
override private[client] def executeDeleteAsync(index: String, id: String)(implicit
668+
override private[client] def executeDeleteAsync(index: String, id: String, wait: Boolean)(implicit
664669
ec: ExecutionContext
665670
): Future[result.ElasticResult[Boolean]] =
666671
fromCompletableFuture(
667672
async()
668673
.delete(
669-
new DeleteRequest.Builder().index(index).id(id).build()
674+
new DeleteRequest.Builder()
675+
.index(index)
676+
.id(id)
677+
.refresh(if (wait) Refresh.WaitFor else Refresh.False)
678+
.build()
670679
)
671680
).map { response =>
672681
if (response.shards().failed().intValue() == 0) {

0 commit comments

Comments
 (0)