@@ -32,15 +32,15 @@ The **BulkApi** trait provides high-performance bulk operations for Elasticsearc
3232
3333``` scala
3434// Data flow pipeline
35- Iterator [ D ]
35+ Source [ D , NotUsed ]
3636 -> Transform to JSON
3737 -> Create BulkItem
3838 -> Apply settings (refresh, replicas)
3939 -> Group into batches
40- -> Execute bulk requests ( parallel)
40+ -> Execute bulk requests in parallel
4141 -> Extract results
4242 -> Retry failures (automatic)
43- -> Return Either [Failed , Success ]
43+ -> Return Either [FailedDocument , SuccessfulDocument ]
4444```
4545
4646### Operation Types
@@ -87,11 +87,10 @@ case class BulkOptions(
8787)
8888
8989// Usage
90- implicit val bulkOptions = BulkOptions (
90+ implicit val bulkOptions : BulkOptions = BulkOptions (
9191 defaultIndex = " products" ,
9292 maxBulkSize = 5000 ,
93- balance = 8 ,
94- retryOnFailure = true
93+ balance = 8
9594)
9695```
9796
@@ -131,7 +130,7 @@ Executes bulk operations with detailed success/failure reporting and metrics.
131130
132131``` scala
133132def bulkWithResult [D ](
134- items : Iterator [ D ],
133+ items : Source [ D , NotUsed ],
135134 toDocument : D => String ,
136135 indexKey : Option [String ] = None ,
137136 idKey : Option [String ] = None ,
@@ -145,7 +144,7 @@ def bulkWithResult[D](
145144```
146145
147146** Parameters:**
148- - ` items ` - Iterator of documents to process
147+ - ` items ` - Source of documents to process
149148- ` toDocument ` - Function to convert document to JSON string
150149- ` indexKey ` - Optional field name containing index name
151150- ` idKey ` - Optional field name containing document ID
@@ -215,7 +214,7 @@ implicit val bulkOptions: BulkOptions = BulkOptions(
215214case class Product (id : String , name : String , price : Double , category : String )
216215
217216// Basic bulk indexing
218- val products : Iterator [Product ] = getProducts() // Large dataset
217+ val products : Source [Product , NotUsed ] = getProducts() // Large dataset
219218
220219val toJson : Product => String = product => s """
221220{
@@ -280,19 +279,24 @@ client.bulkWithResult(
280279}
281280
282281// Bulk delete
283- val idsToDelete : Iterator [String ] = getObsoleteProductIds()
282+ val obsoleteProducts : Source [Product , NotUsed ] = client.scrollAs[Product ](
283+ """
284+ |SELECT uuid AS id, name, price, category, outdated AS obsolete FROM products WHERE outdated = true
285+ |""" .stripMargin
286+ )
287+ val idsToDelete : Source [String , NotUsed ] = obsoleteProducts.map(_._1.id)
284288
285289client.bulkWithResult(
286- items = idsToDelete.map(id => Map ( " id " -> id)) ,
287- toDocument = doc => s """ {"id": " ${doc( " id " )} "} """ ,
290+ items = idsToDelete,
291+ toDocument = id => s """ {"id": " $id "} """ ,
288292 idKey = Some (" id" ),
289293 delete = Some (true )
290294)
291295
292296// Date-based index suffixing
293297case class LogEntry (id : String , message : String , timestamp : String )
294298
295- val logs : Iterator [LogEntry ] = getLogEntries()
299+ val logs : Source [LogEntry , NotUsed ] = getLogEntries()
296300
297301client.bulkWithResult(
298302 items = logs,
@@ -360,7 +364,7 @@ Returns an Akka Streams Source that emits real-time results for each document.
360364
361365``` scala
362366def bulkSource [D ](
363- items : Iterator [ D ],
367+ items : Source [ D , NotUsed ],
364368 toDocument : D => String ,
365369 idKey : Option [String ] = None ,
366370 suffixDateKey : Option [String ] = None ,
@@ -506,7 +510,7 @@ Legacy synchronous bulk method. **Use `bulkWithResult` instead.**
506510``` scala
507511@ deprecated(" Use bulkWithResult for better error handling" )
508512def bulk [D ](
509- items : Iterator [ D ],
513+ items : Source [ D , NotUsed ],
510514 toDocument : D => String ,
511515 idKey : Option [String ] = None ,
512516 suffixDateKey : Option [String ] = None ,
@@ -701,7 +705,7 @@ case class LogEntry(
701705 message : String
702706)
703707
704- val logs : Iterator [LogEntry ] = streamLogs()
708+ val logs : Source [LogEntry , NotUsed ] = streamLogs()
705709
706710implicit val logOptions : BulkOptions = BulkOptions (
707711 defaultIndex = " logs" , // Base index
@@ -731,7 +735,7 @@ client.bulkWithResult(
731735``` scala
732736case class ProductUpdate (id : String , price : Double , stock : Int )
733737
734- val updates : Iterator [ProductUpdate ] = getProductUpdates()
738+ val updates : Source [ProductUpdate , NotUsed ] = getProductUpdates()
735739
736740client.bulkWithResult(
737741 items = updates,
@@ -752,11 +756,11 @@ client.bulkWithResult(
752756### Batch Deletion
753757
754758``` scala
755- val obsoleteIds : Iterator [String ] = findObsoleteDocuments()
759+ val obsoleteIds : Source [String , NotUsed ] = findObsoleteDocuments()
756760
757761client.bulkWithResult(
758- items = obsoleteIds.map(id => Map ( " id " -> id)) ,
759- toDocument = doc => s """ {"id": " ${doc( " id " )} "} """ ,
762+ items = obsoleteIds,
763+ toDocument = id => s """ {"id": " $id "} """ ,
760764 idKey = Some (" id" ),
761765 delete = Some (true )
762766).foreach { result =>
@@ -955,23 +959,23 @@ client.bulkSource(
955959
956960``` scala
957961// ✅ Good - balanced batch size
958- implicit val options = BulkOptions (
962+ implicit val options : BulkOptions = BulkOptions (
959963 defaultIndex = " products" ,
960964 maxBulkSize = 1000 // Good for most use cases
961965)
962966
963967// ❌ Too small - overhead
964- implicit val tooSmall = BulkOptions (maxBulkSize = 10 )
968+ implicit val tooSmall : BulkOptions = BulkOptions (maxBulkSize = 10 )
965969
966970// ❌ Too large - memory issues
967- implicit val tooLarge = BulkOptions (maxBulkSize = 100000 )
971+ implicit val tooLarge : BulkOptions = BulkOptions (maxBulkSize = 100000 )
968972```
969973
970974** 2. Disable Refresh for Large Bulks**
971975
972976``` scala
973977// ✅ Good - disable refresh during bulk
974- implicit val options = BulkOptions (
978+ implicit val options : BulkOptions = BulkOptions (
975979 defaultIndex = " products" ,
976980 disableRefresh = true
977981)
@@ -1021,13 +1025,13 @@ client.bulkWithResult(items, toJson, Some("id"), callbacks = callbacks)
10211025
10221026``` scala
10231027// Small cluster (1-3 nodes)
1024- implicit val smallCluster = BulkOptions (balance = 2 )
1028+ implicit val smallCluster : BulkOptions = BulkOptions (balance = 2 )
10251029
10261030// Medium cluster (4-10 nodes)
1027- implicit val mediumCluster = BulkOptions (balance = 4 )
1031+ implicit val mediumCluster : BulkOptions = BulkOptions (balance = 4 )
10281032
10291033// Large cluster (10+ nodes)
1030- implicit val largeCluster = BulkOptions (balance = 8 )
1034+ implicit val largeCluster : BulkOptions = BulkOptions (balance = 8 )
10311035```
10321036
10331037---
@@ -1280,7 +1284,7 @@ def testPerformanceMetrics()(implicit system: ActorSystem): Future[Unit] = {
12801284``` scala
12811285case class Document (id : String , index : String , data : String )
12821286
1283- val multiIndexDocs : Iterator [Document ] = getDocuments()
1287+ val multiIndexDocs : Source [Document , NotUsed ] = getDocuments()
12841288
12851289// Custom transformation to handle multiple indices
12861290client.bulkWithResult(
@@ -1307,7 +1311,7 @@ client.bulkWithResult(
13071311
13081312``` scala
13091313def bulkWithCondition [D ](
1310- items : Iterator [ D ],
1314+ items : Source [ D , NotUsed ],
13111315 toDocument : D => String ,
13121316 condition : D => Boolean
13131317)(implicit bulkOptions : BulkOptions , system : ActorSystem ): Future [BulkResult ] = {
@@ -1333,7 +1337,7 @@ bulkWithCondition(
13331337
13341338``` scala
13351339def bulkWithTransformation [D , T ](
1336- items : Iterator [ D ],
1340+ items : Source [ D , NotUsed ],
13371341 transform : D => T ,
13381342 toDocument : T => String
13391343)(implicit bulkOptions : BulkOptions , system : ActorSystem ): Future [BulkResult ] = {
@@ -1377,7 +1381,7 @@ bulkWithTransformation(
13771381
13781382``` scala
13791383def bulkWithExternalEnrichment [D ](
1380- items : Iterator [ D ],
1384+ items : Source [ D , NotUsed ],
13811385 enrichmentApi : D => Future [D ],
13821386 toDocument : D => String
13831387)(implicit
@@ -1407,7 +1411,7 @@ def bulkWithExternalEnrichment[D](
14071411
14081412``` scala
14091413def bulkWithDeduplication [D ](
1410- items : Iterator [ D ],
1414+ items : Source [ D , NotUsed ],
14111415 getId : D => String ,
14121416 toDocument : D => String
14131417)(implicit bulkOptions : BulkOptions , system : ActorSystem ): Future [BulkResult ] = {
@@ -1441,20 +1445,20 @@ def bulkWithDeduplication[D](
14411445
14421446``` scala
14431447// Problem: Large batches causing OOM
1444- implicit val problematic = BulkOptions (maxBulkSize = 100000 )
1448+ implicit val problematic : BulkOptions = BulkOptions (maxBulkSize = 100000 )
14451449
14461450// Solution: Reduce batch size
1447- implicit val fixed = BulkOptions (maxBulkSize = 1000 )
1451+ implicit val fixed : BulkOptions = BulkOptions (maxBulkSize = 1000 )
14481452```
14491453
14501454** 2. Slow Performance**
14511455
14521456``` scala
14531457// Problem: Sequential processing
1454- implicit val slow = BulkOptions (balance = 1 )
1458+ implicit val slow : BulkOptions = BulkOptions (balance = 1 )
14551459
14561460// Solution: Increase parallelism
1457- implicit val fast = BulkOptions (
1461+ implicit val fast : BulkOptions = BulkOptions (
14581462 balance = 8 ,
14591463 maxBulkSize = 5000 ,
14601464 disableRefresh = true
@@ -1465,7 +1469,7 @@ implicit val fast = BulkOptions(
14651469
14661470``` scala
14671471// Problem: Retrying non-retryable errors
1468- implicit val wasteful = BulkOptions (
1472+ implicit val wasteful : BulkOptions = BulkOptions (
14691473 retryOnFailure = true ,
14701474 maxRetries = 10
14711475)
@@ -1484,10 +1488,10 @@ result.foreach { r =>
14841488
14851489``` scala
14861490// Problem: Slow indexing due to frequent refresh
1487- implicit val slow = BulkOptions (disableRefresh = false )
1491+ implicit val slow : BulkOptions = BulkOptions (disableRefresh = false )
14881492
14891493// Solution: Disable refresh during bulk, refresh once at end
1490- implicit val fast = BulkOptions (disableRefresh = true )
1494+ implicit val fast : BulkOptions = BulkOptions (disableRefresh = true )
14911495
14921496client.bulkWithResult(items, toJson, Some (" id" )).foreach { result =>
14931497 result.indices.foreach(client.refresh) // Manual refresh
@@ -1541,7 +1545,7 @@ client.bulkWithResult(
15411545
15421546``` scala
15431547// High-performance bulk indexing
1544- implicit val options = BulkOptions (
1548+ implicit val options : BulkOptions = BulkOptions (
15451549 defaultIndex = " products" ,
15461550 maxBulkSize = 5000 ,
15471551 balance = 8 ,
0 commit comments