
[destination-postgres]: fails with Runtime Error on record with JSON array of values #49459

Open
@schuylerfried

Description

Connector Name

destination-postgres

Connector Version

2.4.0

What step the error happened?

During the sync

Relevant information

You can pretty easily see the bug by looking at the source code:

private fun adaptValueNodes(fieldName: String?, node: JsonNode, parentNode: JsonNode?) {
    if (node.isValueNode && filterValueNode.test(node)) {
        if (fieldName != null) {
            val adaptedNode = valueNodeAdapter.apply(node)
            (parentNode as ObjectNode?)!!.set<JsonNode>(fieldName, adaptedNode)
        } else throw RuntimeException("Unexpected value node without fieldName. Node: $node")
    } else if (node.isArray) {
        node.elements().forEachRemaining { arrayNode: JsonNode ->
            adaptValueNodes(null, arrayNode, node)
        }
    } else {
        node.fields().forEachRemaining { stringJsonNodeEntry: Map.Entry<String?, JsonNode> ->
            adaptValueNodes(stringJsonNodeEntry.key, stringJsonNodeEntry.value, node)
        }
    }
}

Calling this function with a JsonNode with JSON like

{
    "x": [1, 2, 3]
}

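will throw the "Unexpected value node without fieldName." runtime error (assuming the elements pass filterValueNode), because the array branch recurses into each element with a fieldName of null. An array is only safe when none of its elements is a value node that passes filterValueNode. Arrays of objects, for example, work because their values are reached through the object branch with a field name.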
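One possible direction for a fix, sketched under assumptions: instead of passing a field name, pass each recursive call a callback that knows how to replace the node in its parent, so array elements are replaced by index via Jackson's ArrayNode.set(int, JsonNode). The filterValueNode and valueNodeAdapter lambdas below are hypothetical stand-ins (here they strip NUL characters from strings), not the connector's actual implementations, and adaptAllValueNodes is only an illustrative entry point.

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.node.TextNode

// Hypothetical stand-ins for the transformer's filterValueNode / valueNodeAdapter.
val filterValueNode: (JsonNode) -> Boolean = {
    it.isTextual && it.textValue().contains('\u0000')
}
val valueNodeAdapter: (JsonNode) -> JsonNode = {
    TextNode(it.textValue().replace("\u0000", ""))
}

// `replace` writes an adapted node back into whichever parent holds `node`,
// so the recursion no longer needs a field name for array elements.
fun adaptValueNodes(node: JsonNode, replace: (JsonNode) -> Unit) {
    when {
        node.isValueNode ->
            if (filterValueNode(node)) replace(valueNodeAdapter(node))
        node.isArray -> {
            val array = node as ArrayNode
            for (i in 0 until array.size()) {
                // Array elements are replaced by index.
                adaptValueNodes(array.get(i)) { adapted -> array.set(i, adapted) }
            }
        }
        node.isObject -> {
            val obj = node as ObjectNode
            // Snapshot the field names before mutating the object.
            obj.fieldNames().asSequence().toList().forEach { name ->
                adaptValueNodes(obj.get(name)) { adapted -> obj.set<JsonNode>(name, adapted) }
            }
        }
    }
}

// Illustrative entry point: a bare value at the root has no parent to write into.
fun adaptAllValueNodes(root: JsonNode) =
    adaptValueNodes(root) { error("Cannot replace a root value node: $root") }

fun main() {
    // In a raw string, \u0000 reaches Jackson as a JSON escape for the NUL character.
    val root = ObjectMapper().readTree("""{"x": ["a\u0000b", 2, {"y": "c\u0000"}]}""")
    adaptAllValueNodes(root)
    println(root)
}
```

With this shape, a value node inside an array is handled the same way as one inside an object, and nested arrays work because each level supplies its own replace callback.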

Relevant log output

Here's the relevant traceback


Stack Trace: java.lang.RuntimeException: Unexpected value node without fieldName. Node: "<redacted>"
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:67)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.access$adaptValueNodes(PostgresDataTransformer.kt:16)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$1.invoke(PostgresDataTransformer.kt:70)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$1.invoke(PostgresDataTransformer.kt:69)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes$lambda$2(PostgresDataTransformer.kt:69)
        at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1085)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:69)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.access$adaptValueNodes(PostgresDataTransformer.kt:16)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$2.invoke(PostgresDataTransformer.kt:74)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer$adaptValueNodes$2.invoke(PostgresDataTransformer.kt:73)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes$lambda$3(PostgresDataTransformer.kt:73)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptValueNodes(PostgresDataTransformer.kt:73)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adaptAllValueNodes(PostgresDataTransformer.kt:50)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.adapt(PostgresDataTransformer.kt:45)
        at io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDataTransformer.transform(PostgresDataTransformer.kt:39)
        at io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer.deserializeAirbyteMessage(AirbyteMessageDeserializer.kt:76)
        at io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept(AsyncStreamConsumer.kt:107)
        at io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer$Companion$appendOnClose$1.accept(SerializedAirbyteMessageConsumer.kt:65)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:412)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion$consumeWriteStream$2$1.invoke(IntegrationRunner.kt:410)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$lambda$1$lambda$0(IntegrationRunner.kt:410)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core(IntegrationRunner.kt:410)
        at io.airbyte.cdk.integrations.base.IntegrationRunner$Companion.consumeWriteStream$io_airbyte_airbyte_cdk_java_airbyte_cdk_airbyte_cdk_core$default(IntegrationRunner.kt:402)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:215)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:119)
        at io.airbyte.cdk.integrations.base.IntegrationRunner.run$default(IntegrationRunner.kt:113)
        at io.airbyte.integrations.destination.postgres.PostgresDestination$Companion.main(PostgresDestination.kt:220)
        at io.airbyte.integrations.destination.postgres.PostgresDestination.main(PostgresDestination.kt)



Relevant document (from the MongoDB source)

  {
    "_id": {
      "$oid": "..."
    },
    "timeStamp": {
      "$date": "2024-03-02T21:19:37.361Z"
    },
    "tenant_id": "...",
    "leadId": "...",
    "msg_ID": [
      "..."
    ],
    "start_unit_id": "...",
    "end_unit_id": "...",
    "apartment_id": "...",
    "original_query": [
      "<redacted - matches log line>"
    ],
    "user_query": "...",
    "response": "...",
    "error_message": null,
    "event": "...",
    "processing_start_time": {
      "$date": "2024-03-02T21:19:26.846Z"
    },
    "processing_end_time": {
      "$date": "2024-03-02T21:19:37.361Z"
    },
    "tool_selector_output": [
      "..."
    ],
    "tool_selector_subqueries": [
      "..."
    ],
    "final_query_plan": [
      {
        "tool": "...",
        "sub_query": "..."
      }
    ],
    "tool_responses": [
      {
        "query": "...",
        "response_instruction": "..."
      }
    ],
    "request_id": "xxxx",
    "__v": 2
  }

Contribute

  • Yes, I want to contribute
