
Commit

NIFI-6780: Add QueryNiFiReportingTask, RecordSinkService, S2S and DB impls (#3826)

* NIFI-6780: Introduce RecordSinkService to separate format and destination, refactor common S2S utils

Added QueryNiFiReportingTask to query NiFi status and metrics with SQL

Add PROCESSOR_STATUS and PROCESS_GROUP_STATUS tables

Add CONNECTION_STATUS_PREDICTIONS table

Check for null predictions

Add ConnectionStatusRecursiveIterator

Fix issue with duplicate iterator outputs

Refactored query interfaces, fixed assembly POM

Rebased against master, fixed isBackPressureEnabled and Checkstyle/RAT errors

Add DatabaseRecordSink service (#13)

* Add DatabaseRecordSink service

* Incorporated review comments

* NIFI-6780: Add/fix docs, cleanup warnings, fixed some table definitions

* Added bundle profile; the predictions table is removed when analytics is not enabled

* Added doc describing which tables are available under which conditions

This closes #3826.
mattyb149 authored and YolandaMDavis committed Oct 22, 2019
1 parent d148fb1 commit ace23c3
Showing 64 changed files with 6,173 additions and 261 deletions.
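Before the file-by-file diff, a brief orientation sketch. The commit message above describes SQL access to NiFi status and metrics via tables such as PROCESSOR_STATUS and PROCESS_GROUP_STATUS; the snippet below only illustrates that idea. The class name, column names, and query are illustrative assumptions and are not taken from this diff.

// Illustrative sketch only: the column names below are assumptions; this diff does not
// show the schema exposed by QueryNiFiReportingTask.
public final class ExampleStatusQuery {

    // A hypothetical query over the PROCESSOR_STATUS table, of the kind the commit
    // message describes (SQL over NiFi status and metrics).
    static final String EXAMPLE_QUERY =
            "SELECT name, processGroupId, bytesRead, bytesWritten "
            + "FROM PROCESSOR_STATUS "
            + "ORDER BY bytesWritten DESC "
            + "LIMIT 10";

    public static void main(final String[] args) {
        // In the reporting task, rows produced by a query like this would be handed to a
        // configured RecordSinkService (site-to-site, database, etc.).
        System.out.println(EXAMPLE_QUERY);
    }
}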
@@ -98,4 +98,11 @@ public interface ReportingContext extends PropertyContext {
* has not yet been established
*/
String getClusterNodeIdentifier();

/**
* @return true if reporting analytics (e.g., connection status predictions) are enabled, false otherwise
*/
default boolean isAnalyticsEnabled() {
return false;
}
}
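The new default method above is what lets reporting components discover whether analytics (connection status predictions) are available. A minimal sketch of the intended use follows; the guard itself is an assumption drawn from the commit note about removing the predictions table when analytics is not enabled, and the class and method names are hypothetical.

import org.apache.nifi.reporting.ReportingContext;

// Sketch only: shows how a reporting task could consult the new flag before exposing
// prediction data. Whether QueryNiFiReportingTask guards its predictions table exactly
// this way is an assumption, not something shown in this diff.
final class AnalyticsGuardSketch {

    boolean shouldRegisterPredictionsTable(final ReportingContext context) {
        // isAnalyticsEnabled() defaults to false, so ReportingContext implementations
        // that do not override it simply skip the predictions table.
        return context.isAnalyticsEnabled();
    }
}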
15 changes: 15 additions & 0 deletions nifi-assembly/pom.xml
@@ -859,6 +859,21 @@ language governing permissions and limitations under the License. -->
</dependency>
</dependencies>
</profile>
<profile>
<id>include-sql-reporting</id>
<!-- This profile handles the inclusion of nifi-sql-reporting artifacts. -->
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-sql-reporting-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</profile>
<profile>
<id>rpm</id>
<activation>
@@ -55,6 +55,7 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
private final VariableRegistry variableRegistry;
private final ParameterLookup parameterLookup;
private final boolean analyticsEnabled;

public StandardReportingContext(final FlowController flowController, final BulletinRepository bulletinRepository,
final Map<PropertyDescriptor, String> properties, final ReportingTask reportingTask,
@@ -67,6 +68,7 @@ public StandardReportingContext(final FlowController flowController, final Bulle
this.reportingTask = reportingTask;
this.variableRegistry = variableRegistry;
this.parameterLookup = parameterLookup;
this.analyticsEnabled = flowController.getStatusAnalyticsEngine() != null;
preparedQueries = new HashMap<>();

for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
@@ -180,4 +182,9 @@ public String getClusterNodeIdentifier() {
final NodeIdentifier nodeId = flowController.getNodeId();
return nodeId == null ? null : nodeId.getId();
}

@Override
public boolean isAnalyticsEnabled() {
return this.analyticsEnabled;
}
}
@@ -66,6 +66,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
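The nifi-record-sink-api dependency added above points at the RecordSinkService abstraction this commit introduces to separate record format from destination. The interface itself is not part of this diff, so the caller-side sketch below assumes a sendData-style method taking a RecordSet, attributes, and a send-zero-results flag; treat the exact signature, and the class name used here, as assumptions.

import java.io.IOException;
import java.util.Map;

import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.serialization.record.RecordSet;

// Caller-side sketch only: verify the real RecordSinkService signature in
// nifi-record-sink-api before relying on it.
final class RecordSinkCallerSketch {

    void forward(final RecordSinkService sink, final RecordSet rows,
                 final Map<String, String> attributes) throws IOException {
        // The sink decides both the record format (writer) and the destination
        // (site-to-site, database, ...), which is the separation described in the
        // commit message.
        sink.sendData(rows, attributes, false /* sendZeroResults, assumed parameter */);
    }
}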
@@ -28,32 +28,21 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import javax.json.JsonArray;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import javax.net.ssl.SSLContext;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -72,9 +61,6 @@
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
@@ -92,101 +78,6 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
protected static final String DESTINATION_URL_PATH = "/nifi";
protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
.name("Destination URL")
.displayName("Destination URL")
.description("The URL of the destination NiFi instance or, if clustered, a comma-separated list of address in the format "
+ "of http(s)://host:port/nifi. This destination URL will only be used to initiate the Site-to-Site connection. The "
+ "data sent by this reporting task will be load-balanced on all the nodes of the destination (if clustered).")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new NiFiUrlValidator())
.build();
static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder()
.name("Input Port Name")
.displayName("Input Port Name")
.description("The name of the Input Port to deliver data to.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.displayName("SSL Context Service")
.description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder()
.name("Instance URL")
.displayName("Instance URL")
.description("The URL of this instance to use in the Content URI of each event.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("http://${hostname(true)}:8080/nifi")
.addValidator(new NiFiUrlValidator())
.build();
static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder()
.name("Compress Events")
.displayName("Compress Events")
.description("Indicates whether or not to compress the data being sent.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.displayName("Communications Timeout")
.description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction")
.required(true)
.defaultValue("30 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.displayName("Batch Size")
.description("Specifies how many records to send in a single batch, at most.")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
.name("s2s-transport-protocol")
.displayName("Transport Protocol")
.description("Specifies which transport protocol to use for Site-to-Site communication.")
.required(true)
.allowableValues(SiteToSiteTransportProtocol.values())
.defaultValue(SiteToSiteTransportProtocol.RAW.name())
.build();
static final PropertyDescriptor HTTP_PROXY_HOSTNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-hostname")
.displayName("HTTP Proxy hostname")
.description("Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_PORT = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-port")
.displayName("HTTP Proxy port")
.description("Specify the proxy server's port number, optional. If not specified, default port 80 will be used.")
.required(false)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-username")
.displayName("HTTP Proxy username")
.description("Specify an user name to connect to the proxy server, optional.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("s2s-http-proxy-password")
.displayName("HTTP Proxy password")
.description("Specify an user password to connect to the proxy server, optional.")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
@@ -210,61 +101,26 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DESTINATION_URL);
properties.add(PORT_NAME);
properties.add(SSL_CONTEXT);
properties.add(INSTANCE_URL);
properties.add(COMPRESS);
properties.add(TIMEOUT);
properties.add(BATCH_SIZE);
properties.add(TRANSPORT_PROTOCOL);
properties.add(HTTP_PROXY_HOSTNAME);
properties.add(HTTP_PROXY_PORT);
properties.add(HTTP_PROXY_USERNAME);
properties.add(HTTP_PROXY_PASSWORD);
properties.add(SiteToSiteUtils.DESTINATION_URL);
properties.add(SiteToSiteUtils.PORT_NAME);
properties.add(SiteToSiteUtils.SSL_CONTEXT);
properties.add(SiteToSiteUtils.INSTANCE_URL);
properties.add(SiteToSiteUtils.COMPRESS);
properties.add(SiteToSiteUtils.TIMEOUT);
properties.add(SiteToSiteUtils.BATCH_SIZE);
properties.add(SiteToSiteUtils.TRANSPORT_PROTOCOL);
properties.add(SiteToSiteUtils.HTTP_PROXY_HOSTNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PORT);
properties.add(SiteToSiteUtils.HTTP_PROXY_USERNAME);
properties.add(SiteToSiteUtils.HTTP_PROXY_PASSWORD);
properties.add(RECORD_WRITER);
properties.add(ALLOW_NULL_VALUES);
return properties;
}

@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
final ComponentLog logger = getLogger();
final EventReporter eventReporter = new EventReporter() {
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
switch (severity) {
case WARNING:
logger.warn(message);
break;
case ERROR:
logger.error(message);
break;
default:
break;
}
}
};

final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();

final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
: new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(),
context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue());

siteToSiteClient = new SiteToSiteClient.Builder()
.urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
.portName(context.getProperty(PORT_NAME).getValue())
.useCompression(context.getProperty(COMPRESS).asBoolean())
.eventReporter(eventReporter)
.sslContext(sslContext)
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(mode)
.httpProxy(httpProxy)
.build();
siteToSiteClient = SiteToSiteUtils.getClient(context, getLogger());
}

@OnStopped
@@ -315,28 +171,6 @@ protected byte[] getData(final ReportingContext context, InputStream in, Map<Str
}
}

static class NiFiUrlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
try {
SiteToSiteRestApiClient.parseClusterUrls(value);
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(true)
.build();
} catch (IllegalArgumentException ex) {
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(false)
.explanation(ex.getLocalizedMessage())
.build();
}
}
}

protected void addField(final JsonObjectBuilder builder, final String key, final Boolean value, final boolean allowNullValues) {
if (value != null) {
builder.add(key, value);
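The site-to-site client construction deleted above now sits behind SiteToSiteUtils.getClient(context, getLogger()). The sketch below reconstructs, from those removed lines, what that helper presumably wraps; the helper's actual body is not shown in this diff, the class and method names here are hypothetical, and the EventReporter wiring from the original setup method is omitted for brevity.

import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;

// Reconstructed sketch of the client setup that moved into SiteToSiteUtils.
// The logger parameter mirrors the new call site; in the original code it backed an
// EventReporter, which is omitted here.
final class SiteToSiteClientSketch {

    static SiteToSiteClient buildClient(final ConfigurationContext context, final ComponentLog logger) {
        final SSLContextService sslContextService =
                context.getProperty(SiteToSiteUtils.SSL_CONTEXT).asControllerService(SSLContextService.class);
        final SSLContext sslContext = sslContextService == null
                ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);

        final String destinationUrl =
                context.getProperty(SiteToSiteUtils.DESTINATION_URL).evaluateAttributeExpressions().getValue();
        final SiteToSiteTransportProtocol mode =
                SiteToSiteTransportProtocol.valueOf(context.getProperty(SiteToSiteUtils.TRANSPORT_PROTOCOL).getValue());

        // The HTTP proxy only applies to the HTTP transport and only when a hostname is configured.
        final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW)
                || StringUtils.isEmpty(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue())
                ? null
                : new HttpProxy(context.getProperty(SiteToSiteUtils.HTTP_PROXY_HOSTNAME).getValue(),
                        context.getProperty(SiteToSiteUtils.HTTP_PROXY_PORT).asInteger(),
                        context.getProperty(SiteToSiteUtils.HTTP_PROXY_USERNAME).getValue(),
                        context.getProperty(SiteToSiteUtils.HTTP_PROXY_PASSWORD).getValue());

        return new SiteToSiteClient.Builder()
                .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
                .portName(context.getProperty(SiteToSiteUtils.PORT_NAME).getValue())
                .useCompression(context.getProperty(SiteToSiteUtils.COMPRESS).asBoolean())
                .sslContext(sslContext)
                .timeout(context.getProperty(SiteToSiteUtils.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
                .transportProtocol(mode)
                .httpProxy(httpProxy)
                .build();
    }
}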
@@ -47,11 +47,10 @@
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.scheduling.SchedulingStrategy;

@Tags({"bulletin", "site", "site to site"})
@@ -68,15 +67,6 @@
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask {

static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("nifi")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

private volatile long lastSentBulletinId = -1L;

public SiteToSiteBulletinReportingTask() throws IOException {
@@ -87,8 +77,8 @@ public SiteToSiteBulletinReportingTask() throws IOException {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
properties.remove(BATCH_SIZE);
properties.add(SiteToSiteUtils.PLATFORM);
properties.remove(SiteToSiteUtils.BATCH_SIZE);
return properties;
}

@@ -125,7 +115,7 @@ public void onTrigger(final ReportingContext context) {
return;
}

final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
final String platform = context.getProperty(SiteToSiteUtils.PLATFORM).evaluateAttributeExpressions().getValue();
final Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();

final Map<String, ?> config = Collections.emptyMap();
@@ -148,11 +138,11 @@

// Send the JSON document for the current batch
try {
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
}
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will attempt to send data later");
return;
}

final Map<String, String> attributes = new HashMap<>();
final String transactionId = UUID.randomUUID().toString();
