2
2
3
3
import static com .google .common .collect .ImmutableList .copyOf ;
4
4
import static com .google .common .collect .Iterables .concat ;
5
+ import static java .util .Collections .unmodifiableMap ;
5
6
import static org .hypertrace .core .graphql .atttributes .scopes .HypertraceCoreAttributeScopeString .SPAN ;
6
- import static org .hypertrace .core .graphql .span .joiner .SpanJoin .SPAN_KEY ;
7
+ import static org .hypertrace .core .graphql .span .joiner .SpanJoin .SPANS_KEY ;
7
8
8
9
import graphql .schema .DataFetchingFieldSelectionSet ;
9
10
import graphql .schema .SelectedField ;
10
11
import io .reactivex .rxjava3 .core .Observable ;
11
12
import io .reactivex .rxjava3 .core .Single ;
13
+ import java .util .ArrayList ;
12
14
import java .util .Collection ;
13
15
import java .util .Collections ;
16
+ import java .util .HashMap ;
14
17
import java .util .List ;
15
18
import java .util .Map ;
16
19
import java .util .Map .Entry ;
@@ -77,7 +80,7 @@ public Single<SpanJoiner> build(
77
80
78
81
private List <SelectedField > getSelections (
79
82
DataFetchingFieldSelectionSet selectionSet , List <String > pathToSpanJoin ) {
80
- List <String > fullPath = copyOf (concat (pathToSpanJoin , List .of (SPAN_KEY )));
83
+ List <String > fullPath = copyOf (concat (pathToSpanJoin , List .of (SPANS_KEY )));
81
84
return selectionFinder
82
85
.findSelections (selectionSet , SelectionQuery .builder ().selectionPath (fullPath ).build ())
83
86
.collect (Collectors .toUnmodifiableList ());
@@ -91,34 +94,45 @@ private class DefaultSpanJoiner implements SpanJoiner {
91
94
private final List <SelectedField > selectedFields ;
92
95
93
96
@ Override
94
- public <T > Single <Map <T , Span >> joinSpans (
97
+ public <T > Single <Map <T , Collection < Span > >> joinSpans (
95
98
Collection <T > joinSources , SpanIdGetter <T > spanIdGetter ) {
96
99
return this .buildSourceToIdMap (joinSources , spanIdGetter ).flatMap (this ::joinSpans );
97
100
}
98
101
99
- private <T > Single <Map <T , Span >> joinSpans (Map <T , String > sourceToSpanIdMap ) {
100
- return this .buildSpanRequest (sourceToSpanIdMap )
102
+ private <T > Single <Map <T , Collection <Span >>> joinSpans (
103
+ Map <T , Collection <String >> sourceToSpanIdsMap ) {
104
+ return this .buildSpanRequest (sourceToSpanIdsMap )
101
105
.flatMap (spanDao ::getSpans )
102
106
.map (this ::buildSpanIdToSpanMap )
103
- .map (spanIdToSpanMap -> buildSourceToSpanMap ( sourceToSpanIdMap , spanIdToSpanMap ));
107
+ .map (spanIdToSpanMap -> buildSourceToSpansMap ( sourceToSpanIdsMap , spanIdToSpanMap ));
104
108
}
105
109
106
- private <T > Map <T , Span > buildSourceToSpanMap (
107
- Map <T , String > sourceToSpanIdMap , Map <String , Span > spanIdToSpanMap ) {
108
- return sourceToSpanIdMap .entrySet ().stream ()
109
- .filter (entry -> spanIdToSpanMap .containsKey (entry .getValue ()))
110
- .collect (
111
- Collectors .toUnmodifiableMap (
112
- Entry ::getKey , entry -> spanIdToSpanMap .get (entry .getValue ())));
110
+ private <T > Map <T , Collection <Span >> buildSourceToSpansMap (
111
+ Map <T , Collection <String >> sourceToSpanIdsMap , Map <String , Span > spanIdToSpanMap ) {
112
+ Map <T , Collection <Span >> sourceToSpansMap = new HashMap <>();
113
+ for (Entry <T , Collection <String >> entry : sourceToSpanIdsMap .entrySet ()) {
114
+ List <Span > spans = new ArrayList <>();
115
+ for (String spanId : entry .getValue ()) {
116
+ if (spanIdToSpanMap .containsKey (spanId )) {
117
+ spans .add (spanIdToSpanMap .get (spanId ));
118
+ }
119
+ }
120
+ sourceToSpansMap .put (
121
+ entry .getKey (), spans .stream ().distinct ().collect (Collectors .toUnmodifiableList ()));
122
+ }
123
+ return unmodifiableMap (sourceToSpansMap );
113
124
}
114
125
115
126
private Map <String , Span > buildSpanIdToSpanMap (SpanResultSet resultSet ) {
116
127
return resultSet .results ().stream ()
117
128
.collect (Collectors .toUnmodifiableMap (Identifiable ::id , Function .identity ()));
118
129
}
119
130
120
- private <T > Single <SpanRequest > buildSpanRequest (Map <T , String > sourceToSpanIdMap ) {
121
- Collection <String > spanIds = sourceToSpanIdMap .values ();
131
+ private <T > Single <SpanRequest > buildSpanRequest (Map <T , Collection <String >> sourceToSpanIdMap ) {
132
+ Set <String > spanIds =
133
+ sourceToSpanIdMap .values ().stream ()
134
+ .flatMap (Collection ::stream )
135
+ .collect (Collectors .toUnmodifiableSet ());
122
136
return buildSpanIdsFilter (spanIds )
123
137
.flatMap (filterArguments -> buildSpanRequest (spanIds .size (), filterArguments ));
124
138
}
@@ -144,16 +158,16 @@ private Single<List<AttributeAssociation<FilterArgument>>> buildSpanIdsFilter(
144
158
return filterRequestBuilder .build (context , SPAN , Set .of (new SpanIdFilter (spanIds )));
145
159
}
146
160
147
- private <T > Single <Map <T , String >> buildSourceToIdMap (
161
+ private <T > Single <Map <T , Collection < String > >> buildSourceToIdMap (
148
162
Collection <T > joinSources , SpanIdGetter <T > spanIdGetter ) {
149
163
return Observable .fromIterable (joinSources )
150
164
.flatMapSingle (source -> this .maybeBuildMapEntry (source , spanIdGetter ))
151
165
.collect (Collectors .toMap (Entry ::getKey , Entry ::getValue ));
152
166
}
153
167
154
- private <T > Single <Entry <T , String >> maybeBuildMapEntry (
168
+ private <T > Single <Entry <T , Collection < String > >> maybeBuildMapEntry (
155
169
T source , SpanIdGetter <T > spanIdGetter ) {
156
- return spanIdGetter .getSpanId (source ).map (id -> Map .entry (source , id ));
170
+ return spanIdGetter .getSpanIds (source ).map (ids -> Map .entry (source , ids ));
157
171
}
158
172
}
159
173
0 commit comments