@@ -3904,7 +3904,8 @@ public function createDocument(string $collection, Document $document): Document
39043904 * @param string $collection
39053905 * @param array<Document> $documents
39063906 * @param int $batchSize
3907- * @param callable|null $onNext
3907+ * @param (callable(Document): void)|null $onNext
3908+ * @param (callable(Throwable): void)|null $onError
39083909 * @return int
39093910 * @throws AuthorizationException
39103911 * @throws StructureException
@@ -3916,6 +3917,7 @@ public function createDocuments(
39163917 array $ documents ,
39173918 int $ batchSize = self ::INSERT_BATCH_SIZE ,
39183919 ?callable $ onNext = null ,
3920+ ?callable $ onError = null ,
39193921 ): int {
39203922 if (!$ this ->adapter ->getSharedTables () && $ this ->adapter ->getTenantPerDocument ()) {
39213923 throw new DatabaseException ('Shared tables must be enabled if tenant per document is enabled. ' );
@@ -3992,7 +3994,13 @@ public function createDocuments(
39923994
39933995 $ document = $ this ->casting ($ collection , $ document );
39943996 $ document = $ this ->decode ($ collection , $ document );
3995- $ onNext && $ onNext ($ document );
3997+
3998+ try {
3999+ $ onNext && $ onNext ($ document );
4000+ } catch (\Throwable $ e ) {
4001+ $ onError ? $ onError ($ e ) : throw $ e ;
4002+ }
4003+
39964004 $ modified ++;
39974005 }
39984006 }
@@ -4551,8 +4559,8 @@ public function updateDocument(string $collection, string $id, Document $documen
45514559 * @param Document $updates
45524560 * @param array<Query> $queries
45534561 * @param int $batchSize
4554- * @param callable|null $onNext
4555- * @param callable|null $onError
4562+ * @param ( callable(Document $updated, Document $old): void) |null $onNext
4563+ * @param ( callable(Throwable): void) |null $onError
45564564 * @return int
45574565 * @throws AuthorizationException
45584566 * @throws ConflictException
@@ -4675,12 +4683,12 @@ public function updateDocuments(
46754683 break ;
46764684 }
46774685
4686+ $ old = array_map (fn ($ doc ) => clone $ doc , $ batch );
46784687 $ currentPermissions = $ updates ->getPermissions ();
46794688 sort ($ currentPermissions );
46804689
46814690 $ this ->withTransaction (function () use ($ collection , $ updates , &$ batch , $ currentPermissions ) {
46824691 foreach ($ batch as $ index => $ document ) {
4683-
46844692 $ skipPermissionsUpdate = true ;
46854693
46864694 if ($ updates ->offsetExists ('$permissions ' )) {
@@ -4725,13 +4733,12 @@ public function updateDocuments(
47254733 );
47264734 });
47274735
4728- foreach ($ batch as $ doc ) {
4736+ foreach ($ batch as $ index => $ doc ) {
47294737 $ doc ->removeAttribute ('$skipPermissionsUpdate ' );
4730-
47314738 $ this ->purgeCachedDocument ($ collection ->getId (), $ doc ->getId ());
47324739 $ doc = $ this ->decode ($ collection , $ doc );
47334740 try {
4734- $ onNext && $ onNext ($ doc );
4741+ $ onNext && $ onNext ($ doc, $ old [ $ index ] );
47354742 } catch (Throwable $ th ) {
47364743 $ onError ? $ onError ($ th ) : throw $ th ;
47374744 }
@@ -5147,28 +5154,62 @@ private function getJunctionCollection(Document $collection, Document $relatedCo
51475154 : '_ ' . $ relatedCollection ->getSequence () . '_ ' . $ collection ->getSequence ();
51485155 }
51495156
5157+ /**
5158+ * Create or update a document.
5159+ *
5160+ * @param string $collection
5161+ * @param Document $document
5162+ * @return Document
5163+ * @throws StructureException
5164+ * @throws Throwable
5165+ */
5166+ public function upsertDocument (
5167+ string $ collection ,
5168+ Document $ document ,
5169+ ): Document {
5170+ $ result = null ;
5171+
5172+ $ this ->upsertDocumentsWithIncrease (
5173+ $ collection ,
5174+ '' ,
5175+ [$ document ],
5176+ function (Document $ doc , ?Document $ _old = null ) use (&$ result ) {
5177+ $ result = $ doc ;
5178+ }
5179+ );
5180+
5181+ if ($ result === null ) {
5182+ // No-op (unchanged): return the current persisted doc
5183+ $ result = $ this ->getDocument ($ collection , $ document ->getId ());
5184+ }
5185+ return $ result ;
5186+ }
5187+
51505188 /**
51515189 * Create or update documents.
51525190 *
51535191 * @param string $collection
51545192 * @param array<Document> $documents
51555193 * @param int $batchSize
5156- * @param callable|null $onNext
5194+ * @param (callable(Document, ?Document): void)|null $onNext
5195+ * @param (callable(Throwable): void)|null $onError
51575196 * @return int
51585197 * @throws StructureException
51595198 * @throws \Throwable
51605199 */
5161- public function createOrUpdateDocuments (
5200+ public function upsertDocuments (
51625201 string $ collection ,
51635202 array $ documents ,
51645203 int $ batchSize = self ::INSERT_BATCH_SIZE ,
51655204 ?callable $ onNext = null ,
5205+ ?callable $ onError = null
51665206 ): int {
5167- return $ this ->createOrUpdateDocumentsWithIncrease (
5207+ return $ this ->upsertDocumentsWithIncrease (
51685208 $ collection ,
51695209 '' ,
51705210 $ documents ,
51715211 $ onNext ,
5212+ $ onError ,
51725213 $ batchSize
51735214 );
51745215 }
@@ -5179,18 +5220,20 @@ public function createOrUpdateDocuments(
51795220 * @param string $collection
51805221 * @param string $attribute
51815222 * @param array<Document> $documents
5182- * @param callable|null $onNext
5223+ * @param (callable(Document, ?Document): void)|null $onNext
5224+ * @param (callable(Throwable): void)|null $onError
51835225 * @param int $batchSize
51845226 * @return int
51855227 * @throws StructureException
51865228 * @throws \Throwable
51875229 * @throws Exception
51885230 */
5189- public function createOrUpdateDocumentsWithIncrease (
5231+ public function upsertDocumentsWithIncrease (
51905232 string $ collection ,
51915233 string $ attribute ,
51925234 array $ documents ,
51935235 ?callable $ onNext = null ,
5236+ ?callable $ onError = null ,
51945237 int $ batchSize = self ::INSERT_BATCH_SIZE
51955238 ): int {
51965239 if (empty ($ documents )) {
@@ -5352,7 +5395,7 @@ public function createOrUpdateDocumentsWithIncrease(
53525395 /**
53535396 * @var array<Change> $chunk
53545397 */
5355- $ batch = $ this ->withTransaction (fn () => Authorization::skip (fn () => $ this ->adapter ->createOrUpdateDocuments (
5398+ $ batch = $ this ->withTransaction (fn () => Authorization::skip (fn () => $ this ->adapter ->upsertDocuments (
53565399 $ collection ,
53575400 $ attribute ,
53585401 $ chunk
@@ -5368,7 +5411,7 @@ public function createOrUpdateDocumentsWithIncrease(
53685411 }
53695412 }
53705413
5371- foreach ($ batch as $ doc ) {
5414+ foreach ($ batch as $ index => $ doc ) {
53725415 if ($ this ->resolveRelationships ) {
53735416 $ doc = $ this ->silent (fn () => $ this ->populateDocumentRelationships ($ collection , $ doc ));
53745417 }
@@ -5383,7 +5426,13 @@ public function createOrUpdateDocumentsWithIncrease(
53835426 $ this ->purgeCachedDocument ($ collection ->getId (), $ doc ->getId ());
53845427 }
53855428
5386- $ onNext && $ onNext ($ doc );
5429+ $ old = $ chunk [$ index ]->getOld ();
5430+
5431+ try {
5432+ $ onNext && $ onNext ($ doc , $ old ->isEmpty () ? null : $ old );
5433+ } catch (\Throwable $ th ) {
5434+ $ onError ? $ onError ($ th ) : throw $ th ;
5435+ }
53875436 }
53885437 }
53895438
@@ -6050,8 +6099,8 @@ private function deleteCascade(Document $collection, Document $relatedCollection
60506099 * @param string $collection
60516100 * @param array<Query> $queries
60526101 * @param int $batchSize
6053- * @param callable|null $onNext
6054- * @param callable|null $onError
6102+ * @param ( callable(Document, Document): void) |null $onNext
6103+ * @param ( callable(Throwable): void) |null $onError
60556104 * @return int
60566105 * @throws AuthorizationException
60576106 * @throws DatabaseException
@@ -6143,6 +6192,7 @@ public function deleteDocuments(
61436192 break ;
61446193 }
61456194
6195+ $ old = array_map (fn ($ doc ) => clone $ doc , $ batch );
61466196 $ sequences = [];
61476197 $ permissionIds = [];
61486198
@@ -6179,7 +6229,7 @@ public function deleteDocuments(
61796229 );
61806230 });
61816231
6182- foreach ($ batch as $ document ) {
6232+ foreach ($ batch as $ index => $ document ) {
61836233 if ($ this ->getSharedTables () && $ this ->getTenantPerDocument ()) {
61846234 $ this ->withTenant ($ document ->getTenant (), function () use ($ collection , $ document ) {
61856235 $ this ->purgeCachedDocument ($ collection ->getId (), $ document ->getId ());
@@ -6188,7 +6238,7 @@ public function deleteDocuments(
61886238 $ this ->purgeCachedDocument ($ collection ->getId (), $ document ->getId ());
61896239 }
61906240 try {
6191- $ onNext && $ onNext ($ document );
6241+ $ onNext && $ onNext ($ document, $ old [ $ index ] );
61926242 } catch (Throwable $ th ) {
61936243 $ onError ? $ onError ($ th ) : throw $ th ;
61946244 }
0 commit comments