Feat: Connection Failure Management #8

Merged · 2 commits · Dec 17, 2024
README.md (7 additions, 3 deletions)
@@ -28,7 +28,7 @@ Add the library to your project using Maven Central:
<dependency>
<groupId>com.newrelic.labs</groupId>
<artifactId>custom-log4j2-appender</artifactId>
<version>1.0.4</version>
<version>1.0.5</version>
</dependency>
```

@@ -38,7 +38,7 @@ Or, if using a locally built JAR file:
<dependency>
<groupId>com.newrelic.labs</groupId>
<artifactId>custom-log4j2-appender</artifactId>
<version>1.0.4</version>
<version>1.0.5</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/custom-log4j2-appender.jar</systemPath>
</dependency>
@@ -69,7 +69,9 @@ Replace `[your-api-key]` with the ingest key obtained from the New Relic platform
maxMessageSize="1048576"
flushInterval="120000"
customFields="businessGroup=exampleGroup,environment=production"
mergeCustomFields="true">
mergeCustomFields="true"
maxRetries="5"
timeout="5000">
<PatternLayout pattern="[%d{MM-dd HH:mm:ss}] %-5p %c{1} [%t]: %m%n"/>
</NewRelicBatchingAppender>
</Appenders>
@@ -95,6 +97,8 @@ Replace `[your-api-key]` with the ingest key obtained from the New Relic platform
| flushInterval | No | 120000 | Interval (in milliseconds) at which the log entries are flushed to New Relic|
| customFields | No | | Add extra context to your logs with custom fields, represented as comma-separated name-value pairs.|
| mergeCustomFields | No | false | Controls how custom fields are attached to each log event: either nested under the `custom` prefix (`custom.field1`, `custom.field2`) or merged into the event as top-level attributes (`field1`, `field2`). |
| maxRetries | No | 3 | Maximum number of retry attempts for sending logs. If logs cannot be sent successfully within the specified retries, they will be discarded. |
| timeout | No | 30000 | Connection timeout (in milliseconds) for HTTP requests to New Relic's logging service. Adjust based on network conditions and server response times. |
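Taken together, `maxRetries` and `flushInterval` bound how long the appender keeps buffering during an outage: one retry is consumed per harvest cycle, and the queue is discarded once the retries are exhausted. A rough, illustrative calculation with the defaults above (the variable names here are ours, not the library's):

```java
// Worst-case buffering during a sustained outage (illustrative only):
long flushIntervalMs = 120_000L; // default interval between harvest cycles
int  maxRetries      = 3;        // default failed cycles tolerated
long worstCaseMs     = maxRetries * flushIntervalMs; // 360,000 ms = 6 minutes
// After ~6 minutes of consecutive failures, the queued logs are discarded.
```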



custom-log4j2-appender/build-jar.gradle (2 additions, 2 deletions)
@@ -20,7 +20,7 @@ jar {
'Implementation-Title': 'Custom Log4j2 Appender',
'Implementation-Vendor': 'New Relic Labs',
'Implementation-Vendor-Id': 'com.newrelic.labs',
'Implementation-Version': '1.0.4'
'Implementation-Version': '1.0.5'
)
}
}
@@ -53,7 +53,7 @@ publishing {

groupId = 'com.newrelic.labs'
artifactId = 'custom-log4j2-appender'
version = '1.0.4'
version = '1.0.5'

pom {
name = 'Custom Log4j2 Appender'
custom-log4j2-appender/build-shadowJar.gradle (2 additions, 2 deletions)
@@ -27,7 +27,7 @@ shadowJar {
'Implementation-Title': 'Custom Log4j2 Appender',
'Implementation-Vendor': 'New Relic Labs',
'Implementation-Vendor-Id': 'com.newrelic.labs',
'Implementation-Version': '1.0.4'
'Implementation-Version': '1.0.5'
)
}
}
@@ -55,7 +55,7 @@ publishing {

groupId = 'com.newrelic.labs'
artifactId = 'custom-log4j2-appender'
version = '1.0.4'
version = '1.0.5'

pom {
name = 'Custom Log4j2 Appender'
custom-log4j2-appender/build.gradle (2 additions, 2 deletions)
@@ -20,7 +20,7 @@ jar {
'Implementation-Title': 'Custom Log4j2 Appender',
'Implementation-Vendor': 'New Relic Labs',
'Implementation-Vendor-Id': 'com.newrelic.labs',
'Implementation-Version': '1.0.4'
'Implementation-Version': '1.0.5'
)
}
}
@@ -53,7 +53,7 @@ publishing {

groupId = 'com.newrelic.labs'
artifactId = 'custom-log4j2-appender'
version = '1.0.4'
version = '1.0.5'

pom {
name = 'Custom Log4j2 Appender'
custom-log4j2-appender/publish-jar-legacy.sh (2 additions, 2 deletions)
@@ -7,7 +7,7 @@ cp build-jar.gradle build.gradle
# Set variables
GROUP_ID="com.newrelic.labs"
ARTIFACT_ID="custom-log4j2-appender"
VERSION="1.0.4"
VERSION="1.0.5"
KEY_ID="0ED9FD74E81E6D83FAE25F235640EA0B1C631C6F" # Replace with your actual key ID

# Get the current directory (assuming the script is run from the custom-log4j2-appender directory)
@@ -163,4 +163,4 @@ cd "$CURRENT_DIR/bundle"
echo "Creating jar bundle-$VERSION.jar"
jar -cvf ../bundle-$VERSION.jar *
cd ..
echo "Artifacts prepared and zipped successfully. You can now upload bundle-$VERSION.jar to Sonatype OSSRH."
echo "Artifacts prepared and zipped successfully. You can now upload bundle-$VERSION.jar to Sonatype OSSRH https://oss.sonatype.org/#welcome ."
custom-log4j2-appender/publish-jar.sh (1 addition, 1 deletion)
@@ -7,7 +7,7 @@ cp build-jar.gradle build.gradle
# Set variables
GROUP_ID="io.github.newrelic-experimental"
ARTIFACT_ID="custom-log4j2-appender"
VERSION="1.0.4"
VERSION="1.0.5"
KEY_ID="0ED9FD74E81E6D83FAE25F235640EA0B1C631C6F" # Replace with your actual key ID

# Get the current directory (assuming the script is run from the custom-log4j2-appender directory)
custom-log4j2-appender/publish-shadowJar.sh (1 addition, 1 deletion)
@@ -6,7 +6,7 @@ cp build-shadowJar.gradle build.gradle
# Set variables
GROUP_ID="io.github.newrelic-experimental"
ARTIFACT_ID="custom-log4j2-appender"
VERSION="1.0.4"
VERSION="1.0.5"
KEY_ID="0ED9FD74E81E6D83FAE25F235640EA0B1C631C6F" # Replace with your actual key ID

# Get the current directory (assuming the script is run from the custom-log4j2-appender directory)
LogForwarder.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
@@ -22,23 +23,34 @@ public class LogForwarder {
private final BlockingQueue<LogEntry> logQueue;
private final String apiKey;
private final String apiURL;
private final OkHttpClient client = new OkHttpClient();
private final ObjectMapper objectMapper = new ObjectMapper();
private final OkHttpClient client;
private final ObjectMapper objectMapper;
private final long maxMessageSize;
// 1.0.5
private final int maxRetries;
private final long timeout; // New parameter for connection timeout
// 1.0.5

public LogForwarder(String apiKey, String apiURL, long maxMessageSize, BlockingQueue<LogEntry> logQueue) {
public LogForwarder(String apiKey, String apiURL, long maxMessageSize, BlockingQueue<LogEntry> logQueue,
int maxRetries, long timeout) {
this.apiKey = apiKey;
this.apiURL = apiURL;
this.maxMessageSize = maxMessageSize;
this.logQueue = logQueue;
this.maxRetries = maxRetries;
this.timeout = timeout;
this.client = new OkHttpClient.Builder().connectTimeout(timeout, TimeUnit.MILLISECONDS).build();
this.objectMapper = new ObjectMapper();

}

public boolean isInitialized() {
return apiKey != null && apiURL != null;
}

public void flush(List<LogEntry> logEntries, boolean mergeCustomFields, Map<String, Object> customFields) {
public boolean flush(List<LogEntry> logEntries, boolean mergeCustomFields, Map<String, Object> customFields) {
InetAddress localhost = null;
boolean bStatus = false;
try {
localhost = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
@@ -79,19 +91,25 @@ public void flush(List<LogEntry> logEntries, boolean mergeCustomFields, Map<String, Object> customFields) {
byte[] compressedPayload = gzipCompress(jsonPayload);

if (compressedPayload.length > maxMessageSize) {
splitAndSendLogs(logEntries, mergeCustomFields, customFields);
// System.out.println("splitAndSendLogs: Called size exceeded " +
// compressedPayload.length);
bStatus = splitAndSendLogs(logEntries, mergeCustomFields, customFields);
} else {
sendLogs(logEvents);
bStatus = sendLogs(logEvents);
}
} catch (IOException e) {
System.err.println("Error during log forwarding: " + e.getMessage());
bStatus = false;
}
return bStatus;
}

private void splitAndSendLogs(List<LogEntry> logEntries, boolean mergeCustomFields,
private boolean splitAndSendLogs(List<LogEntry> logEntries, boolean mergeCustomFields,
Map<String, Object> customFields) throws IOException {

List<LogEntry> subBatch = new ArrayList<>();
int currentSize = 0;
boolean bStatus = false;
for (LogEntry entry : logEntries) {
Map<String, Object> logEvent = objectMapper.convertValue(entry, LowercaseKeyMap.class);
logEvent.put("hostname", InetAddress.getLocalHost().getHostName());
@@ -118,16 +136,17 @@ private void splitAndSendLogs(List<LogEntry> logEntries, boolean mergeCustomFiel
String entryJson = objectMapper.writeValueAsString(logEvent);
int entrySize = gzipCompress(entryJson).length;
if (currentSize + entrySize > maxMessageSize) {
sendLogs(convertToLogEvents(subBatch, mergeCustomFields, customFields));
bStatus = sendLogs(convertToLogEvents(subBatch, mergeCustomFields, customFields));
subBatch.clear();
currentSize = 0;
}
subBatch.add(entry);
currentSize += entrySize;
}
if (!subBatch.isEmpty()) {
sendLogs(convertToLogEvents(subBatch, mergeCustomFields, customFields));
bStatus = sendLogs(convertToLogEvents(subBatch, mergeCustomFields, customFields));
}
return bStatus;
}

private List<Map<String, Object>> convertToLogEvents(List<LogEntry> logEntries, boolean mergeCustomFields,
@@ -168,7 +187,7 @@ private List<Map<String, Object>> convertToLogEvents(List<LogEntry> logEntries, boolean mergeCustomFields,
return logEvents;
}

private void sendLogs(List<Map<String, Object>> logEvents) throws IOException {
private boolean sendLogs(List<Map<String, Object>> logEvents) throws IOException {
String jsonPayload = objectMapper.writeValueAsString(logEvents);
byte[] compressedPayload = gzipCompress(jsonPayload);

@@ -183,6 +202,7 @@ private void sendLogs(List<Map<String, Object>> logEvents) throws IOException {
System.err.println("Failed to send logs to New Relic: " + response.code() + " - " + response.message());
System.err.println("Response body: " + response.body().string());
requeueLogs(logEvents); // Requeue logs if the response is not successful
return false;
} else {
// The following lines are commented out to prevent an infinite loop
// LocalDateTime timestamp = LocalDateTime.now();
@@ -194,7 +214,9 @@
} catch (IOException e) {
System.err.println("Error during log forwarding: " + e.getMessage());
requeueLogs(logEvents); // Requeue logs if an exception occurs
return false;
}
return true;
}

private void requeueLogs(List<Map<String, Object>> logEvents) {
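For orientation, here is a minimal, hypothetical wiring of the updated `LogForwarder` API; the class name, key, endpoint, and values below are placeholders for illustration, not part of this PR:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ForwarderWiringSketch {
    public static void main(String[] args) {
        BlockingQueue<LogEntry> queue = new LinkedBlockingQueue<>();

        // The two trailing constructor arguments (maxRetries, connection
        // timeout in milliseconds) are new in 1.0.5.
        LogForwarder forwarder = new LogForwarder(
                "your-ingest-key",                     // apiKey (placeholder)
                "https://log-api.newrelic.com/log/v1", // apiURL (assumed US endpoint)
                1_048_576L,                            // maxMessageSize: 1 MB
                queue,
                3,                                     // maxRetries
                30_000L);                              // timeout in milliseconds

        // flush(...) now returns a status instead of void, so callers can
        // decide whether to retry on the next cycle or give up.
        boolean delivered = forwarder.flush(List.of(), false, Map.of());
        System.out.println("delivered=" + delivered);
    }
}
```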
NewRelicBatchingAppender.java
@@ -38,11 +38,13 @@
private final String name;
private final LogForwarder logForwarder;
private static final Logger logger = StatusLogger.getLogger();
private int attempt = 0; // Track attempts across harvest cycles

private final int batchSize;
private final long maxMessageSize;
private final long flushInterval;
private final Map<String, Object> customFields;
private final int maxRetries;

private static final int DEFAULT_BATCH_SIZE = 5000;
private static final long DEFAULT_MAX_MESSAGE_SIZE = 1048576; // 1 MB
@@ -53,20 +55,22 @@ public class NewRelicBatchingAppender extends AbstractAppender {

protected NewRelicBatchingAppender(String name, Filter filter, Layout<? extends Serializable> layout,
final boolean ignoreExceptions, String apiKey, String apiUrl, String applicationName, Integer batchSize,
Long maxMessageSize, Long flushInterval, String logType, String customFields, Boolean mergeCustomFields) {
Long maxMessageSize, Long flushInterval, String logType, String customFields, Boolean mergeCustomFields,
int maxRetries, long timeout) {
super(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY);
this.queue = new LinkedBlockingQueue<>();
this.apiKey = apiKey;
this.apiUrl = apiUrl;
this.applicationName = applicationName;
this.name = name;
this.maxRetries = maxRetries;
this.batchSize = batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
this.maxMessageSize = maxMessageSize != null ? maxMessageSize : DEFAULT_MAX_MESSAGE_SIZE;
this.flushInterval = flushInterval != null ? flushInterval : DEFAULT_FLUSH_INTERVAL;
this.logType = ((logType != null) && (logType.length() > 0)) ? logType : LOG_TYPE;
this.customFields = parsecustomFields(customFields);
this.mergeCustomFields = mergeCustomFields != null ? mergeCustomFields : MERGE_CUSTOM_FIELDS;
this.logForwarder = new LogForwarder(apiKey, apiUrl, this.maxMessageSize, this.queue);
this.logForwarder = new LogForwarder(apiKey, apiUrl, this.maxMessageSize, this.queue, maxRetries, timeout);
startFlushingTask();
}

Expand All @@ -93,7 +97,9 @@ public static NewRelicBatchingAppender createAppender(@PluginAttribute("name") S
@PluginAttribute(value = "maxMessageSize") Long maxMessageSize, @PluginAttribute("logType") String logType,
@PluginAttribute(value = "flushInterval") Long flushInterval,
@PluginAttribute("customFields") String customFields,
@PluginAttribute(value = "mergeCustomFields") Boolean mergeCustomFields) {
@PluginAttribute(value = "mergeCustomFields") Boolean mergeCustomFields,
@PluginAttribute(value = "maxRetries") Integer maxRetries,
@PluginAttribute(value = "timeout") Long timeout) {

if (name == null) {
logger.error("No name provided for NewRelicBatchingAppender");
@@ -109,8 +115,11 @@ public static NewRelicBatchingAppender createAppender(@PluginAttribute("name") String name,
return null;
}

int retries = maxRetries != null ? maxRetries : 3; // Default to 3 retries if not specified
long connectionTimeout = timeout != null ? timeout : 30000; // Default to 30 seconds if not specified

return new NewRelicBatchingAppender(name, filter, layout, true, apiKey, apiUrl, applicationName, batchSize,
maxMessageSize, flushInterval, logType, customFields, mergeCustomFields);
maxMessageSize, flushInterval, logType, customFields, mergeCustomFields, retries, connectionTimeout);
}

@Override
@@ -141,20 +150,35 @@ public void append(LogEvent event) {
new LogEntry(message, applicationName, muleAppName, logType, timestamp, custom, mergeCustomFields));
// Check if the batch size is reached and flush immediately
if (queue.size() >= batchSize) {
flushQueue();
if (attempt == 0) {
boolean bStatus = flushQueue();
if (!bStatus) {
attempt++;
logger.warn("Attempt {} failed. Retrying in next harvest cycle...", attempt);
logger.warn("batchsize check is now disabled due to unhealthy connection");
} else {
logger.debug("Batchsize-check: Successfully sent logs.");
}
} else {
logger.debug(
"Skipping batch-size flush ({} queued, batch size {}) - last harvest cycle did not report a healthy connection",
queue.size(), batchSize);
}
}
} catch (Exception e) {
logger.error("Unable to insert log entry into log queue. ", e);
}
}

private void flushQueue() {
private boolean flushQueue() {
List<LogEntry> batch = new ArrayList<>();
boolean bStatus = false;
queue.drainTo(batch, batchSize);
if (!batch.isEmpty()) {
logger.debug("Flushing {} log entries to New Relic", batch.size());
logForwarder.flush(batch, mergeCustomFields, customFields);
logger.debug("Flushing {}/{} log entries to New Relic", batch.size(), queue.size() + batch.size());
bStatus = logForwarder.flush(batch, mergeCustomFields, customFields);
}
return bStatus;
}

private Map<String, Object> extractcustom(LogEvent event) {
@@ -180,21 +204,44 @@ private boolean checkEntryConditions() {

private void startFlushingTask() {
Runnable flushTask = new Runnable() {

@Override
public void run() {
while (true) {
try {
logger.debug("Flushing task running...");
List<LogEntry> batch = new ArrayList<>();
queue.drainTo(batch, batchSize);

if (!batch.isEmpty()) {
logger.debug("Flushing {} log entries to New Relic", batch.size());
logForwarder.flush(batch, mergeCustomFields, customFields);
logger.debug("Flushing {}/{} log entries to New Relic", batch.size(),
queue.size() + batch.size());
boolean success = logForwarder.flush(batch, mergeCustomFields, customFields);

if (success) {
logger.debug("Harvest Cycle: Successfully sent logs.");
attempt = 0; // Reset attempt counter on success
} else {
attempt++;
logger.warn("Attempt {} failed. Retrying in next cycle...", attempt);
}

if (attempt >= maxRetries) {
logger.error("Exhausted all retry attempts across cycles. Discarding {} logs.",
queue.size());
queue.clear(); // Clear the queue after maxRetries
attempt = 0; // Reset attempt counter after discarding

logger.debug("Queue Size: {} ", queue.size());
}
}

// Wait for the next harvest cycle
Thread.sleep(flushInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Flushing task interrupted", e);
break;
}
}
}
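To summarize the new cross-cycle behavior, here is a hypothetical trace (not part of the PR) assuming `maxRetries` of 3:

```java
// Hypothetical harvest-cycle trace with maxRetries = 3:
//
//   cycle 1: flush fails    -> attempt = 1 (failed events are requeued)
//   cycle 2: flush fails    -> attempt = 2
//   cycle 3: flush fails    -> attempt = 3 >= maxRetries
//            -> queue.clear(); attempt = 0 (queued logs are discarded)
//   cycle 4: flush succeeds -> attempt stays 0
//
// While attempt > 0, append() also skips its batch-size fast path, so an
// unhealthy connection cannot trigger back-to-back retries between cycles.
```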