
abolish scala. make test harnesses java, avoid any link-time dependency on scala #65

Merged Sep 28, 2017 (3 commits)
21 changes: 4 additions & 17 deletions build.gradle
@@ -18,27 +18,12 @@ group = 'com.linkedin.kafka.clients'
import com.linkedin.gradle.build.DistributeTask

apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'checkstyle'
apply plugin: 'maven-publish'
apply plugin: 'com.jfrog.artifactory'

//project layout

sourceSets {
test {
java {
srcDirs = []
}
//needed because our java tests depend on scala classes, so must be compiled by scala
scala {
srcDirs = ['src/test/java', 'src/test/scala']
}
}
}

//project dependencies

repositories {
@@ -48,10 +33,12 @@ repositories {
dependencies {
compile 'org.apache.kafka:kafka-clients:0.10.2.1'

testCompile 'org.apache.kafka:kafka_2.10:0.10.2.1:test'
testCompile 'org.apache.kafka:kafka_2.10:0.10.2.1'
testCompile 'org.bouncycastle:bcpkix-jdk15on:1.54'
testCompile 'commons-io:commons-io:2.5'
testCompile 'com.101tec:zkclient:0.7'
testCompile 'org.testng:testng:6.11'

testRuntime 'org.apache.kafka:kafka_2.10:0.10.2.1'
}

//test configurations
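The hunks above are the heart of the PR: the `scala` plugin and the custom `sourceSets` block (which routed `src/test/java` through the scala compiler so java tests could see scala harness classes) are deleted, and the scala broker artifact `kafka_2.10` comes off the test compile classpath. A sketch of the resulting test wiring, inferred from the hunk counts (4 additions, 17 deletions) since this copy of the diff has no +/- markers, so treat the exact before/after split as an assumption:

```gradle
// post-PR sketch: tests are plain java, compiled by javac from the
// default src/test/java location (no sourceSets override needed)
dependencies {
    compile 'org.apache.kafka:kafka-clients:0.10.2.1'

    // pure-java test dependencies stay on the compile classpath
    testCompile 'org.bouncycastle:bcpkix-jdk15on:1.54'
    testCompile 'commons-io:commons-io:2.5'
    testCompile 'com.101tec:zkclient:0.7'
    testCompile 'org.testng:testng:6.11'

    // the scala broker is only needed to run the embedded cluster,
    // so it is runtime-only: nothing links against scala at compile time
    testRuntime 'org.apache.kafka:kafka_2.10:0.10.2.1'
}
```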
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
version=0.0.6
version=0.0.7
org.gradle.daemon=false
org.gradle.parallel=false
org.gradle.jvmargs=-Xms512m -Xmx512m
@@ -173,35 +173,36 @@ private void waitForNextTick(long now) {

// protected methods
/**
* Get the current audit stats.
* @return the current audit stats.
*/
protected AuditStats currentStats() {
return _currentStats;
}

/**
* Get the next audit stats.
* @return the next audit stats.
*/
protected AuditStats nextStats() {
return _nextStats;
}

/**
* Get the time when the next tick will occur.
* @return the time when the next tick will occur.
*/
protected long nextTick() {
return _nextTick;
}

/**
* Get the total number of ticks that have occurred so far.
* @return the total number of ticks that have occurred so far.
*/
protected long ticks() {
return _ticks;
}

/**
* Manually tick the abstract auditor and get the AuditStats of the last reporting interval.
* @return stats for the previous interval
*/
protected AuditStats tickAndGetStats() {
// We only allow one thread to tick.
@@ -262,6 +263,7 @@ protected boolean isShuttingDown() {
/**
* Create a new AuditStats. This method will be called when the abstract auditor rolls out a new AuditStats for a new
* reporting interval.
* @return new stats
*/
protected abstract AuditStats createAuditStats();

@@ -276,6 +278,7 @@ protected boolean isShuttingDown() {
* @param timestamp the timestamp of the record being audited.
* @param messageCount the number of records being audited.
* @param bytesCount the number of bytes being audited.
* @param auditType type of activity to report
*
* @return An object that can serve as a key in a {@link java.util.HashMap}. Returning null means skipping the
* auditing.
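The `@return` contract documented above (an object usable as a hash key, with null opting the record out of auditing) is easiest to see with a concrete key type. A minimal sketch of one possible implementation; the `TopicAuditKey` class and its per-topic grouping are hypothetical, not part of this PR:

```java
// hypothetical audit key grouping counts by topic and audit type;
// equals/hashCode make it usable as a java.util.HashMap key,
// which is all the contract above requires
public final class TopicAuditKey {
  private final String _topic;
  private final String _auditType;

  public TopicAuditKey(String topic, String auditType) {
    _topic = topic;
    _auditType = auditType;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TopicAuditKey)) {
      return false;
    }
    TopicAuditKey other = (TopicAuditKey) o;
    return _topic.equals(other._topic) && _auditType.equals(other._auditType);
  }

  @Override
  public int hashCode() {
    return 31 * _topic.hashCode() + _auditType.hashCode();
  }
}
```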
@@ -327,7 +327,6 @@ public interface LiKafkaConsumer<K, V> extends Consumer<K, V> {
* partition. With the internal state, the consumer guarantees that the messages after the sought-to offset will
* be redelivered. To reduce memory footprint, the consumer only tracks a configurable number of messages.
* <p>
* <p>
* When this method is invoked, the behavior is as follows:
* <ul>
* <li> If no message has ever been consumed from the partition, the consumer will seek to the user passed in
@@ -501,7 +500,6 @@ public interface LiKafkaConsumer<K, V> extends Consumer<K, V> {
* The safe offset of a message <b>M</b> is the largest offset that guarantees all the messages after <b>M</b> will
* be consumed if the user starts to consume from that offset.
* <p>
* <p>
* For example, consider the following message/segment sequence:
* <ul>
* <li>offset 0 ----&gt; message0_segment0
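The safe-offset javadoc above is the contract that lets callers commit or rewind without losing segments of a partially consumed large message. A hedged usage sketch, assuming the interface exposes `Long safeOffset(TopicPartition tp)` as this javadoc's placement suggests (the method signature itself is not visible in the hunk):

```java
// rewind a partition so that every segment of any partially
// delivered large message will be redelivered
void rewindToSafeOffset(LiKafkaConsumer<String, String> consumer, TopicPartition tp) {
  Long safe = consumer.safeOffset(tp); // largest offset safe to restart from
  if (safe != null) {
    consumer.seek(tp, safe);
  }
}
```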
@@ -17,12 +17,13 @@
public interface UUIDFactory<K, V> extends Configurable {

/**
* Get a random UUID.
* @return a non-null UUID
*/
UUID createUuid();

/**
* Get the UUID based on the producer record.
* @param record the producer record to derive the UUID from
* @return a UUID based on the producer record.
*/
UUID getUuid(ProducerRecord<K, V> record);

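The two methods above suggest the obvious default: ignore the record and mint a random UUID. A minimal sketch; the `RandomUuidFactory` name is illustrative, and the library's own default implementation may differ:

```java
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;

// illustrative default implementation: every record gets a fresh random UUID
public class RandomUuidFactory<K, V> implements UUIDFactory<K, V> {
  @Override
  public void configure(Map<String, ?> configs) {
    // no configuration needed for random UUIDs
  }

  @Override
  public UUID createUuid() {
    return UUID.randomUUID();
  }

  @Override
  public UUID getUuid(ProducerRecord<K, V> record) {
    return createUuid(); // ignore the record; always random
  }
}
```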
@@ -23,7 +23,7 @@ public class ConfigureAuditorTest extends AbstractKafkaClientsIntegrationTestHar
@BeforeMethod
@Override
public void setUp() {
brokerList_$eq("localhost:9092");
super.setUp();
}

@Test
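For context on the hunk above: `brokerList_$eq("localhost:9092")` is the JVM-level name scalac generates for the setter of a scala `var brokerList` on the old scala test harness — exactly the kind of link-time scala dependency this PR is removing. With the harness rewritten in java, the test simply defers to `super.setUp()`.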
@@ -15,23 +15,10 @@
import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -44,23 +31,39 @@
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;


/**
* Integration test for LiKafkaConsumer
*/
@@ -81,7 +84,7 @@ public class LiKafkaConsumerIntegrationTest extends AbstractKafkaClientsIntegrat
@Override
public Properties overridingProps() {
Properties props = new Properties();
props.setProperty(KafkaConfig.NumPartitionsProp(), Integer.toString(NUM_PARTITIONS));
props.setProperty("num.partitions", Integer.toString(NUM_PARTITIONS));
return props;
}

@@ -1052,6 +1055,14 @@ private void produceSyntheticMessages(String topic) {
producer.close();
}

//TODO - remove / refactor
private Producer<byte[], byte[]> createKafkaProducer() {
Contributor: Should we default all tests to SSL?

Contributor (author): No. This method is rather hacky and creates an ad-hoc Producer<byte[], byte[]> where the rest of the harness code creates <String, String>, so it should be cleaned up somehow. However, our tests currently rely on it heavily and I didn't want to make this PR any bigger.

Properties props = new Properties();
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
Properties finalProducerProps = getProducerProperties(props);
return new KafkaProducer(finalProducerProps);
}
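As the TODO above admits, this helper is a stopgap; one incidental wrinkle is that `new KafkaProducer(finalProducerProps)` is a raw-type construction. A trivial sketch of the tidier form, changing nothing but the last line:

```java
// same body as above; only the construction is typed, so the result
// matches the declared Producer<byte[], byte[]> without a raw-type warning
return new KafkaProducer<byte[], byte[]>(finalProducerProps);
```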

private class RebalanceTestConsumerThread extends Thread {
private final LiKafkaConsumer<String, String> _consumer;
@@ -8,8 +8,10 @@
import java.io.IOException;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.testng.Assert;
import scala.Option;
import org.testng.annotations.Test;


@Test
public class LiKafkaConsumerSSLIntegrationTest extends LiKafkaConsumerIntegrationTest {

private File _trustStoreFile;
@@ -24,8 +26,8 @@ public LiKafkaConsumerSSLIntegrationTest() {
}

@Override
public Option<File> trustStoreFile() {
return Option.apply(_trustStoreFile);
public File trustStoreFile() {
return _trustStoreFile;
}

@Override
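This override, repeated in the producer SSL test below, is the crux of the PR's "no link-time dependency on scala" goal: the harness hook now returns a plain `java.io.File` instead of `scala.Option<File>`, so the test compiles against zero scala types, with null standing in for `Option.empty()`. A hedged sketch of the consuming side (the null-check shape is an assumption about the rewritten harness, which this diff does not show):

```java
// after the change, null plays the role the scala Option used to play
File trustStore = trustStoreFile();
if (trustStore != null) {
  // SSL run: wire the trust store into broker and client configs
} else {
  // plaintext run
}
```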
@@ -9,7 +9,6 @@
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -34,8 +33,6 @@

/**
* The integration test for large message.
* This class has to be in scala source directory because it depends on the scala classes which will only be
* compiled after java classes are compiled.
*/
public class LargeMessageIntegrationTest extends AbstractKafkaClientsIntegrationTestHarness {
private static final String TOPIC = "TestLargeMessage";
@@ -50,7 +47,7 @@ public int clusterSize() {
@Override
public Properties overridingProps() {
Properties props = new Properties();
props.setProperty(KafkaConfig.NumPartitionsProp(), Integer.toString(NUM_PARTITIONS));
props.setProperty("num.partitions", Integer.toString(NUM_PARTITIONS));
Contributor: It seems safer to use the method instead of hard-coding it.

Contributor (author): That prop is in scala code. It is unclean ...

return props;
}

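The thread above captures the real tradeoff: `KafkaConfig.NumPartitionsProp()` is the canonical source of the key but lives in scala code, while the literal `"num.partitions"` is scala-free but now duplicated across at least two tests in this diff. A middle-ground sketch, assuming the harness has no such constant yet (the `KafkaTestConstants` class is hypothetical):

```java
// hypothetical java-side home for broker config keys used by the tests:
// the literal appears exactly once, and nothing links against scala
public final class KafkaTestConstants {
  public static final String NUM_PARTITIONS_PROP = "num.partitions";

  private KafkaTestConstants() {
    // non-instantiable constants holder
  }
}
```

Both `overridingProps()` overrides in this diff could then call `props.setProperty(KafkaTestConstants.NUM_PARTITIONS_PROP, Integer.toString(NUM_PARTITIONS))`.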
@@ -8,7 +8,6 @@
import java.io.IOException;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.testng.Assert;
import scala.Option;


public class LiKafkaProducerSSLIntegrationTest extends LiKafkaProducerIntegrationTest {
@@ -25,8 +24,8 @@ public LiKafkaProducerSSLIntegrationTest() {
}

@Override
public Option<File> trustStoreFile() {
return Option.apply(_trustStoreFile);
public File trustStoreFile() {
return _trustStoreFile;
}

@Override