Open
Description
Connector Name
destination-postgres
Connector Version
2.4.0
What step the error happened?
During the sync
Relevant information
You can pretty easily see the bug by looking at the source code:
private fun adaptValueNodes(fieldName: String?, node: JsonNode, parentNode: JsonNode?) {
if (node.isValueNode && filterValueNode.test(node)) {
if (fieldName != null) {
val adaptedNode = valueNodeAdapter.apply(node)
(parentNode as ObjectNode?)!!.set<JsonNode>(fieldName, adaptedNode)
} else throw RuntimeException("Unexpected value node without fieldName. Node: $node")
} else if (node.isArray) {
node.elements().forEachRemaining { arrayNode: JsonNode ->
adaptValueNodes(null, arrayNode, node)
}
} else {
node.fields().forEachRemaining { stringJsonNodeEntry: Map.Entry<String?, JsonNode> ->
adaptValueNodes(stringJsonNodeEntry.key, stringJsonNodeEntry.value, node)
}
}
}
Calling this function with a JsonNode with JSON like
{
"x": [1, 2, 3]
}
will throw the "Unexpected value node without fieldName." runtime error, as the recursive call to adaptValueNodes for each array element calls the function with a fieldName of null. As long as each array element is not a pure value node, you'll be fine.
Relevant log output
Here's the relevant traceback
Stack Trace: java.lang.RuntimeException: Unexpected value node without fieldName. Node: "<redacted>"
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:67)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.access$adaptValueNodes(PostgresDataTransformer.kt:16)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$1.invoke(PostgresDataTransformer.kt:70)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$1.invoke(PostgresDataTransformer.kt:69)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes$lambda$2(PostgresDataTransformer.kt:69)
at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1085)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:69)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.access$adaptValueNodes(PostgresDataTransformer.kt:16)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$2.invoke(PostgresDataTransformer.kt:74)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$2.invoke(PostgresDataTransformer.kt:73)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes$lambda$3(PostgresDataTransformer.kt:73)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:73)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptAllValueNodes(PostgresDataTransformer.kt:50)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adapt(PostgresDataTransformer.kt:45)
at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.transform(PostgresDataTransformer.kt:39)
at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:76)
at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:107)
at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65)
at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:412)
at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:410)
at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:410)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:410)
at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:402)
at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)
at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:220)
at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt)
Relevant document (from mongo source)
{
"_id": {
"$oid": "..."
},
"timeStamp": {
"$date": "2024-03-02T21:19:37.361Z"
},
"tenant_id": "...",
"leadId": "...",
"msg_ID": [
"..."
],
"start_unit_id": "...",
"end_unit_id": "...",
"apartment_id": "...",
"original_query": [
"<redacted - matches log line>"
],
"user_query": "...",
"response": "...",
"error_message": null,
"event": "...",
"processing_start_time": {
"$date": "2024-03-02T21:19:26.846Z"
},
"processing_end_time": {
"$date": "2024-03-02T21:19:37.361Z"
},
"tool_selector_output": [
"..."
],
"tool_selector_subqueries": [
"..."
],
"final_query_plan": [
{
"tool": "...",
"sub_query": "..."
}
],
"tool_responses": [
{
"query": "...",
"response_instruction": "..."
}
],
"request_id": "xxxx",
"__v": 2
}
Contribute
- Yes, I want to contribute