|
| 1 | +package de.rwth.idsg.steve; |
| 2 | + |
| 3 | +import com.fasterxml.jackson.core.JsonParser; |
| 4 | +import com.fasterxml.jackson.core.TreeNode; |
| 5 | +import com.fasterxml.jackson.databind.JsonNode; |
| 6 | +import com.fasterxml.jackson.databind.ObjectMapper; |
| 7 | +import de.rwth.idsg.steve.ocpp.OcppVersion; |
| 8 | +import de.rwth.idsg.steve.ocpp.RequestType; |
| 9 | +import de.rwth.idsg.steve.ocpp.ResponseType; |
| 10 | +import de.rwth.idsg.steve.ocpp.ws.JsonObjectMapper; |
| 11 | +import de.rwth.idsg.steve.ocpp.ws.data.CommunicationContext; |
| 12 | +import de.rwth.idsg.steve.ocpp.ws.data.ErrorCode; |
| 13 | +import de.rwth.idsg.steve.ocpp.ws.data.MessageType; |
| 14 | +import de.rwth.idsg.steve.ocpp.ws.data.OcppJsonCall; |
| 15 | +import de.rwth.idsg.steve.ocpp.ws.data.OcppJsonError; |
| 16 | +import de.rwth.idsg.steve.ocpp.ws.data.OcppJsonResponse; |
| 17 | +import de.rwth.idsg.steve.ocpp.ws.data.OcppJsonResult; |
| 18 | +import de.rwth.idsg.steve.ocpp.ws.pipeline.Serializer; |
| 19 | +import org.eclipse.jetty.websocket.api.Session; |
| 20 | +import org.eclipse.jetty.websocket.api.StatusCode; |
| 21 | +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; |
| 22 | +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; |
| 23 | +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; |
| 24 | +import org.eclipse.jetty.websocket.api.annotations.WebSocket; |
| 25 | +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; |
| 26 | +import org.eclipse.jetty.websocket.client.WebSocketClient; |
| 27 | + |
| 28 | +import java.io.IOException; |
| 29 | +import java.net.URI; |
| 30 | +import java.util.Collection; |
| 31 | +import java.util.LinkedHashMap; |
| 32 | +import java.util.Map; |
| 33 | +import java.util.UUID; |
| 34 | +import java.util.concurrent.CountDownLatch; |
| 35 | +import java.util.concurrent.Future; |
| 36 | +import java.util.concurrent.TimeUnit; |
| 37 | +import java.util.function.Consumer; |
| 38 | + |
| 39 | +/** |
| 40 | + * @author Sevket Goekay <goekay@dbis.rwth-aachen.de> |
| 41 | + * @since 21.03.2018 |
| 42 | + */ |
| 43 | +@WebSocket |
| 44 | +public class OcppJsonChargePoint { |
| 45 | + |
| 46 | + private final OcppVersion version; |
| 47 | + private final String chargeBoxId; |
| 48 | + private final String connectionPath; |
| 49 | + private final Map<String, ResponseContext> responseContextMap; |
| 50 | + private final ResponseDeserializer deserializer; |
| 51 | + private final WebSocketClient client; |
| 52 | + private final CountDownLatch closeHappenedSignal; |
| 53 | + |
| 54 | + private CountDownLatch receivedResponsesSignal; |
| 55 | + private Session session; |
| 56 | + |
| 57 | + OcppJsonChargePoint(OcppVersion version, String chargeBoxId, String pathPrefix) { |
| 58 | + this.version = version; |
| 59 | + this.chargeBoxId = chargeBoxId; |
| 60 | + this.connectionPath = pathPrefix + chargeBoxId; |
| 61 | + this.responseContextMap = new LinkedHashMap<>(); // because we want to keep the insertion order of test cases |
| 62 | + this.deserializer = new ResponseDeserializer(); |
| 63 | + this.client = new WebSocketClient(); |
| 64 | + this.closeHappenedSignal = new CountDownLatch(1); |
| 65 | + } |
| 66 | + |
| 67 | + @OnWebSocketConnect |
| 68 | + public void onConnect(Session session) { |
| 69 | + this.session = session; |
| 70 | + } |
| 71 | + |
| 72 | + @OnWebSocketClose |
| 73 | + public void onClose(Session session, int statusCode, String reason) { |
| 74 | + this.session = null; |
| 75 | + this.closeHappenedSignal.countDown(); |
| 76 | + } |
| 77 | + |
| 78 | + @OnWebSocketMessage |
| 79 | + public void onMessage(Session session, String msg) { |
| 80 | + try { |
| 81 | + OcppJsonResponse response = deserializer.extractResponse(msg); |
| 82 | + ResponseContext ctx = responseContextMap.remove(response.getMessageId()); |
| 83 | + |
| 84 | + if (response instanceof OcppJsonResult) { |
| 85 | + ctx.responseHandler.accept(((OcppJsonResult) response).getPayload()); |
| 86 | + } else if (response instanceof OcppJsonError) { |
| 87 | + ctx.errorHandler.accept((OcppJsonError) response); |
| 88 | + } |
| 89 | + } catch (Exception e) { |
| 90 | + e.printStackTrace(); |
| 91 | + } finally { |
| 92 | + if (receivedResponsesSignal != null) { |
| 93 | + receivedResponsesSignal.countDown(); |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + public void start() { |
| 99 | + try { |
| 100 | + ClientUpgradeRequest request = new ClientUpgradeRequest(); |
| 101 | + request.setSubProtocols(version.getValue()); |
| 102 | + |
| 103 | + client.start(); |
| 104 | + |
| 105 | + Future<Session> connect = client.connect(this, new URI(connectionPath), request); |
| 106 | + connect.get(); // block until session is created |
| 107 | + } catch (Throwable t) { |
| 108 | + t.printStackTrace(); |
| 109 | + } |
| 110 | + } |
| 111 | + |
| 112 | + public <T extends ResponseType> void prepare(RequestType request, Class<T> responseClass, |
| 113 | + Consumer<T> responseHandler, Consumer<OcppJsonError> errorHandler) { |
| 114 | + String messageId = UUID.randomUUID().toString(); |
| 115 | + |
| 116 | + OcppJsonCall call = new OcppJsonCall(); |
| 117 | + call.setMessageId(messageId); |
| 118 | + call.setPayload(request); |
| 119 | + call.setAction(getOperationName(request)); |
| 120 | + |
| 121 | + // session is null, because we do not need org.springframework.web.socket.WebSocketSession |
| 122 | + CommunicationContext ctx = new CommunicationContext(null, chargeBoxId); |
| 123 | + ctx.setOutgoingMessage(call); |
| 124 | + |
| 125 | + Serializer.INSTANCE.accept(ctx); |
| 126 | + |
| 127 | + ResponseContext resCtx = new ResponseContext(ctx.getOutgoingString(), responseClass, responseHandler, errorHandler); |
| 128 | + responseContextMap.put(messageId, resCtx); |
| 129 | + } |
| 130 | + |
| 131 | + public void processAndClose() { |
| 132 | + Collection<ResponseContext> values = responseContextMap.values(); |
| 133 | + receivedResponsesSignal = new CountDownLatch(values.size()); |
| 134 | + |
| 135 | + // send all messages |
| 136 | + for (ResponseContext ctx : values) { |
| 137 | + try { |
| 138 | + session.getRemote().sendString(ctx.outgoingMessage); |
| 139 | + } catch (IOException e) { |
| 140 | + e.printStackTrace(); |
| 141 | + } |
| 142 | + } |
| 143 | + |
| 144 | + try { |
| 145 | + // wait for all responses to arrive and be processed |
| 146 | + receivedResponsesSignal.await(15, TimeUnit.SECONDS); |
| 147 | + |
| 148 | + // "enqueue" a graceful close |
| 149 | + session.close(StatusCode.NORMAL, "Finished"); |
| 150 | + |
| 151 | + // wait for close to happen |
| 152 | + closeHappenedSignal.await(15, TimeUnit.SECONDS); |
| 153 | + |
| 154 | + // well, stop the client |
| 155 | + client.stop(); |
| 156 | + } catch (Exception e) { |
| 157 | + throw new RuntimeException(e); |
| 158 | + } |
| 159 | + } |
| 160 | + |
| 161 | + // ------------------------------------------------------------------------- |
| 162 | + // Private helpers |
| 163 | + // ------------------------------------------------------------------------- |
| 164 | + |
| 165 | + private static String getOperationName(RequestType requestType) { |
| 166 | + String s = requestType.getClass().getSimpleName(); |
| 167 | + if (s.endsWith("Request")) { |
| 168 | + s = s.substring(0, s.length() - 7); |
| 169 | + } |
| 170 | + return s; |
| 171 | + } |
| 172 | + |
| 173 | + private static class ResponseContext { |
| 174 | + private final String outgoingMessage; |
| 175 | + private final Class<ResponseType> responseClass; |
| 176 | + private final Consumer<ResponseType> responseHandler; |
| 177 | + private final Consumer<OcppJsonError> errorHandler; |
| 178 | + |
| 179 | + @SuppressWarnings("unchecked") |
| 180 | + private <T extends ResponseType> ResponseContext(String outgoingMessage, |
| 181 | + Class<T> responseClass, |
| 182 | + Consumer<T> responseHandler, |
| 183 | + Consumer<OcppJsonError> errorHandler) { |
| 184 | + this.outgoingMessage = outgoingMessage; |
| 185 | + this.responseClass = (Class<ResponseType>) responseClass; |
| 186 | + this.responseHandler = (Consumer<ResponseType>) responseHandler; |
| 187 | + this.errorHandler = errorHandler; |
| 188 | + } |
| 189 | + } |
| 190 | + |
| 191 | + private class ResponseDeserializer { |
| 192 | + |
| 193 | + private OcppJsonResponse extractResponse(String msg) throws Exception { |
| 194 | + ObjectMapper mapper = JsonObjectMapper.INSTANCE.getMapper(); |
| 195 | + |
| 196 | + try (JsonParser parser = mapper.getFactory().createParser(msg)) { |
| 197 | + parser.nextToken(); // set cursor to '[' |
| 198 | + |
| 199 | + parser.nextToken(); |
| 200 | + int messageTypeNr = parser.getIntValue(); |
| 201 | + |
| 202 | + parser.nextToken(); |
| 203 | + String messageId = parser.getText(); |
| 204 | + |
| 205 | + MessageType messageType = MessageType.fromTypeNr(messageTypeNr); |
| 206 | + switch (messageType) { |
| 207 | + case CALL_RESULT: |
| 208 | + return handleResult(messageId, parser); |
| 209 | + case CALL_ERROR: |
| 210 | + return handleError(messageId, parser); |
| 211 | + default: |
| 212 | + throw new SteveException("Unknown enum type"); |
| 213 | + } |
| 214 | + } |
| 215 | + } |
| 216 | + |
| 217 | + private OcppJsonResponse handleResult(String messageId, JsonParser parser) throws Exception { |
| 218 | + parser.nextToken(); |
| 219 | + JsonNode responsePayload = parser.readValueAsTree(); |
| 220 | + Class<ResponseType> clazz = responseContextMap.get(messageId).responseClass; |
| 221 | + ResponseType res = JsonObjectMapper.INSTANCE.getMapper().treeToValue(responsePayload, clazz); |
| 222 | + |
| 223 | + OcppJsonResult result = new OcppJsonResult(); |
| 224 | + result.setMessageId(messageId); |
| 225 | + result.setPayload(res); |
| 226 | + return result; |
| 227 | + } |
| 228 | + |
| 229 | + private OcppJsonResponse handleError(String messageId, JsonParser parser) throws Exception { |
| 230 | + parser.nextToken(); |
| 231 | + ErrorCode code = ErrorCode.fromValue(parser.getText()); |
| 232 | + |
| 233 | + parser.nextToken(); |
| 234 | + String desc = parser.getText(); |
| 235 | + if ("".equals(desc)) { |
| 236 | + desc = null; |
| 237 | + } |
| 238 | + |
| 239 | + String details = null; |
| 240 | + parser.nextToken(); |
| 241 | + TreeNode detailsNode = parser.readValueAsTree(); |
| 242 | + if (detailsNode != null && detailsNode.size() != 0) { |
| 243 | + details = JsonObjectMapper.INSTANCE.getMapper().writeValueAsString(detailsNode); |
| 244 | + } |
| 245 | + |
| 246 | + OcppJsonError error = new OcppJsonError(); |
| 247 | + error.setMessageId(messageId); |
| 248 | + error.setErrorCode(code); |
| 249 | + error.setErrorDescription(desc); |
| 250 | + error.setErrorDetails(details); |
| 251 | + return error; |
| 252 | + } |
| 253 | + } |
| 254 | + |
| 255 | +} |
0 commit comments