Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-greenplum-service-nar</artifactId>
<artifactId>nifi-greengage-service-nar</artifactId>
<version>1.28.1</version>
<type>nar</type>
</dependency>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.type;

import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

public class DecimalTypeSystem extends RelDataTypeSystemImpl {
private static final int MAX_PRECISION = 38;
private static final int MAX_SCALE = 38;

@Override
public int getMaxNumericPrecision() {
return MAX_PRECISION;
}

@Override
public int getMaxNumericScale() {
return MAX_SCALE;
}

@Override
public boolean shouldConvertRaggedUnionTypesToVarying() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
public interface ColumnDataType {
String getName();

GreenplumDataType getType();
GreengageDataType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package org.apache.nifi.gpfdist.metadata;

public enum GreenplumDataType {
public enum GreengageDataType {
BOOLEAN,
MONEY,
UUID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface GpfdistService extends ControllerService {

RecordSinkProvider getRecordSinkProvider();

GreenplumService getGreenplumTableService();
GreengageService getGreengageTableService();

TransferDataQueryExecutor getQueryExecutor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.sql.DatabaseMetaData;

public interface GreenplumService {
public interface GreengageService {

DatabaseMetaData getDatabaseMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<packaging>jar</packaging>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-greenplum-service-bundle</artifactId>
<artifactId>nifi-greengage-service-bundle</artifactId>
<version>1.28.1</version>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.nifi.gpfdist.metadata.ColumnDescription;
import org.apache.nifi.gpfdist.metadata.TableDescription;
import org.apache.nifi.gpfdist.service.GpfdistService;
import org.apache.nifi.gpfdist.service.GreenplumService;
import org.apache.nifi.gpfdist.service.GreengageService;
import org.apache.nifi.gpfdist.service.RecordSink;
import org.apache.nifi.gpfdist.service.TransferDataQueryExecutor;
import org.apache.nifi.gpfdist.service.load.context.WriteContext;
Expand All @@ -57,45 +57,45 @@
import java.util.stream.Collectors;

import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.gpfdist.service.util.GreenplumUtil.QUOTE;
import static org.apache.nifi.gpfdist.service.util.GreengageUtil.QUOTE;

@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "put", "greenplum"})
@CapabilityDescription("Writes the contents of a FlowFile to Greenplum")
public class PutGreenplumRecord extends AbstractProcessor {
@Tags({"record", "put", "greengage"})
@CapabilityDescription("Writes the contents of a FlowFile to Greengage")
public class PutGreengageRecord extends AbstractProcessor {
static final PropertyDescriptor GPFDIST_SERVICE = new PropertyDescriptor.Builder()
.name("gpfdist-record-processing-service")
.displayName("Gpfdist Service")
.description("The Controller Service that is used to load records into greenplum.")
.description("The Controller Service that is used to load records into greengage.")
.required(true)
.identifiesControllerService(GpfdistService.class)
.build();
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("put-greenplum-record-record-reader")
.name("put-greengage-record-record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("put-greenplum-record-schema-name")
.name("put-greengage-record-schema-name")
.displayName("Schema Name")
.description("The name of the schema where the data will be loaded.")
.required(false)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("put-greenplum-record-table-name")
.name("put-greengage-record-table-name")
.displayName("Table Name")
.description("Name of the table where the data will be loaded.")
.required(true)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor TABLE_COLUMNS = new PropertyDescriptor.Builder()
.name("put-greenplum-table-columns")
.name("put-greengage-table-columns")
.displayName("Table Columns")
.description("Columns of the table where the data will be loaded.")
.required(true)
Expand All @@ -108,7 +108,7 @@ public class PutGreenplumRecord extends AbstractProcessor {
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if records cannot be loaded into Greenplum.")
.description("A FlowFile is routed to this relationship if records cannot be loaded into Greengage.")
.build();

private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
Expand Down Expand Up @@ -151,21 +151,23 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (flowFile == null) {
return;
}

final String schema = context.getProperty(SCHEMA_NAME)
.evaluateAttributeExpressions(flowFile)
.getValue();
final String table = context.getProperty(TABLE_NAME)
.evaluateAttributeExpressions(flowFile)
.getValue();
final GpfdistService gpfdistService = context.getProperty(GPFDIST_SERVICE)
.asControllerService(GpfdistService.class);
final TransferDataQueryExecutor transferDataQueryExecutor = gpfdistService.getQueryExecutor();
final GreenplumService greenplumService = gpfdistService.getGreenplumTableService();
final TableDescription tableDescription = greenplumService.getTableDescription(
context.getProperty(SCHEMA_NAME).getValue(),
context.getProperty(TABLE_NAME).getValue()
);
final GreengageService greengageService = gpfdistService.getGreengageTableService();
final TableDescription tableDescription = greengageService.getTableDescription(schema, table);
final StopWatch stopWatch = new StopWatch(true);

RecordSink recordSink = null;
try (final InputStream in = session.read(flowFile)) {
final String destinationUrl = greenplumService.getDatabaseMetadata().getURL();
final List<ColumnDescription> columnDescriptions = getColumnDescriptions(context, tableDescription);
final String destinationUrl = greengageService.getDatabaseMetadata().getURL();
final List<ColumnDescription> columnDescriptions = getColumnDescriptions(flowFile, context, tableDescription);
final RecordReader recordReader = context.getProperty(RECORD_READER_FACTORY)
.asControllerService(RecordReaderFactory.class)
.createRecordReader(flowFile, in, getLogger());
Expand All @@ -176,6 +178,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
recordSink = gpfdistService.getRecordSinkProvider()
.createRecordSink(tableDescription, columnDescriptions, readerSchema);
recordSinks.add(recordSink);
WriteContext writeContext = (WriteContext) recordSink.getContext();

CompletableFuture<Void> queryLoadFuture = transferDataQueryExecutor.execute(writeContext.getMetadata());
Expand Down Expand Up @@ -231,9 +234,12 @@ private CompletableFuture<Throwable> withErrorRecording(CompletableFuture<?> fut
});
}

private List<ColumnDescription> getColumnDescriptions(ProcessContext context,
private List<ColumnDescription> getColumnDescriptions(FlowFile flowFile,
ProcessContext context,
TableDescription tableDescription) {
List<String> columns = Arrays.stream(context.getProperty(TABLE_COLUMNS).getValue().split(","))
List<String> columns = Arrays.stream(context.getProperty(TABLE_COLUMNS)
.evaluateAttributeExpressions(flowFile)
.getValue().split(","))
.map(col -> col.replace(QUOTE, "").trim())
.collect(Collectors.toList());
List<ColumnDescription> columnDescriptions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ private GpfdistProperties() {
.defaultValue("60000")
.build();
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("put-greenplum-record-dcbp-service")
.name("put-greengage-record-dcbp-service")
.displayName("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain a connection to the greenplum for executing queries.")
.description("The Controller Service that is used to obtain a connection to the greengage for executing queries.")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor WRITE_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("put-greenplum-record-write-buffer-size")
.name("put-greengage-record-write-buffer-size")
.displayName("Write buffer size in bytes")
.description("Write byte buffer size for serialized records.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.nifi.gpfdist.server.DefaultGpfdistServer;
import org.apache.nifi.gpfdist.server.GpfdistServer;
import org.apache.nifi.gpfdist.server.config.GpfdistServerConfig;
import org.apache.nifi.gpfdist.service.greenplum.DefaultGreenplumService;
import org.apache.nifi.gpfdist.service.greengage.DefaultGreengageService;
import org.apache.nifi.gpfdist.service.load.context.WriteContextManager;
import org.apache.nifi.gpfdist.service.load.metadata.factory.CreateReadableExternalTableQueryFactory;
import org.apache.nifi.gpfdist.service.load.metadata.factory.DefaulGpfdistLocationFactory;
Expand Down Expand Up @@ -61,7 +61,7 @@
import static org.apache.nifi.gpfdist.service.GpfdistProperties.WRITE_BUFFER_SIZE;

@Tags({"gpfdist"})
@CapabilityDescription("Provides the ability to load data to Greenplum segments directly")
@CapabilityDescription("Provides the ability to load data to Greengage segments directly")
public class StandartGpfdistService extends AbstractControllerService implements GpfdistService {
private static final List<PropertyDescriptor> PROPERTIES;

Expand All @@ -80,7 +80,7 @@ public class StandartGpfdistService extends AbstractControllerService implements

private GpfdistServer server;
private RecordSinkProvider recordSinkProvider;
private GreenplumService greenplumService;
private GreengageService greengageService;
private TransferDataQueryExecutor queryExecutor;

@Override
Expand Down Expand Up @@ -125,7 +125,7 @@ public void onConfigured(final ConfigurationContext context) {
requestExecutorService,
logger);
server.start();
greenplumService = new DefaultGreenplumService(dbcpService, logger);
greengageService = new DefaultGreengageService(dbcpService, logger);
final DefaultGpfdistLoadMetadataFactory loadMetadataFactory =
new DefaultGpfdistLoadMetadataFactory(new DefaulGpfdistLocationFactory(new GpfdistServerConfig(server.getPort(),
server.getHost(),
Expand Down Expand Up @@ -174,8 +174,8 @@ public RecordSinkProvider getRecordSinkProvider() {
}

@Override
public GreenplumService getGreenplumTableService() {
return greenplumService;
public GreengageService getGreengageTableService() {
return greengageService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.nifi.gpfdist.service.datatype;

import org.apache.nifi.gpfdist.metadata.ColumnDataType;
import org.apache.nifi.gpfdist.metadata.GreenplumDataType;
import org.apache.nifi.gpfdist.metadata.GreengageDataType;

public class ArrayDataType
implements ColumnDataType {
Expand All @@ -32,8 +32,8 @@ public String getName() {
}

@Override
public GreenplumDataType getType() {
return GreenplumDataType.ARRAY;
public GreengageDataType getType() {
return GreengageDataType.ARRAY;
}

public ColumnDataType getElementType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.nifi.gpfdist.service.datatype;

import org.apache.nifi.gpfdist.metadata.ColumnDataType;
import org.apache.nifi.gpfdist.metadata.GreenplumDataType;
import org.apache.nifi.gpfdist.metadata.GreengageDataType;

public class BigintDataType
implements ColumnDataType {
Expand All @@ -30,8 +30,8 @@ public String getName() {
}

@Override
public GreenplumDataType getType() {
return GreenplumDataType.BIGINT;
public GreengageDataType getType() {
return GreengageDataType.BIGINT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.nifi.gpfdist.service.datatype;

import org.apache.nifi.gpfdist.metadata.ColumnDataType;
import org.apache.nifi.gpfdist.metadata.GreenplumDataType;
import org.apache.nifi.gpfdist.metadata.GreengageDataType;

import java.util.Optional;

Expand All @@ -34,8 +34,8 @@ public String getName() {
}

@Override
public GreenplumDataType getType() {
return GreenplumDataType.BIT;
public GreengageDataType getType() {
return GreengageDataType.BIT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.nifi.gpfdist.service.datatype;

import org.apache.nifi.gpfdist.metadata.ColumnDataType;
import org.apache.nifi.gpfdist.metadata.GreenplumDataType;
import org.apache.nifi.gpfdist.metadata.GreengageDataType;

public class BooleanDataType
implements ColumnDataType {
Expand All @@ -30,8 +30,8 @@ public String getName() {
}

@Override
public GreenplumDataType getType() {
return GreenplumDataType.BOOLEAN;
public GreengageDataType getType() {
return GreengageDataType.BOOLEAN;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package org.apache.nifi.gpfdist.service.datatype;

import org.apache.nifi.gpfdist.metadata.ColumnDataType;
import org.apache.nifi.gpfdist.metadata.GreenplumDataType;
import org.apache.nifi.gpfdist.metadata.GreengageDataType;

public class ByteaDataType
implements ColumnDataType {
Expand All @@ -30,8 +30,8 @@ public String getName() {
}

@Override
public GreenplumDataType getType() {
return GreenplumDataType.BYTEA;
public GreengageDataType getType() {
return GreengageDataType.BYTEA;
}

@Override
Expand Down
Loading
Loading