
Ensure ImportJobTest is not flaky by checking WriteToStore metric and requesting adequate resources for testing #332

Merged
12 changes: 12 additions & 0 deletions .prow/config.yaml
@@ -57,6 +57,12 @@ presubmits:
containers:
- image: maven:3.6-jdk-8
command: [".prow/scripts/test-core-ingestion.sh"]
resources:
requests:
cpu: "1000m"
memory: "512Mi"
limits:
memory: "4096Mi"

- name: test-serving
decorate: true
@@ -97,6 +103,12 @@ presubmits:
containers:
- image: maven:3.6-jdk-8
command: [".prow/scripts/test-end-to-end.sh"]
resources:
requests:
cpu: "1000m"
memory: "1024Mi"
limits:
memory: "4096Mi"

# TODO: do a release when a git tag is pushed
#
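
For context, Prow presubmit container specs follow the Kubernetes container resource schema, so the new requests/limits stanzas nest under each container entry. A minimal sketch of the first container block after this change, with indentation restored (the exact nesting inside .prow/config.yaml is assumed):

containers:
- image: maven:3.6-jdk-8
  command: [".prow/scripts/test-core-ingestion.sh"]
  resources:
    requests:
      cpu: "1000m"      # one full CPU requested so the Maven build is not starved
      memory: "512Mi"
    limits:
      memory: "4096Mi"  # hard cap; the test pod is evicted if it exceeds 4Gi
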
ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
@@ -41,18 +41,29 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;

@AutoValue
public abstract class WriteToStore extends PTransform<PCollection<FeatureRow>, PDone> {

private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteToStore.class);

public static final String METRIC_NAMESPACE = "WriteToStore";
public static final String ELEMENTS_WRITTEN_METRIC = "elements_written";

private static final Counter elementsWritten = Metrics
.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC);

public abstract Store getStore();

public abstract Map<String, FeatureSetSpec> getFeatureSetSpecs();
@@ -140,6 +151,12 @@ public void processElement(ProcessContext context) {
break;
}

input.apply("IncrementWriteToStoreElementsWrittenCounter",
MapElements.into(TypeDescriptors.booleans()).via((FeatureRow row) -> {
elementsWritten.inc();
return true;
}));

return PDone.in(input.getPipeline());
}
}
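
The elements_written counter registered above can also be queried by name from a PipelineResult instead of scanning every counter. A hedged sketch using the Beam metrics query API (the surrounding test wiring is assumed and not part of this PR):

import feast.ingestion.transform.WriteToStore;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class WriteToStoreMetricQueryExample {

  // Sums the committed values of the elements_written counter across all pipeline steps.
  static long elementsWritten(PipelineResult result) {
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(
                WriteToStore.METRIC_NAMESPACE, WriteToStore.ELEMENTS_WRITTEN_METRIC))
            .build());
    long total = 0;
    for (MetricResult<Long> counter : metrics.getCounters()) {
      // getCommitted() may throw UnsupportedOperationException on runners
      // that only report attempted metric values.
      total += counter.getCommitted();
    }
    return total;
  }
}
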
89 changes: 52 additions & 37 deletions ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -38,7 +38,6 @@
import feast.types.ValueProto.ValueType.Enum;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +48,7 @@
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -77,10 +77,15 @@ public class ImportJobTest {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6380;

// Expected time taken for the import job to be ready to receive Feature Row input
private static final int IMPORT_JOB_READY_DURATION_SEC = 5;
// Expected time taken for the import job to finish writing to Store
private static final int IMPORT_JOB_RUN_DURATION_SEC = 30;
// Number of sample feature rows that will be generated and used for testing.
// Note that a larger number of samples will increase the completion time for ingestion.
private static final int IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE = 128;
// Expected time taken for the import job to be ready to receive Feature Row input.
private static final int IMPORT_JOB_READY_DURATION_SEC = 10;
// The interval between checks for whether the import job has finished writing elements to the store.
private static final int IMPORT_JOB_CHECK_INTERVAL_DURATION_SEC = 5;
// Max duration to wait until the import job finishes writing to the store.
private static final int IMPORT_JOB_MAX_RUN_DURATION_SEC = 300;

@BeforeClass
public static void setup() throws IOException, InterruptedException {
@@ -161,50 +166,60 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
options.setProject("");
options.setBlockOnRun(false);

int inputSize = 128;
List<FeatureRow> input = new ArrayList<>();
Map<RedisKey, FeatureRow> expected = new HashMap<>();

LOGGER.info("Generating test data ...");
IntStream.range(0, inputSize)
.forEach(
i -> {
FeatureRow randomRow = TestUtil.createRandomFeatureRow(spec);
RedisKey redisKey = TestUtil.createRedisKey(spec, randomRow);
input.add(randomRow);
expected.put(redisKey, randomRow);
});
IntStream.range(0, IMPORT_JOB_SAMPLE_FEATURE_ROW_SIZE).forEach(i -> {
FeatureRow randomRow = TestUtil.createRandomFeatureRow(spec);
RedisKey redisKey = TestUtil.createRedisKey(spec, randomRow);
input.add(randomRow);
expected.put(redisKey, randomRow);
});

LOGGER.info("Starting Import Job with the following options: {}", options.toString());
PipelineResult pipelineResult = ImportJob.runPipeline(options);
Thread.sleep(Duration.ofSeconds(IMPORT_JOB_READY_DURATION_SEC).toMillis());
Thread.sleep(Duration.standardSeconds(IMPORT_JOB_READY_DURATION_SEC).getMillis());
Assert.assertEquals(pipelineResult.getState(), State.RUNNING);

LOGGER.info("Publishing {} Feature Row messages to Kafka ...", input.size());
TestUtil.publishFeatureRowsToKafka(
KAFKA_BOOTSTRAP_SERVERS,
KAFKA_TOPIC,
input,
ByteArraySerializer.class,
KAFKA_PUBLISH_TIMEOUT_SEC);
Thread.sleep(Duration.ofSeconds(IMPORT_JOB_RUN_DURATION_SEC).toMillis());
TestUtil.publishFeatureRowsToKafka(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, input,
ByteArraySerializer.class, KAFKA_PUBLISH_TIMEOUT_SEC);
TestUtil.waitUntilAllElementsAreWrittenToStore(pipelineResult,
Duration.standardSeconds(IMPORT_JOB_MAX_RUN_DURATION_SEC),
Duration.standardSeconds(IMPORT_JOB_CHECK_INTERVAL_DURATION_SEC));

LOGGER.info("Validating the actual values written to Redis ...");
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
expected.forEach(
(key, expectedValue) -> {
byte[] actualByteValue = jedis.get(key.toByteArray());
Assert.assertNotNull("Key not found in Redis: " + key, actualByteValue);
FeatureRow actualValue = null;
try {
actualValue = FeatureRow.parseFrom(actualByteValue);
} catch (InvalidProtocolBufferException e) {
Assert.fail(
String.format(
"Actual Redis value cannot be parsed as FeatureRow, key: %s, value :%s",
key, new String(actualByteValue, StandardCharsets.UTF_8)));
}
Assert.assertEquals(expectedValue, actualValue);
});
expected.forEach((key, expectedValue) -> {

// Ensure ingested key exists.
byte[] actualByteValue = jedis.get(key.toByteArray());
if (actualByteValue == null) {
LOGGER.error("Key not found in Redis: " + key);
LOGGER.info("Redis INFO:");
LOGGER.info(jedis.info());
String randomKey = jedis.randomKey();
if (randomKey != null) {
LOGGER.info("Sample random key, value (for debugging purpose):");
LOGGER.info("Key: " + randomKey);
LOGGER.info("Value: " + jedis.get(randomKey));
}
Assert.fail("Missing key in Redis.");
}

// Ensure value is a valid serialized FeatureRow object.
FeatureRow actualValue = null;
try {
actualValue = FeatureRow.parseFrom(actualByteValue);
} catch (InvalidProtocolBufferException e) {
Assert.fail(String
.format("Actual Redis value cannot be parsed as FeatureRow, key: %s, value :%s",
key, new String(actualByteValue, StandardCharsets.UTF_8)));
}

// Ensure the retrieved FeatureRow is equal to the ingested FeatureRow.
Assert.assertEquals(expectedValue, actualValue);
});
}
}
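
One detail worth noting: the test depends on options.setBlockOnRun(false), so ImportJob.runPipeline returns a PipelineResult immediately while the DirectRunner keeps processing in the background, which is what makes polling the WriteToStore metric possible. A minimal sketch of that pattern (class and method names here are illustrative; pipeline construction is omitted):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class NonBlockingRunExample {

  static PipelineResult runWithoutBlocking() {
    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
    options.setBlockOnRun(false); // run() returns without waiting for the pipeline to finish
    Pipeline pipeline = Pipeline.create(options);
    // ... apply the ingestion transforms here ...
    PipelineResult result = pipeline.run();
    // While elements are still being processed the state is typically RUNNING,
    // so the caller can poll result.metrics() until writing is complete.
    return result;
  }
}
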
66 changes: 66 additions & 0 deletions ingestion/src/test/java/feast/test/TestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.transform.WriteToStore;
import feast.storage.RedisProto.RedisKey;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRow.Builder;
Expand All @@ -41,13 +42,18 @@
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.joda.time.Duration;
import redis.embedded.RedisServer;

@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -343,4 +349,64 @@ public static Field field(String name, Object value, ValueType.Enum valueType) {
throw new IllegalStateException("Unexpected valueType: " + value.getClass());
}
}

/**
* This blocking method waits until an ImportJob pipeline has written all elements to the store.
* <p>
* The pipeline must be in the RUNNING state before calling this method.
*
* @param pipelineResult result of running the Pipeline
* @param maxWaitDuration maximum duration to wait for the import job to finish writing to the store
* @param checkInterval interval between checks of the elements-written metric
* @throws InterruptedException if the thread is interrupted while waiting
*/
public static void waitUntilAllElementsAreWrittenToStore(PipelineResult pipelineResult,
Duration maxWaitDuration, Duration checkInterval) throws InterruptedException {
if (pipelineResult.getState().isTerminal()) {
return;
}

if (!pipelineResult.getState().equals(State.RUNNING)) {
throw new IllegalArgumentException(
"Pipeline must be in RUNNING state before calling this method.");
}

MetricResults metricResults;
try {
metricResults = pipelineResult.metrics();
} catch (UnsupportedOperationException e) {
// Runner does not support metrics so we just wait as long as we are allowed to.
Thread.sleep(maxWaitDuration.getMillis());
return;
}

String writeToStoreMetric =
WriteToStore.METRIC_NAMESPACE + ":" + WriteToStore.ELEMENTS_WRITTEN_METRIC;
long committed = 0;
long maxSystemTimeMillis = System.currentTimeMillis() + maxWaitDuration.getMillis();

while (System.currentTimeMillis() <= maxSystemTimeMillis) {
Thread.sleep(checkInterval.getMillis());

for (MetricResult<Long> metricResult : metricResults.allMetrics().getCounters()) {
// We are only concerned with the metric: count of elements that have been
// written to the store.
if (!metricResult.getName().toString().contains(writeToStoreMetric)) {
continue;
}
try {
// If the number of committed elements has not changed since the last check interval,
// we can assume the pipeline has finished writing all the elements to the store.
if (metricResult.getCommitted() == committed) {
return;
}
committed = metricResult.getCommitted();
break;
} catch (UnsupportedOperationException e) {
// Runner does not support committed metrics so we just wait as long as we are allowed to.
Thread.sleep(maxWaitDuration.getMillis());
return;
}
}
}
}
}
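
A closing note on the catch block above: on runners that do not support committed metrics, an alternative to sleeping for the full maximum duration is to fall back to attempted metric values. A small sketch of that idea (the helper name is hypothetical and not part of this PR):

// Returns the committed counter value when the runner supports it,
// otherwise falls back to the attempted value.
private static long getCounterValue(MetricResult<Long> metricResult) {
  try {
    return metricResult.getCommitted();
  } catch (UnsupportedOperationException e) {
    // Some runners only track attempted values, which may over-count retried work.
    return metricResult.getAttempted();
  }
}
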