@@ -126,36 +126,44 @@ protected function _returnSearch($params, $source)
126126 }
127127
128128
129- public function processDistinct ($ column , $ wheres )
129+ public function processDistinct ($ wheres , $ options , $ columns , $ includeDocCount = false )
130130 {
131- $ col = $ column ;
132- if (is_array ($ column )) {
133- $ col = $ column [0 ];
134- }
135-
136-
137131 try {
138- $ params = $ this ->buildParams ($ this ->index , $ wheres );
139- $ params ['body ' ]['aggs ' ]['distinct_ ' .$ col ]['terms ' ] = [
140- 'field ' => $ col ,
141- 'size ' => $ this ->maxSize ,
142- ];
143- $ process = $ this ->client ->search ($ params );
132+ if ($ columns && !is_array ($ columns )) {
133+ $ columns = [$ columns ];
134+ }
135+ $ sort = $ options ['sort ' ] ?? [];
136+ $ skip = $ options ['skip ' ] ?? false ;
137+ $ limit = $ options ['limit ' ] ?? false ;
138+ unset($ options ['sort ' ]);
139+ unset($ options ['skip ' ]);
140+ unset($ options ['limit ' ]);
141+
142+ $ params = $ this ->buildParams ($ this ->index , $ wheres , $ options );
143+ $ params ['body ' ]['aggs ' ] = $ this ->createNestedAggs ($ columns , $ sort );
144+
145+
146+ // dd($params['body']['aggs']);
147+ $ response = $ this ->client ->search ($ params );
148+
149+
144150 $ data = [];
145- if (!empty ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ])) {
146- foreach ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ] as $ bucket ) {
147- $ data [] = $ bucket ['key ' ];
148- }
151+ if (!empty ($ response ['aggregations ' ])) {
152+ $ data = $ this ->_sanitizeDistinctResponse ($ response ['aggregations ' ], $ columns , $ includeDocCount );
153+ }
154+
155+ //process limit and skip from all results
156+ if ($ skip || $ limit ) {
157+ $ data = array_slice ($ data , $ skip , $ limit );
149158 }
150159
151- return $ this ->_return ($ data , $ process , $ params , $ this ->_queryTag (__FUNCTION__ ));
160+ return $ this ->_return ($ data , $ response , $ params , $ this ->_queryTag (__FUNCTION__ ));
152161 } catch (Exception $ e ) {
153162
154163 $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
155164 throw new Exception ($ result ->errorMessage );
156165 }
157166
158-
159167 }
160168
161169 /**
@@ -640,6 +648,146 @@ private function _matrixAggregate($wheres, $options, $columns)
640648 }
641649
642650 }
651+ //----------------------------------------------------------------------
652+ // Distinct Aggregates
653+ //----------------------------------------------------------------------
654+
655+ public function processDistinctAggregate ($ function , $ wheres , $ options , $ columns ): Results
656+ {
657+ return $ this ->{'_ ' .$ function .'DistinctAggregate ' }($ wheres , $ options , $ columns );
658+ }
659+
660+ private function _countDistinctAggregate ($ wheres , $ options , $ columns ): Results
661+ {
662+ try {
663+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
664+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
665+ $ count = count ($ process ->data );
666+
667+ return $ this ->_return ($ count , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
668+ } catch (Exception $ e ) {
669+
670+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
671+ throw new Exception ($ result ->errorMessage );
672+ }
673+
674+ }
675+
676+
677+ private function _minDistinctAggregate ($ wheres , $ options , $ columns ): Results
678+ {
679+ try {
680+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
681+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
682+
683+ $ min = 0 ;
684+ $ hasBeenSet = false ;
685+ if (!empty ($ process ->data )) {
686+ foreach ($ process ->data as $ datum ) {
687+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
688+ if (!$ hasBeenSet ) {
689+ $ min = $ datum [$ columns [0 ]];
690+ $ hasBeenSet = true ;
691+ } else {
692+ $ min = min ($ min , $ datum [$ columns [0 ]]);
693+ }
694+
695+ }
696+ }
697+ }
698+
699+ return $ this ->_return ($ min , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
700+ } catch (Exception $ e ) {
701+
702+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
703+ throw new Exception ($ result ->errorMessage );
704+ }
705+
706+ }
707+
708+ private function _maxDistinctAggregate ($ wheres , $ options , $ columns ): Results
709+ {
710+ try {
711+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
712+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
713+
714+ $ max = 0 ;
715+ if (!empty ($ process ->data )) {
716+ foreach ($ process ->data as $ datum ) {
717+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
718+ $ max = max ($ max , $ datum [$ columns [0 ]]);
719+ }
720+ }
721+ }
722+
723+
724+ return $ this ->_return ($ max , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
725+ } catch (Exception $ e ) {
726+
727+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
728+ throw new Exception ($ result ->errorMessage );
729+ }
730+
731+ }
732+
733+
734+ private function _sumDistinctAggregate ($ wheres , $ options , $ columns ): Results
735+ {
736+ try {
737+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
738+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
739+ $ sum = 0 ;
740+ if (!empty ($ process ->data )) {
741+ foreach ($ process ->data as $ datum ) {
742+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
743+ $ sum += $ datum [$ columns [0 ]];
744+ }
745+ }
746+ }
747+
748+ return $ this ->_return ($ sum , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
749+ } catch (Exception $ e ) {
750+
751+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
752+ throw new Exception ($ result ->errorMessage );
753+ }
754+
755+ }
756+
757+ private function _avgDistinctAggregate ($ wheres , $ options , $ columns )
758+ {
759+ try {
760+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
761+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
762+ $ sum = 0 ;
763+ $ count = 0 ;
764+ $ avg = 0 ;
765+ if (!empty ($ process ->data )) {
766+ foreach ($ process ->data as $ datum ) {
767+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
768+ $ count ++;
769+ $ sum += $ datum [$ columns [0 ]];
770+ }
771+ }
772+ }
773+ if ($ count > 0 ) {
774+ $ avg = $ sum / $ count ;
775+ }
776+
777+
778+ return $ this ->_return ($ avg , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
779+ } catch (Exception $ e ) {
780+
781+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
782+ throw new Exception ($ result ->errorMessage );
783+ }
784+
785+ }
786+
787+ private function _matrixDistinctAggregate ($ wheres , $ options , $ columns ): Results
788+ {
789+ return $ this ->_returnError ('Matrix distinct aggregate not supported ' , 500 , [], $ this ->_queryTag (__FUNCTION__ ));
790+ }
643791
644792
645793 //======================================================================
@@ -675,10 +823,58 @@ private function _sanitizeSearchResponse($response, $params, $queryTag)
675823 return $ this ->_return ($ data , $ meta , $ params , $ queryTag );
676824 }
677825
826+ private function _sanitizeDistinctResponse ($ response , $ columns , $ includeDocCount )
827+ {
828+ $ keys = [];
829+ foreach ($ columns as $ column ) {
830+ $ keys [] = 'by_ ' .$ column ;
831+ }
832+
833+ return $ this ->processBuckets ($ columns , $ keys , $ response , 0 , $ includeDocCount );
834+
835+ }
836+
837+ private function processBuckets ($ columns , $ keys , $ response , $ index , $ includeDocCount , $ currentData = [])
838+ {
839+ $ data = [];
840+
841+ if (!empty ($ response [$ keys [$ index ]]['buckets ' ])) {
842+ foreach ($ response [$ keys [$ index ]]['buckets ' ] as $ res ) {
843+ $ datum = $ currentData ;
844+ $ datum [$ columns [$ index ]] = $ res ['key ' ];
845+ if ($ includeDocCount ) {
846+ $ datum [$ columns [$ index ].'_count ' ] = $ res ['doc_count ' ];
847+ }
848+
849+ if (isset ($ columns [$ index + 1 ])) {
850+ $ nestedData = $ this ->processBuckets ($ columns , $ keys , $ res , $ index + 1 , $ includeDocCount , $ datum );
851+
852+ if (!empty ($ nestedData )) {
853+ $ data = array_merge ($ data , $ nestedData );
854+ } else {
855+ $ data [] = $ datum ;
856+ }
857+ } else {
858+ $ data [] = $ datum ;
859+ }
860+ }
861+ }
862+
863+ return $ data ;
864+ }
865+
678866 private function _return ($ data , $ meta , $ params , $ queryTag )
679867 {
680868
681- $ results = new Results ($ data , $ meta , $ params , $ queryTag );
869+ if (is_object ($ meta )) {
870+ $ metaAsArray = [];
871+ if (method_exists ($ meta , 'asArray ' )) {
872+ $ metaAsArray = $ meta ->asArray ();
873+ }
874+ $ results = new Results ($ data , $ metaAsArray , $ params , $ queryTag );
875+ } else {
876+ $ results = new Results ($ data , $ meta , $ params , $ queryTag );
877+ }
682878 if ($ this ->queryLogger && !$ this ->queryLoggerOnErrorOnly ) {
683879 $ this ->_logQuery ($ results );
684880 }
0 commit comments