-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathStateConverter.java
318 lines (292 loc) · 14 KB
/
StateConverter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.helper;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.StreamState;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
public class StateConverter {
/**
* Converts internal representation of state to API representation
*
* @param connectionId connection associated with the state
* @param stateWrapper internal state representation to convert
* @return api representation of state
*/
public static ConnectionState toApi(final UUID connectionId, final @Nullable StateWrapper stateWrapper) {
return new ConnectionState()
.connectionId(connectionId)
.stateType(convertStateTypeToApi(stateWrapper))
.state(stateWrapper != null ? stateWrapper.getLegacyState() : null)
.globalState(globalStateToApi(stateWrapper).orElse(null))
.streamState(streamStateToApi(stateWrapper).orElse(null));
}
/**
* Converts internal representation of state to client representation
*
* @param connectionId connection associated with the state
* @param stateWrapper internal state representation to convert
* @return client representation of state
*/
public static io.airbyte.api.client.model.generated.ConnectionState toClient(final UUID connectionId, final @Nullable StateWrapper stateWrapper) {
return new io.airbyte.api.client.model.generated.ConnectionState()
.connectionId(connectionId)
.stateType(convertStateTypeToClient(stateWrapper))
.state(stateWrapper != null ? stateWrapper.getLegacyState() : null)
.globalState(globalStateToClient(stateWrapper).orElse(null))
.streamState(streamStateToClient(stateWrapper).orElse(null));
}
/**
* Converts API representation of state to internal representation
*
* @param apiConnectionState api representation of state
* @return internal representation of state
*/
public static StateWrapper toInternal(final @Nullable ConnectionState apiConnectionState) {
return new StateWrapper()
.withStateType(convertStateTypeToInternal(apiConnectionState).orElse(null))
.withGlobal(globalStateToInternal(apiConnectionState).orElse(null))
.withLegacyState(apiConnectionState != null ? apiConnectionState.getState() : null)
.withStateMessages(streamStateToInternal(apiConnectionState).orElse(null));
}
public static StateWrapper clientToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState clientConnectionState) {
return new StateWrapper()
.withStateType(clientConnectionState != null ? convertClientStateTypeToInternal(clientConnectionState.getStateType()) : null)
.withGlobal(clientGlobalStateToInternal(clientConnectionState).orElse(null))
.withLegacyState(clientConnectionState != null ? clientConnectionState.getState() : null)
.withStateMessages(clientStreamStateToInternal(clientConnectionState).orElse(null));
}
public static StateType convertClientStateTypeToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionStateType connectionStateType) {
if (connectionStateType == null || connectionStateType.equals(io.airbyte.api.client.model.generated.ConnectionStateType.NOT_SET)) {
return null;
} else {
return Enums.convertTo(connectionStateType, StateType.class);
}
}
/**
* Convert to API representation of state type. API has an additional type (NOT_SET). This
* represents the case where no state is saved so we do not know the state type.
*
* @param stateWrapper state to convert
* @return api representation of state type
*/
private static ConnectionStateType convertStateTypeToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper == null || stateWrapper.getStateType() == null) {
return ConnectionStateType.NOT_SET;
} else {
return Enums.convertTo(stateWrapper.getStateType(), ConnectionStateType.class);
}
}
/**
* Convert to client representation of state type. The client model has an additional type
* (NOT_SET). This represents the case where no state is saved so we do not know the state type.
*
* @param stateWrapper state to convert
* @return client representation of state type
*/
private static io.airbyte.api.client.model.generated.ConnectionStateType convertStateTypeToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper == null || stateWrapper.getStateType() == null) {
return io.airbyte.api.client.model.generated.ConnectionStateType.NOT_SET;
} else {
return Enums.convertTo(stateWrapper.getStateType(), io.airbyte.api.client.model.generated.ConnectionStateType.class);
}
}
/**
* Convert to internal representation of state type, if set. Otherise, empty optional
*
* @param connectionState API state to convert.
* @return internal state type, if set. Otherwise, empty optional.
*/
private static Optional<StateType> convertStateTypeToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState == null || connectionState.getStateType().equals(ConnectionStateType.NOT_SET)) {
return Optional.empty();
} else {
return Optional.of(Enums.convertTo(connectionState.getStateType(), StateType.class));
}
}
/**
* If wrapper is of type global state, returns API representation of global state. Otherwise, empty
* optional.
*
* @param stateWrapper state wrapper to extract from
* @return api representation of global state if state wrapper is type global. Otherwise, empty
* optional.
*/
private static Optional<GlobalState> globalStateToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null
&& stateWrapper.getStateType() == StateType.GLOBAL
&& stateWrapper.getGlobal() != null
&& stateWrapper.getGlobal().getGlobal() != null) {
return Optional.of(new GlobalState()
.sharedState(stateWrapper.getGlobal().getGlobal().getSharedState())
.streamStates(stateWrapper.getGlobal().getGlobal().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToApi)
.toList()));
} else {
return Optional.empty();
}
}
/**
* If wrapper is of type global state, returns client representation of global state. Otherwise,
* empty optional.
*
* @param stateWrapper state wrapper to extract from
* @return client representation of global state if state wrapper is type global. Otherwise, empty
* optional.
*/
private static Optional<io.airbyte.api.client.model.generated.GlobalState> globalStateToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null
&& stateWrapper.getStateType() == StateType.GLOBAL
&& stateWrapper.getGlobal() != null
&& stateWrapper.getGlobal().getGlobal() != null) {
return Optional.of(new io.airbyte.api.client.model.generated.GlobalState()
.sharedState(stateWrapper.getGlobal().getGlobal().getSharedState())
.streamStates(stateWrapper.getGlobal().getGlobal().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToClient)
.toList()));
} else {
return Optional.empty();
}
}
/**
* If API state is of type global, returns internal representation of global state. Otherwise, empty
* optional.
*
* @param connectionState API state representation to extract from
* @return global state message if API state is of type global. Otherwise, empty optional.
*/
private static Optional<AirbyteStateMessage> globalStateToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState != null
&& connectionState.getStateType() == ConnectionStateType.GLOBAL
&& connectionState.getGlobalState() != null) {
return Optional.of(new AirbyteStateMessage()
.withGlobal(new AirbyteGlobalState()
.withSharedState(connectionState.getGlobalState().getSharedState())
.withStreamStates(connectionState.getGlobalState().getStreamStates()
.stream()
.map(StateConverter::streamStateStructToInternal)
.toList())));
} else {
return Optional.empty();
}
}
private static Optional<AirbyteStateMessage> clientGlobalStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) {
if (connectionState != null
&& connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.GLOBAL
&& connectionState.getGlobalState() != null) {
return Optional.of(new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(new AirbyteGlobalState()
.withSharedState(connectionState.getGlobalState().getSharedState())
.withStreamStates(connectionState.getGlobalState().getStreamStates()
.stream()
.map(StateConverter::clientStreamStateStructToInternal)
.toList())));
} else {
return Optional.empty();
}
}
/**
* If wrapper is of type stream state, returns API representation of stream state. Otherwise, empty
* optional.
*
* @param stateWrapper state wrapper to extract from
* @return api representation of stream state if state wrapper is type stream. Otherwise, empty
* optional.
*/
private static Optional<List<StreamState>> streamStateToApi(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null && stateWrapper.getStateType() == StateType.STREAM && stateWrapper.getStateMessages() != null) {
return Optional.ofNullable(stateWrapper.getStateMessages()
.stream()
.map(AirbyteStateMessage::getStream)
.map(StateConverter::streamStateStructToApi)
.toList());
} else {
return Optional.empty();
}
}
/**
* If wrapper is of type stream state, returns client representation of stream state. Otherwise,
* empty optional.
*
* @param stateWrapper state wrapper to extract from
* @return client representation of stream state if state wrapper is type stream. Otherwise, empty
* optional.
*/
private static Optional<List<io.airbyte.api.client.model.generated.StreamState>> streamStateToClient(final @Nullable StateWrapper stateWrapper) {
if (stateWrapper != null && stateWrapper.getStateType() == StateType.STREAM && stateWrapper.getStateMessages() != null) {
return Optional.ofNullable(stateWrapper.getStateMessages()
.stream()
.map(AirbyteStateMessage::getStream)
.map(StateConverter::streamStateStructToClient)
.toList());
} else {
return Optional.empty();
}
}
/**
* If API state is of type stream, returns internal representation of stream state. Otherwise, empty
* optional.
*
* @param connectionState API representation of state to extract from
* @return internal representation of stream state if API state representation is of type stream.
* Otherwise, empty optional.
*/
private static Optional<List<AirbyteStateMessage>> streamStateToInternal(final @Nullable ConnectionState connectionState) {
if (connectionState != null && connectionState.getStateType() == ConnectionStateType.STREAM && connectionState.getStreamState() != null) {
return Optional.ofNullable(connectionState.getStreamState()
.stream()
.map(StateConverter::streamStateStructToInternal)
.map(s -> new AirbyteStateMessage().withStream(s))
.toList());
} else {
return Optional.empty();
}
}
private static Optional<List<AirbyteStateMessage>> clientStreamStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) {
if (connectionState != null && connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.STREAM
&& connectionState.getStreamState() != null) {
return Optional.ofNullable(connectionState.getStreamState()
.stream()
.map(StateConverter::clientStreamStateStructToInternal)
.map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(s))
.toList());
} else {
return Optional.empty();
}
}
private static StreamState streamStateStructToApi(final AirbyteStreamState streamState) {
return new StreamState()
.streamDescriptor(ProtocolConverters.streamDescriptorToApi(streamState.getStreamDescriptor()))
.streamState(streamState.getStreamState());
}
private static io.airbyte.api.client.model.generated.StreamState streamStateStructToClient(final AirbyteStreamState streamState) {
return new io.airbyte.api.client.model.generated.StreamState()
.streamDescriptor(ProtocolConverters.streamDescriptorToClient(streamState.getStreamDescriptor()))
.streamState(streamState.getStreamState());
}
private static AirbyteStreamState streamStateStructToInternal(final StreamState streamState) {
return new AirbyteStreamState()
.withStreamDescriptor(ProtocolConverters.streamDescriptorToProtocol(streamState.getStreamDescriptor()))
.withStreamState(streamState.getStreamState());
}
private static AirbyteStreamState clientStreamStateStructToInternal(final io.airbyte.api.client.model.generated.StreamState streamState) {
return new AirbyteStreamState()
.withStreamDescriptor(ProtocolConverters.clientStreamDescriptorToProtocol(streamState.getStreamDescriptor()))
.withStreamState(streamState.getStreamState());
}
}