Skip to content

Commit

Permalink
Send Log statements to log Topic for Java Functions (#1447)
Browse files Browse the repository at this point in the history
* Added LogTopic support to Java functions

* UseLogAppender instead of PulsarAppender

* Reverted changes to PulsarAppender

* No need to take dep on pulsar-log4j plugin

* Start the appender

* Fixed comments

* Reverted back logging function change
  • Loading branch information
srkukarni authored and sijie committed Mar 27, 2018
1 parent 4bf44d9 commit 86b9546
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -85,6 +88,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
@Getter(AccessLevel.PACKAGE)
private final Map<String, Consumer> inputConsumers;
private LinkedList<String> inputTopicsToResubscribe = null;
private LogAppender logAppender;

// provide tables for storing states
private final String stateStorageServiceUrl;
Expand Down Expand Up @@ -188,6 +192,8 @@ JavaInstance setupJavaInstance() throws Exception {
startOutputProducer();
// start the input consumer
startInputConsumer();
// start any log topic handler
setupLogHandler();

return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
}
Expand Down Expand Up @@ -262,10 +268,12 @@ public void run() {
}
long processAt = System.currentTimeMillis();
stats.incrementProcessed(processAt);
addLogTopicHandler();
result = javaInstance.handleMessage(
msg.getActualMessage().getMessageId(),
msg.getTopicName(),
input);
removeLogTopicHandler();

long doneProcessing = System.currentTimeMillis();
log.debug("Got result: {}", result.getResult());
Expand Down Expand Up @@ -755,4 +763,34 @@ private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
}
}
}

private void setupLogHandler() {
if (instanceConfig.getFunctionConfig().getLogTopic() != null &&
!instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) {
logAppender = new LogAppender(client, instanceConfig.getFunctionConfig().getLogTopic(),
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()));
logAppender.start();
}
}

private void addLogTopicHandler() {
if (logAppender == null) return;
LoggerContext context = LoggerContext.getContext(false);
Configuration config = context.getConfiguration();
config.addAppender(logAppender);
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.addAppender(logAppender, null, null);
}
config.getRootLogger().addAppender(logAppender, null, null);
}

private void removeLogTopicHandler() {
if (logAppender == null) return;
LoggerContext context = LoggerContext.getContext(false);
Configuration config = context.getConfiguration();
for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
loggerConfig.removeAppender(logAppender.getName());
}
config.getRootLogger().removeAppender(logAppender.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import org.apache.logging.log4j.core.*;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
* LogAppender class that is used to send log statements from Pulsar Functions logger
* to a log topic.
*/
public class LogAppender implements Appender {
private PulsarClient pulsarClient;
private String logTopic;
private String fqn;
private State state;
private ErrorHandler errorHandler;
private Producer<byte[]> producer;

public LogAppender(PulsarClient pulsarClient, String logTopic, String fqn) {
this.pulsarClient = pulsarClient;
this.logTopic = logTopic;
this.fqn = fqn;
}

@Override
public void append(LogEvent logEvent) {
producer.sendAsync(logEvent.getMessage().getFormattedMessage().getBytes());
}

@Override
public String getName() {
return fqn;
}

@Override
public Layout<? extends Serializable> getLayout() {
return null;
}

@Override
public boolean ignoreExceptions() {
return false;
}

@Override
public ErrorHandler getHandler() {
return errorHandler;
}

@Override
public void setHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

@Override
public State getState() {
return state;
}

@Override
public void initialize() {
this.state = State.INITIALIZED;
}

@Override
public void start() {
this.state = State.STARTING;
try {
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic(logTopic)
.producerName(fqn)
.blockIfQueueFull(false)
.enableBatching(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
producer = producerBuilder.create();
} catch (Exception e) {
throw new RuntimeException("Error starting LogTopic Producer", e);
}
this.state = State.STARTED;
}

@Override
public void stop() {
this.state = State.STOPPING;
producer.closeAsync();
producer = null;
this.state = State.STOPPED;
}

@Override
public boolean isStarted() {
return state == State.STARTED;
}

@Override
public boolean isStopped() {
return state == State.STOPPED;
}
}

0 comments on commit 86b9546

Please sign in to comment.