Skip to content

Commit

Permalink
Splits bases and updates build.gradle files (airbytehq#25649)
Browse files Browse the repository at this point in the history
* Splits bases and updates build.gradle files

* Fixed changelog out of sync

* Bumps version number and metadata files

* auto-bump connector version

* Downgraded untouched connector bumps

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii authored May 6, 2023
1 parent a682955 commit c673b0a
Show file tree
Hide file tree
Showing 131 changed files with 11,841 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 1.0.1
dockerImageTag: 1.0.2
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6604,7 +6604,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:1.0.1"
- dockerImage: "airbyte/destination-snowflake:1.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6380,7 +6380,7 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "1.0.1",
"dockerImageTag": "1.0.2",
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/snowflake",
"icon": "snowflake.svg",
"spec": {
Expand Down
5 changes: 5 additions & 0 deletions airbyte-integrations/bases/base-java-async/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*
!Dockerfile
!build
!javabase.sh
!run_with_normalization.sh
26 changes: 26 additions & 0 deletions airbyte-integrations/bases/base-java-async/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
ARG JDK_VERSION=17.0.4
FROM amazoncorretto:${JDK_VERSION}
COPY --from=airbyte/integration-base:dev /airbyte /airbyte

RUN yum install -y tar openssl && yum clean all

WORKDIR /airbyte

# Add the Datadog Java APM agent
ADD https://dtdg.co/latest-java-tracer dd-java-agent.jar

COPY javabase.sh .
COPY run_with_normalization.sh .

# airbyte base commands
ENV AIRBYTE_SPEC_CMD "/airbyte/javabase.sh --spec"
ENV AIRBYTE_CHECK_CMD "/airbyte/javabase.sh --check"
ENV AIRBYTE_DISCOVER_CMD "/airbyte/javabase.sh --discover"
ENV AIRBYTE_READ_CMD "/airbyte/javabase.sh --read"
ENV AIRBYTE_WRITE_CMD "/airbyte/javabase.sh --write"

ENV AIRBYTE_ENTRYPOINT "/airbyte/base.sh"
ENTRYPOINT ["/airbyte/base.sh"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/integration-base-java
30 changes: 30 additions & 0 deletions airbyte-integrations/bases/base-java-async/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id 'java-library'
id 'airbyte-docker'
}

dependencies {
implementation libs.airbyte.protocol
implementation project(':airbyte-config-oss:config-models-oss')
implementation project(':airbyte-commons-cli')
implementation project(':airbyte-json-validation')

implementation 'commons-cli:commons-cli:1.4'
implementation 'net.i2p.crypto:eddsa:0.3.0'
implementation 'org.apache.sshd:sshd-mina:2.8.0'
// bouncycastle is pinned to version-match the transitive dependency from kubernetes client-java
// because a version conflict causes "parameter object not a ECParameterSpec" on ssh tunnel initiation
implementation 'org.bouncycastle:bcprov-jdk15on:1.66'
implementation 'org.bouncycastle:bcpkix-jdk15on:1.66'
implementation 'org.bouncycastle:bctls-jdk15on:1.66'

implementation libs.jackson.annotations
implementation libs.connectors.testcontainers
implementation libs.connectors.testcontainers.jdbc
implementation libs.bundles.datadog

implementation files(project(':airbyte-integrations:bases:base').airbyteDocker.outputs)

testImplementation 'commons-lang:commons-lang:2.6'
implementation group: 'org.apache.logging.log4j', name: 'log4j-layout-template-json', version: '2.17.2'
}
26 changes: 26 additions & 0 deletions airbyte-integrations/bases/base-java-async/javabase.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

set -e

# if IS_CAPTURE_HEAP_DUMP_ON_ERROR is set to true, then will capture Heap dump on OutOfMemory error
if [[ $IS_CAPTURE_HEAP_DUMP_ON_ERROR = true ]]; then

arrayOfSupportedConnectors=("source-postgres" "source-mssql" "source-mysql" )

# The heap dump would be captured only in case when java-based connector fails with OutOfMemory error
if [[ " ${arrayOfSupportedConnectors[*]} " =~ " $APPLICATION " ]]; then
JAVA_OPTS=$JAVA_OPTS" -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/dump.hprof"
export JAVA_OPTS
echo "Added JAVA_OPTS=$JAVA_OPTS"
echo "APPLICATION=$APPLICATION"
fi
fi

# Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is
# set by the dockerfile that inherits base-java, so it cannot be evaluated when base-java is built.
# We also need to make sure that stdin of the script is piped to the stdin of the java application.
if [[ $A = --write ]]; then
cat <&0 | /airbyte/bin/"$APPLICATION" "$@"
else
/airbyte/bin/"$APPLICATION" "$@"
fi
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
# Intentionally no set -e, because we want to run normalization even if the destination fails
set -o pipefail

/airbyte/base.sh $@
destination_exit_code=$?

if test "$1" != 'write'
then
normalization_exit_code=0
elif test "$NORMALIZATION_TECHNIQUE" = 'LEGACY'
then
echo '{"type": "LOG","log":{"level":"INFO","message":"Starting in-connector normalization"}}'
# the args in a write command are `write --catalog foo.json --config bar.json`
# so if we remove the `write`, we can just pass the rest directly into normalization
/airbyte/entrypoint.sh run ${@:2} --integration-type $AIRBYTE_NORMALIZATION_INTEGRATION | java -cp "/airbyte/lib/*" io.airbyte.integrations.destination.normalization.NormalizationLogParser
normalization_exit_code=$?
echo '{"type": "LOG","log":{"level":"INFO","message":"Completed in-connector normalization"}}'
else
echo '{"type": "LOG","log":{"level":"INFO","message":"Skipping in-connector normalization"}}'
normalization_exit_code=0
fi

if test $destination_exit_code -ne 0
then
exit $destination_exit_code
elif test $normalization_exit_code -ne 0
then
exit $normalization_exit_code
else
exit 0
fi
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.Integration;
import io.airbyte.protocol.models.v0.ConnectorSpecification;

public abstract class BaseConnector implements Integration {

/**
* By convention the spec is stored as a resource for java connectors. That resource is called
* spec.json.
*
* @return specification.
* @throws Exception - any exception.
*/
@Override
public ConnectorSpecification spec() throws Exception {
// return a JsonSchema representation of the spec for the integration.
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirbyteExceptionHandler implements Thread.UncaughtExceptionHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteExceptionHandler.class);
public static final String logMessage = "Something went wrong in the connector. See the logs for more details.";

@Override
public void uncaughtException(Thread t, Throwable e) {
// This is a naive AirbyteTraceMessage emission in order to emit one when any error occurs in a
// connector.
// If a connector implements AirbyteTraceMessage emission itself, this code will result in an
// additional one being emitted.
// this is fine tho because:
// "The earliest AirbyteTraceMessage where type=error will be used to populate the FailureReason for
// the sync."
// from the spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
LOGGER.error(logMessage, e);
AirbyteTraceMessageUtility.emitSystemErrorTrace(e, logMessage);
terminate();
}

// by doing this in a separate method we can mock it to avoid closing the jvm and therefore test
// properly
protected void terminate() {
System.exit(1);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.protocol.models.v0.AirbyteMessage;

/**
* Interface for the destination's consumption of incoming records wrapped in an
* {@link io.airbyte.protocol.models.v0.AirbyteMessage}.
*
* This is via the accept method, which commonly handles parsing, validation, batching and writing
* of the transformed data to the final destination i.e. the technical system data is being written
* to.
*
* Lifecycle:
* <ul>
* <li>1. Instantiate consumer.</li>
* <li>2. start() to initialize any resources that need to be created BEFORE the consumer consumes
* any messages.</li>
* <li>3. Consumes ALL records via {@link AirbyteMessageConsumer#accept(AirbyteMessage)}</li>
* <li>4. Always (on success or failure) finalize by calling
* {@link AirbyteMessageConsumer#close()}</li>
* </ul>
* We encourage implementing this interface using the {@link FailureTrackingAirbyteMessageConsumer}
* class.
*/
public interface AirbyteMessageConsumer extends CheckedConsumer<AirbyteMessage, Exception>, AutoCloseable {

void start() throws Exception;

/**
* Consumes all {@link AirbyteMessage}s
*
* @param message {@link AirbyteMessage} to be processed
* @throws Exception
*/
@Override
void accept(AirbyteMessage message) throws Exception;

/**
* Executes at the end of consumption of all incoming streamed data regardless of success or failure
*
* @throws Exception
*/
@Override
void close() throws Exception;

/**
* Append a function to be called on {@link AirbyteMessageConsumer#close}.
*/
static AirbyteMessageConsumer appendOnClose(final AirbyteMessageConsumer consumer, final VoidCallable voidCallable) {
return new AirbyteMessageConsumer() {

@Override
public void start() throws Exception {
consumer.start();
}

@Override
public void accept(final AirbyteMessage message) throws Exception {
consumer.accept(message);
}

@Override
public void close() throws Exception {
consumer.close();
voidCallable.call();
}

};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base;

import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
import java.util.function.Consumer;
import org.apache.commons.lang3.exception.ExceptionUtils;

public final class AirbyteTraceMessageUtility {

private AirbyteTraceMessageUtility() {}

public static void emitSystemErrorTrace(final Throwable e, final String displayMessage) {
emitErrorTrace(e, displayMessage, FailureType.SYSTEM_ERROR);
}

public static void emitConfigErrorTrace(final Throwable e, final String displayMessage) {
emitErrorTrace(e, displayMessage, FailureType.CONFIG_ERROR);
}

public static void emitEstimateTrace(final long byteEstimate,
final AirbyteEstimateTraceMessage.Type type,
final long rowEstimate,
final String streamName,
final String streamNamespace) {
emitMessage(makeAirbyteMessageFromTraceMessage(
makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ESTIMATE)
.withEstimate(new AirbyteEstimateTraceMessage()
.withByteEstimate(byteEstimate)
.withType(type)
.withRowEstimate(rowEstimate)
.withName(streamName)
.withNamespace(streamNamespace))));
}

public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType));
}

public static void emitStreamStatusTrace(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) {
emitMessage(makeStreamStatusTraceAirbyteMessage(airbyteStreamStatusHolder));
}

// todo: handle the other types of trace message we'll expect in the future, see
// io.airbyte.protocol.models.v0.AirbyteTraceMessage
// & the tech spec:
// https://docs.google.com/document/d/1ctrj3Yh_GjtQ93aND-WH3ocqGxsmxyC3jfiarrF6NY0/edit#
// public void emitNotificationTrace() {}
// public void emitMetricTrace() {}

private static void emitMessage(final AirbyteMessage message) {
// Not sure why defaultOutputRecordCollector is under Destination specifically,
// but this matches usage elsewhere in base-java
final Consumer<AirbyteMessage> outputRecordCollector = Destination::defaultOutputRecordCollector;
outputRecordCollector.accept(message);
}

private static AirbyteMessage makeErrorTraceAirbyteMessage(
final Throwable e,
final String displayMessage,
final FailureType failureType) {

return makeAirbyteMessageFromTraceMessage(
makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ERROR)
.withError(new AirbyteErrorTraceMessage()
.withFailureType(failureType)
.withMessage(displayMessage)
.withInternalMessage(e.toString())
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) {
return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage());
}

private static AirbyteMessage makeAirbyteMessageFromTraceMessage(final AirbyteTraceMessage airbyteTraceMessage) {
return new AirbyteMessage().withType(Type.TRACE).withTrace(airbyteTraceMessage);
}

private static AirbyteTraceMessage makeAirbyteTraceMessage(final AirbyteTraceMessage.Type traceMessageType) {
return new AirbyteTraceMessage().withType(traceMessageType).withEmittedAt((double) System.currentTimeMillis());
}

}
Loading

0 comments on commit c673b0a

Please sign in to comment.