Skip to content

Commit

Permalink
Merge pull request #1069 from apache/SP-1065
Browse files Browse the repository at this point in the history
Sp 1065
  • Loading branch information
tenthe authored Jan 13, 2023
2 parents 505fdb6 + 8d07caf commit bd9a94d
Show file tree
Hide file tree
Showing 25 changed files with 672 additions and 174 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@
<spring-security.version>6.0.1</spring-security.version>
<swagger.version>2.2.7</swagger.version>
<type-parser.version>0.7.0</type-parser.version>
<underscore.version>1.47</underscore.version>
<wildfly-common.version>1.6.0.Final</wildfly-common.version>
<hawtbuf.version>1.11</hawtbuf.version>
<netty-tc-native.version>2.0.52.Final</netty-tc-native.version>
Expand Down Expand Up @@ -372,11 +371,6 @@
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
</dependency>
<dependency>
<groupId>com.github.javadev</groupId>
<artifactId>underscore</artifactId>
<version>${underscore.version}</version>
</dependency>
<dependency>
<groupId>com.squareup</groupId>
<artifactId>javapoet</artifactId>
Expand Down
14 changes: 10 additions & 4 deletions streampipes-extensions-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,18 @@
<artifactId>streampipes-service-discovery</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-vocabulary</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>


<!-- External dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.github.javadev</groupId>
<artifactId>underscore</artifactId>
</dependency>
<dependency>
<groupId>de.grundid.opendatalab</groupId>
<artifactId>geojson-jackson</artifactId>
Expand All @@ -114,6 +116,10 @@
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.parsson</groupId>
<artifactId>jakarta.json</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ public AdapterGuessInfo getSchemaAndSample(List<byte[]> oneEvent) {
EventSchema resultSchema = new EventSchema();
for (int i = 0; i < keys.length; i++) {
EventPropertyPrimitive p = new EventPropertyPrimitive();
var runtimeType = DatatypeUtils.getXsdDatatype(data[i], true);
var runtimeType = DatatypeUtils.getXsdDatatype(data[i], false);
var convertedValue = DatatypeUtils.convertValue(data[i], runtimeType);
p.setRuntimeName(keys[i]);
p.setRuntimeType(runtimeType);
sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i], true), convertedValue));
sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i], false), convertedValue));
resultSchema.addEventProperty(p);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect.adapter.format.geojson;

public class GeoJsonConstants {
public static final String LATITUDE = "latitude";
public static final String LONGITUDE = "longitude";
public static final String ALTITUDE = "altitude";

public static final String COORDINATES = "coordinates";
public static final String COORDINATES_LINE_STRING = "coordinatesLineString";
public static final String COORDINATES_POLYGON = "coordinatesPolygon";
public static final String COORDINATES_MULTI_POINT = "coordinatesMultiPoint";
public static final String COORDINATES_MULTI_STRING = "coordinatesMultiString";
public static final String COORDINATES_MULTI_POLYGON = "coordinatesMultiPolygon";

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,42 @@ private Map<String, Object> formatGeometryField(Map<String, Object> map) {
String type = (String) map.get("type");

if (type.equalsIgnoreCase("POINT")) {
List<Double> coordinates = (List<Double>) map.get("coordinates");
List<Double> coordinates = (List<Double>) map.get(GeoJsonConstants.COORDINATES);

try {
geometryFields.put("longitude", coordinates.get(0));
geometryFields.put("latitude", coordinates.get(1));
geometryFields.put(GeoJsonConstants.LONGITUDE, coordinates.get(0));
geometryFields.put(GeoJsonConstants.LATITUDE, coordinates.get(1));
if (coordinates.size() == 3) {
geometryFields.put("altitude", coordinates.get(2));
geometryFields.put(GeoJsonConstants.ALTITUDE, coordinates.get(2));
}
} catch (IndexOutOfBoundsException e) {
logger.error(e.getMessage());
}

} else if (type.equalsIgnoreCase("LINESTRING")) {
geometryFields.put("coordinatesLineString", map.get("coordinates").toString());
geometryFields.put(
GeoJsonConstants.COORDINATES_LINE_STRING,
map.get(GeoJsonConstants.COORDINATES).toString());

} else if (type.equalsIgnoreCase("POLYGON")) {
geometryFields.put("coordinatesPolygon", map.get("coordinates").toString());
geometryFields.put(
GeoJsonConstants.COORDINATES_POLYGON,
map.get(GeoJsonConstants.COORDINATES).toString());

} else if (type.equalsIgnoreCase("MULTIPOINT")) {
geometryFields.put("coordinatesMultiPoint", map.get("coordinates").toString());
geometryFields.put(
GeoJsonConstants.COORDINATES_MULTI_POINT,
map.get(GeoJsonConstants.COORDINATES).toString());

} else if (type.equalsIgnoreCase("MULTILINESTRING")) {
geometryFields.put("coordinatesMultiString", map.get("coordinates").toString());
geometryFields.put(
GeoJsonConstants.COORDINATES_MULTI_STRING,
map.get(GeoJsonConstants.COORDINATES).toString());

} else if (type.equalsIgnoreCase("MULTIPOLYGON")) {
geometryFields.put("coordinatesMultiPolygon", map.get("coordinates").toString());
geometryFields.put(
GeoJsonConstants.COORDINATES_MULTI_POLYGON,
map.get(GeoJsonConstants.COORDINATES).toString());

} else {
logger.error(type + "is not a suppported field type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@
import org.apache.streampipes.extensions.management.connect.adapter.format.util.JsonEventProperty;
import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Parser;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.CharStreams;
import com.google.gson.Gson;
import org.geojson.Feature;
import org.geojson.FeatureCollection;
import org.geojson.LineString;
import org.geojson.MultiLineString;
import org.geojson.MultiPoint;
Expand Down Expand Up @@ -66,7 +68,6 @@ public Parser getInstance(FormatDescription formatDescription) {

@Override
public void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws ParseException {
FeatureCollection geoFeature;
Gson gson = new Gson();

try {
Expand All @@ -85,66 +86,106 @@ public void parse(InputStream data, EmitBinaryEvent emitBinaryEvent) throws Pars

@Override
public EventSchema getEventSchema(List<byte[]> oneEvent) {
EventSchema resultSchema = new EventSchema();
return this.getSchemaAndSample(oneEvent).getEventSchema();
}

@Override
public boolean supportsPreview() {
return true;
}

@Override
public AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {

Feature geoFeature = null;
try {
geoFeature = new ObjectMapper().readValue(oneEvent.get(0), Feature.class);
geoFeature = new ObjectMapper().readValue(eventSample.get(0), Feature.class);

} catch (IOException e) {
logger.error(e.toString());
}

for (Map.Entry<String, Object> entry : geoFeature.getProperties().entrySet()) {
EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
resultSchema.addEventProperty(p);
}

List<EventProperty> eventProperties = parseGeometryField(geoFeature);
eventProperties.forEach(eventProperty -> resultSchema.addEventProperty(eventProperty));

return resultSchema;
return this.parseGeometryField(geoFeature);
}

private List<EventProperty> parseGeometryField(Feature geoFeature) {
private AdapterGuessInfo parseGeometryField(Feature geoFeature) {

EventSchema resultSchema = new EventSchema();
List<EventProperty> eventProperties = new LinkedList<>();
var sampleValues = new HashMap<String, GuessTypeInfo>();

if (geoFeature.getGeometry() instanceof Point) {
Point point = (Point) geoFeature.getGeometry();
eventProperties.add(getEventPropertyGeoJson("longitude", point.getCoordinates().getLongitude(),
"http://www.w3.org/2003/01/geo/wgs84_pos#long"));
eventProperties.add(getEventPropertyGeoJson("latitude", point.getCoordinates().getLatitude(),
"http://www.w3.org/2003/01/geo/wgs84_pos#lat"));
eventProperties.add(
getEventPropertyGeoJson(
GeoJsonConstants.LONGITUDE,
point.getCoordinates().getLongitude(),
Geo.LNG));
eventProperties.add(
getEventPropertyGeoJson(
GeoJsonConstants.LATITUDE,
point.getCoordinates().getLatitude(),
Geo.LAT));

sampleValues.put(GeoJsonConstants.LONGITUDE,
new GuessTypeInfo(Double.class.getName(),
point.getCoordinates().getLongitude()));
sampleValues.put(GeoJsonConstants.LATITUDE,
new GuessTypeInfo(Double.class.getName(),
point.getCoordinates().getLatitude()));
if (point.getCoordinates().hasAltitude()) {
eventProperties.add(getEventPropertyGeoJson("altitude", point.getCoordinates().getAltitude(), SO.ALTITUDE));
eventProperties.add(
getEventPropertyGeoJson(GeoJsonConstants.ALTITUDE, point.getCoordinates().getAltitude(), SO.ALTITUDE));
sampleValues.put(GeoJsonConstants.ALTITUDE,
new GuessTypeInfo(Double.class.getName(), point.getCoordinates().getAltitude()));
}

} else if (geoFeature.getGeometry() instanceof LineString) {
LineString lineString = (LineString) geoFeature.getGeometry();
eventProperties.add(JsonEventProperty.getEventProperty("coorindatesLineString", lineString.getCoordinates()));

eventProperties.add(
JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_LINE_STRING, lineString.getCoordinates()));
sampleValues.put(GeoJsonConstants.COORDINATES_LINE_STRING,
new GuessTypeInfo(Double.class.getName(), lineString.getCoordinates()));
} else if (geoFeature.getGeometry() instanceof Polygon) {
Polygon polygon = (Polygon) geoFeature.getGeometry();
eventProperties.add(JsonEventProperty.getEventProperty("coorindatesPolygon", polygon.getCoordinates()));

eventProperties.add(
JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_POLYGON, polygon.getCoordinates()));
sampleValues.put(GeoJsonConstants.COORDINATES_POLYGON,
new GuessTypeInfo(Double.class.getName(), polygon.getCoordinates()));
} else if (geoFeature.getGeometry() instanceof MultiPoint) {
MultiPoint multiPoint = (MultiPoint) geoFeature.getGeometry();
eventProperties.add(JsonEventProperty.getEventProperty("coorindatesMultiPoint", multiPoint.getCoordinates()));

eventProperties.add(
JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_MULTI_POINT, multiPoint.getCoordinates()));
sampleValues.put(GeoJsonConstants.COORDINATES_MULTI_POINT,
new GuessTypeInfo(Double.class.getName(), multiPoint.getCoordinates()));
} else if (geoFeature.getGeometry() instanceof MultiLineString) {
MultiLineString multiLineString = (MultiLineString) geoFeature.getGeometry();
eventProperties.add(
JsonEventProperty.getEventProperty("coorindatesMultiLineString", multiLineString.getCoordinates()));

JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_LINE_STRING,
multiLineString.getCoordinates()));
sampleValues.put(GeoJsonConstants.COORDINATES_LINE_STRING,
new GuessTypeInfo(Double.class.getName(), multiLineString.getCoordinates()));
} else if (geoFeature.getGeometry() instanceof MultiPolygon) {
MultiPolygon multiPolygon = (MultiPolygon) geoFeature.getGeometry();
eventProperties.add(JsonEventProperty.getEventProperty("coorindatesMultiPolygon", multiPolygon.getCoordinates()));
eventProperties.add(JsonEventProperty.getEventProperty(GeoJsonConstants.COORDINATES_MULTI_POLYGON,
multiPolygon.getCoordinates()));
sampleValues.put(GeoJsonConstants.COORDINATES_MULTI_POLYGON,
new GuessTypeInfo(Double.class.getName(), multiPolygon.getCoordinates()));
} else {
logger.error("No geometry field found in geofeature: " + geoFeature.toString());
}

return eventProperties;

for (Map.Entry<String, Object> entry : geoFeature.getProperties().entrySet()) {
EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());
resultSchema.addEventProperty(p);
sampleValues.put(p.getRuntimeName(),
new GuessTypeInfo(entry.getValue().getClass().getCanonicalName(), entry.getValue()));
}

eventProperties.forEach(eventProperty -> resultSchema.addEventProperty(eventProperty));

return new AdapterGuessInfo(resultSchema, sampleValues);
}

private EventProperty getEventPropertyGeoJson(String name, Object value, String domain) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect.adapter.format.json;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.extensions.management.connect.adapter.format.util.JsonEventProperty;
import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Parser;
import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class AbstractJsonParser extends Parser {

@Override
public EventSchema getEventSchema(List<byte[]> oneEvent) {
return getSchemaAndSample(oneEvent).getEventSchema();
}

@Override
public boolean supportsPreview() {
return true;
}

@Override
public AdapterGuessInfo getSchemaAndSample(List<byte[]> eventSample) throws ParseException {
EventSchema resultSchema = new EventSchema();

JsonDataFormatDefinition jsonDefinition = new JsonDataFormatDefinition();


Map<String, Object> exampleEvent;

try {
exampleEvent = jsonDefinition.toMap(eventSample.get(0));
var sample = exampleEvent
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
new GuessTypeInfo(e.getValue().getClass().getCanonicalName(), e.getValue())));

for (Map.Entry<String, Object> entry : exampleEvent.entrySet()) {
EventProperty p = JsonEventProperty.getEventProperty(entry.getKey(), entry.getValue());

resultSchema.addEventProperty(p);
}

return new AdapterGuessInfo(resultSchema, sample);
} catch (SpRuntimeException e) {
throw new ParseException("Could not serialize event, did you choose the correct format?", e);
}
}

}
Loading

0 comments on commit bd9a94d

Please sign in to comment.