Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pinot.common.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.EqualityUtils;
import org.apache.pinot.common.utils.JsonUtils;

import static org.apache.pinot.common.utils.EqualityUtils.hashCodeOf;
import static org.apache.pinot.common.utils.EqualityUtils.isEqual;
Expand All @@ -43,7 +45,11 @@ public class CombinedConfig {

@Override
public String toString() {
return "CombinedConfig{" + "_offline=" + _offline + ", _realtime=" + _realtime + ", _schema=" + _schema + '}';
try {
return JsonUtils.objectToPrettyString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public TableConfig getOfflineTableConfig() {
Expand Down
39 changes: 28 additions & 11 deletions pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,42 +361,59 @@ public TimeUnit getOutgoingTimeUnit() {
return (_timeFieldSpec != null) ? _timeFieldSpec.getOutgoingGranularitySpec().getTimeType() : null;
}

@JsonIgnore
@Nonnull
public String getJSONSchema() {
ObjectNode jsonSchema = JsonUtils.newObjectNode();
jsonSchema.put("schemaName", _schemaName);
/**
* Returns a json representation of the schema.
*/
public ObjectNode toJsonObject() {
ObjectNode jsonObject = JsonUtils.newObjectNode();
jsonObject.put("schemaName", _schemaName);
if (!_dimensionFieldSpecs.isEmpty()) {
ArrayNode jsonArray = JsonUtils.newArrayNode();
for (DimensionFieldSpec dimensionFieldSpec : _dimensionFieldSpecs) {
jsonArray.add(dimensionFieldSpec.toJsonObject());
}
jsonSchema.set("dimensionFieldSpecs", jsonArray);
jsonObject.set("dimensionFieldSpecs", jsonArray);
}
if (!_metricFieldSpecs.isEmpty()) {
ArrayNode jsonArray = JsonUtils.newArrayNode();
for (MetricFieldSpec metricFieldSpec : _metricFieldSpecs) {
jsonArray.add(metricFieldSpec.toJsonObject());
}
jsonSchema.set("metricFieldSpecs", jsonArray);
jsonObject.set("metricFieldSpecs", jsonArray);
}
if (_timeFieldSpec != null) {
jsonSchema.set("timeFieldSpec", _timeFieldSpec.toJsonObject());
jsonObject.set("timeFieldSpec", _timeFieldSpec.toJsonObject());
}
if (!_dateTimeFieldSpecs.isEmpty()) {
ArrayNode jsonArray = JsonUtils.newArrayNode();
for (DateTimeFieldSpec dateTimeFieldSpec : _dateTimeFieldSpecs) {
jsonArray.add(dateTimeFieldSpec.toJsonObject());
}
jsonSchema.set("dateTimeFieldSpecs", jsonArray);
jsonObject.set("dateTimeFieldSpecs", jsonArray);
}
return jsonObject;
}

/**
* Returns a pretty json string representation of the schema.
*/
@Nonnull
public String toPrettyJsonString() {
try {
return JsonUtils.objectToPrettyString(jsonSchema);
return JsonUtils.objectToPrettyString(toJsonObject());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

/**
* Returns a single-line json string representation of the schema.
*/
@Nonnull
public String toSingleLineJsonString() {
return toJsonObject().toString();
}

/**
* Validates a pinot schema.
* <p>The following validations are performed:
Expand Down Expand Up @@ -622,7 +639,7 @@ public Schema build() {

@Override
public String toString() {
return getJSONSchema();
return toPrettyJsonString();
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Schema fromZNRecord(@Nonnull ZNRecord record)
*/
public static ZNRecord toZNRecord(@Nonnull Schema schema) {
ZNRecord record = new ZNRecord(schema.getSchemaName());
record.setSimpleField("schemaJSON", schema.getJSONSchema());
record.setSimpleField("schemaJSON", schema.toSingleLineJsonString());
return record;
}

Expand Down Expand Up @@ -118,7 +118,7 @@ public static boolean postSchema(@Nonnull String host, int port, @Nonnull Schema
URL url = new URL("http", host, port, "/schemas");
PostMethod httpPost = new PostMethod(url.toString());
try {
Part[] parts = {new StringPart(schema.getSchemaName(), schema.toString())};
Part[] parts = {new StringPart(schema.getSchemaName(), schema.toSingleLineJsonString())};
MultipartRequestEntity requestEntity = new MultipartRequestEntity(parts, new HttpMethodParams());
httpPost.setRequestEntity(requestEntity);
int responseCode = HTTP_CLIENT.executeMethod(httpPost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ public void testSerializeDeserialize()
Assert.assertNotNull(resourceUrl);
Schema schema = Schema.fromFile(new File(resourceUrl.getFile()));

Schema schemaToCompare = Schema.fromString(schema.getJSONSchema());
Schema schemaToCompare = Schema.fromString(schema.toPrettyJsonString());
Assert.assertEquals(schemaToCompare, schema);
Assert.assertEquals(schemaToCompare.hashCode(), schema.hashCode());

schemaToCompare = Schema.fromString(schema.toSingleLineJsonString());
Assert.assertEquals(schemaToCompare, schema);
Assert.assertEquals(schemaToCompare.hashCode(), schema.hashCode());

Expand All @@ -275,10 +279,10 @@ public void testSerializeDeserialize()
Assert.assertEquals(schemaToCompare.hashCode(), schema.hashCode());

// When setting new fields, schema string should be updated
String JSONSchema = schemaToCompare.getJSONSchema();
String jsonSchema = schemaToCompare.toSingleLineJsonString();
schemaToCompare.setSchemaName("newSchema");
String JSONSchemaToCompare = schemaToCompare.getJSONSchema();
Assert.assertFalse(JSONSchema.equals(JSONSchemaToCompare));
String jsonSchemaToCompare = schemaToCompare.toSingleLineJsonString();
Assert.assertNotEquals(jsonSchemaToCompare, jsonSchema);
}

@Test
Expand All @@ -292,7 +296,7 @@ public void testSimpleDateFormat()
TimeFormat.SIMPLE_DATE_FORMAT + ":yyyyMMdd", "Date");
Schema schema = new Schema.SchemaBuilder().setSchemaName("testSchema")
.addTime(incomingTimeGranularitySpec, outgoingTimeGranularitySpec).build();
String jsonSchema = schema.getJSONSchema();
String jsonSchema = schema.toSingleLineJsonString();
Schema schemaFromJson = Schema.fromString(jsonSchema);
Assert.assertEquals(schemaFromJson, schema);
Assert.assertEquals(schemaFromJson.hashCode(), schema.hashCode());
Expand All @@ -311,7 +315,7 @@ public void testByteType()
expectedSchema.addField(new MetricFieldSpec("nonEmptyDefault", FieldSpec.DataType.BYTES, expectedNonEmptyDefault));

// Ensure that schema can be serialized and de-serialized (ie byte[] converted to String and back).
String jsonSchema = expectedSchema.getJSONSchema();
String jsonSchema = expectedSchema.toSingleLineJsonString();
Schema actualSchema = Schema.fromString(jsonSchema);

Assert.assertEquals(actualSchema.getFieldSpecFor("noDefault").getDefaultNullValue(), expectedEmptyDefault);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ public String getSchema(
if (schema == null) {
throw new ControllerApplicationException(LOGGER, "Schema not found", Response.Status.NOT_FOUND);
}
// We need to return schema.getJSONSchema(). Returning schema ends up with many extra fields, "jsonSchema" being one of them,
// Others like fieldSpecMap, etc., serialzing the entire Schema object.
return schema.getJSONSchema();
return schema.toPrettyJsonString();
}

@DELETE
Expand Down Expand Up @@ -144,7 +142,7 @@ public String validateSchema(FormDataMultiPart multiPart) {
throw new ControllerApplicationException(LOGGER, "Invalid schema. Check controller logs",
Response.Status.BAD_REQUEST);
}
return schema.getJSONSchema();
return schema.toPrettyJsonString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public String getTableSchema(
@ApiParam(value = "Table name (without type)", required = true) @PathParam("tableName") String tableName) {
Schema schema = pinotHelixResourceManager.getTableSchema(tableName);
if (schema != null) {
return schema.getJSONSchema();
return schema.toPrettyJsonString();
}
throw new ControllerApplicationException(LOGGER, "Schema not found for table: " + tableName,
Response.Status.NOT_FOUND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void setUp() {
public void testBadContentType() {
Schema schema = createDummySchema("testSchema");
try {
sendPostRequest(_controllerRequestURLBuilder.forSchemaCreate(), schema.getJSONSchema());
sendPostRequest(_controllerRequestURLBuilder.forSchemaCreate(), schema.toSingleLineJsonString());
} catch (IOException e) {
// TODO The Jersey API returns 400, so we need to check return code here not a string.
// Assert.assertTrue(e.getMessage().startsWith("Server returned HTTP response code: 415"), e.getMessage());
Expand All @@ -59,40 +59,41 @@ public void testCreateUpdateSchema()
String schemaName = "testSchema";
Schema schema = createDummySchema(schemaName);
String url = _controllerRequestURLBuilder.forSchemaCreate();
PostMethod postMethod = sendMultipartPostRequest(url, schema.toString());
PostMethod postMethod = sendMultipartPostRequest(url, schema.toSingleLineJsonString());
Assert.assertEquals(postMethod.getStatusCode(), 200);

schema.addField(new DimensionFieldSpec("NewColumn", FieldSpec.DataType.STRING, true));
postMethod = sendMultipartPostRequest(url, schema.toString());
postMethod = sendMultipartPostRequest(url, schema.toSingleLineJsonString());
Assert.assertEquals(postMethod.getStatusCode(), 200);

String schemaStr = sendGetRequest(_controllerRequestURLBuilder.forSchemaGet(schemaName));
Schema readSchema = Schema.fromString(schemaStr);
Schema inputSchema = Schema.fromString(schema.toString());
Schema inputSchema = Schema.fromString(schema.toSingleLineJsonString());
Assert.assertEquals(readSchema, inputSchema);
Assert.assertTrue(readSchema.getFieldSpecMap().containsKey("NewColumn"));

final String yetAnotherColumn = "YetAnotherColumn";
Assert.assertFalse(readSchema.getFieldSpecMap().containsKey(yetAnotherColumn));
schema.addField(new DimensionFieldSpec(yetAnotherColumn, FieldSpec.DataType.STRING, true));
PutMethod putMethod =
sendMultipartPutRequest(_controllerRequestURLBuilder.forSchemaUpdate(schemaName), schema.toString());
PutMethod putMethod = sendMultipartPutRequest(_controllerRequestURLBuilder.forSchemaUpdate(schemaName),
schema.toSingleLineJsonString());
Assert.assertEquals(putMethod.getStatusCode(), 200);
// verify some more...
schemaStr = sendGetRequest(_controllerRequestURLBuilder.forSchemaGet(schemaName));
readSchema = Schema.fromString(schemaStr);
inputSchema = Schema.fromString(schema.toString());
inputSchema = Schema.fromString(schema.toSingleLineJsonString());
Assert.assertEquals(readSchema, inputSchema);
Assert.assertTrue(readSchema.getFieldSpecMap().containsKey(yetAnotherColumn));

// error cases
putMethod = sendMultipartPutRequest(_controllerRequestURLBuilder.forSchemaUpdate(schemaName),
schema.toString().substring(1));
schema.toSingleLineJsonString().substring(1));
// invalid json
Assert.assertEquals(putMethod.getStatusCode(), 400);

schema.setSchemaName("differentSchemaName");
putMethod = sendMultipartPutRequest(_controllerRequestURLBuilder.forSchemaUpdate(schemaName), schema.toString());
putMethod = sendMultipartPutRequest(_controllerRequestURLBuilder.forSchemaUpdate(schemaName),
schema.toSingleLineJsonString());
Assert.assertEquals(putMethod.getStatusCode(), 400);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,16 @@ protected Schema createDummySchema(String tableName) {

protected void addDummySchema(String tableName)
throws IOException {
addSchema(createDummySchema(tableName).getJSONSchema());
addSchema(createDummySchema(tableName));
}

/**
* Add a schema to the controller.
* @param schemaJson the json string representing the schema
*/
protected void addSchema(String schemaJson)
protected void addSchema(Schema schema)
throws IOException {
String url = _controllerRequestURLBuilder.forSchemaCreate();
PostMethod postMethod = sendMultipartPostRequest(url, schemaJson);
PostMethod postMethod = sendMultipartPostRequest(url, schema.toSingleLineJsonString());
Assert.assertEquals(postMethod.getStatusCode(), 200);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static String getSegmentName(JobContext context) {
}

public static void setSchema(Job job, Schema schema) {
job.getConfiguration().set(PinotOutputFormat.SCHEMA, schema.getJSONSchema());
job.getConfiguration().set(PinotOutputFormat.SCHEMA, schema.toSingleLineJsonString());
}

public static String getSchema(JobContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void run()

job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataFiles.size());
if (_dataSchema != null) {
job.getConfiguration().set(JobConfigConstants.SCHEMA, _dataSchema.toString());
job.getConfiguration().set(JobConfigConstants.SCHEMA, _dataSchema.toSingleLineJsonString());
}
setOutputPath(job.getConfiguration());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public boolean execute()
LOGGER.info("Store Pinot schema to file: {}", outputFile.getAbsolutePath());

try (FileWriter writer = new FileWriter(outputFile)) {
writer.write(schema.toString());
writer.write(schema.toPrettyJsonString());
}

return true;
Expand Down