Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ public void track(Telemetry telemetry) {
getChannel().send(telemetry);
}

/**
* Flushes possible pending Telemetries in the channel.
*/
public void flush() {
getChannel().flush();
}

/**
* Gets the channel used by the client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public interface TelemetryChannel {
* @param timeUnit The units of the 'timeout' parameter
*/
void stop(long timeout, TimeUnit timeUnit);

/**
* Flushes the data that the channel might have internally.
*/
void flush();
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ public synchronized void stop(long timeout, TimeUnit timeUnit) {
}
}

/**
* Flushes the data that the channel might have internally.
*/
@Override
public void flush() {
telemetryBuffer.flush();
}

private void writeTelemetryToDebugOutput(Telemetry telemetry) {
InternalLogger.INSTANCE.trace("InProcessTelemetryChannel sending telemetry");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TelemetriesTransmitter;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import com.microsoft.applicationinsights.telemetry.Telemetry;

/**
Expand Down Expand Up @@ -169,20 +170,33 @@ public void add(String telemetry) {
if (!sender.sendNow(prepareTelemetriesForSend())) {
// 'prepareTelemetriesForSend' already created a new container
// so basically we have nothing to do, the old container is lost
// TODO: internal log
InternalLogger.INSTANCE.error("Failed to send buffer data to network");
}
} else if (currentSize == 1) {
if (!sender.scheduleSend(new TelemetryBufferTelemetriesFetcher(generation), transmitBufferTimeoutInSeconds, TimeUnit.SECONDS)) {
// We cannot schedule send so we give up the Telemetry
// The reason for this is that in case the maximum buffer size is greater than 2
// than in case a new Telemetry arrives it won't trigger the schedule and might be lost too
// TODO: internal log
InternalLogger.INSTANCE.error("Failed to schedule send of the buffer to network");
telemetries.clear();
}
}
}
}

/**
* The method will flush the telemetries currently in the buffer to the {@link com.microsoft.applicationinsights.internal.channel.TelemetriesTransmitter}
*/
public void flush() {
synchronized (lock) {
if (telemetries.size() != 0) {
if (!sender.sendNow(prepareTelemetriesForSend())) {
InternalLogger.INSTANCE.error("Failed to flush buffer data to network");
}
}
}
}

/**
* The method assumes that the lock is held before calling it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,9 @@ public void send(Telemetry item)
public void stop(long timeout, TimeUnit timeUnit) {
}

@Override
public void flush() {
}

private boolean developerMode = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public void send(Telemetry item) {
@Override
public void stop(long timeout, TimeUnit timeUnit) {
}

@Override
public void flush() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ public void initialize(TelemetryContext context) {
client.track(telemetry);
}

@Test
public void testFlush() {
client.flush();

Mockito.verify(channel, Mockito.times(1)).flush();
}

// endregion Track tests

// region Private methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import com.microsoft.applicationinsights.internal.channel.TelemetriesTransmitter;
import com.microsoft.applicationinsights.telemetry.Telemetry;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -208,7 +207,6 @@ public void testSendWhenBufferIsFullInNonDeveloperMode() throws Exception {
TelemetryBuffer testedBuffer = new TelemetryBuffer(mockSender, 2, 1200);

for (int i = 0; i < 2; ++i) {
Telemetry mockTelemetry = Mockito.mock(Telemetry.class);
testedBuffer.add("mockTelemetry");
}

Expand Down Expand Up @@ -252,7 +250,6 @@ public Collection<String> getSendNowCollection() {
List<String> all = new ArrayList<String>();
List<String> expected = new ArrayList<String>();
for (int i = 0; i < 4; ++i) {
Telemetry mockTelemetry = Mockito.mock(Telemetry.class);
String mockSerializedTelemetry = "mockTelemtry" + String.valueOf(i);
all.add(mockSerializedTelemetry);

Expand Down Expand Up @@ -285,7 +282,6 @@ public void testSendWhenBufferIsFullInDeveloperMode() throws Exception {
TelemetryBuffer testedBuffer = new TelemetryBuffer(mockSender, 1, 1200);

for (int i = 0; i < 2; ++i) {
Telemetry mockTelemetry = Mockito.mock(Telemetry.class);
testedBuffer.add("mockTelemetry");
}

Expand Down Expand Up @@ -324,10 +320,49 @@ public void testSendBufferAfterTimeoutExpiresButBufferWasAlreadySent() throws Ex
TelemetryBuffer testedBuffer = new TelemetryBuffer(mockSender, 10, 3);

for (int i = 0; i < 10; ++i) {
Telemetry mockTelemetry = Mockito.mock(Telemetry.class);
testedBuffer.add("mockTelemetry");
}

mockSender.waitForFinish(6L);
}

@Test
public void testFlushWithZero() throws Exception {
TelemetriesTransmitter mockSender = Mockito.mock(TelemetriesTransmitter.class);

// Create a buffer with max buffer size of 10 and timeout of 10 seconds
TelemetryBuffer testedBuffer = new TelemetryBuffer(mockSender, 10, 3);
testedBuffer.flush();

Mockito.verify(mockSender, Mockito.never()).sendNow(anyCollectionOf(String.class));
}

@Test
public void testFlushWithOneInTheBuffer() throws Exception {
testFlushWithData(1);
}

@Test
public void testFlushWithSevenInTheBuffer() throws Exception {
testFlushWithData(7);
}

private void testFlushWithData(int expectedTelemetriesNumberInSendNow) {
MockSender mockSender = new MockSender()
.setExpectedNumberOfScheduleSendCalls(1)
.setExpectedNumberOfSendNowCalls(1)
.setExpectedTelemetriesNumberInScheduleSend(0)
.setExpectedTelemetriesNumberInSendNow(expectedTelemetriesNumberInSendNow);

// Create a buffer with max buffer size of 10 and timeout of 10 seconds
TelemetryBuffer testedBuffer = new TelemetryBuffer(mockSender, 10, 3);

for (int i = 0; i < expectedTelemetriesNumberInSendNow; ++i) {
testedBuffer.add("mockTelemetry");
}

testedBuffer.flush();

mockSender.waitForFinish(6L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package com.microsoft.applicationinsights.internal.shared;

import java.lang.Override;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -52,4 +53,8 @@ public void send(Telemetry item) {
@Override
public void stop(long timeout, TimeUnit timeUnit) {
}

@Override
public void flush() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public void stop(long timeout, TimeUnit timeUnit) {

}

@Override
public void flush() {
}

public void reset() {
telemetryItems.clear();
}
Expand Down