Skip to content

Commit

Permalink
fix: metadata getter/setters for jackson to work
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 8, 2019
1 parent 806b70b commit 0431f12
Showing 1 changed file with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,23 @@

final class KafkaStreamsMetadata {
static KafkaStreamsMetadata create(
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
KafkaStreamsMetadata metadata = new KafkaStreamsMetadata();
metadata.metadata = other.stream().map(StreamsMetadata::create).collect(Collectors.toSet());
return metadata;
}

Set<StreamsMetadata> metadata;

Set<StreamsMetadata> getMetadata() {
KafkaStreamsMetadata() {
}

public void setMetadata(
Set<StreamsMetadata> metadata) {
this.metadata = metadata;
}

public Set<StreamsMetadata> getMetadata() {
return metadata;
}

Expand All @@ -37,15 +45,31 @@ static StreamsMetadata create(org.apache.kafka.streams.state.StreamsMetadata oth
metadata.hostInfo = HostInfo.create(other.hostInfo());
metadata.storeNames = other.stateStoreNames();
metadata.topicPartitions = other.topicPartitions().stream()
.map(TopicPartition::create)
.collect(Collectors.toSet());
.map(TopicPartition::create)
.collect(Collectors.toSet());
return metadata;
}

HostInfo hostInfo;
Set<String> storeNames;
Set<TopicPartition> topicPartitions;

StreamsMetadata() {
}

public void setHostInfo(HostInfo hostInfo) {
this.hostInfo = hostInfo;
}

public void setStoreNames(Set<String> storeNames) {
this.storeNames = storeNames;
}

public void setTopicPartitions(
Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
}

public HostInfo getHostInfo() {
return hostInfo;
}
Expand All @@ -69,6 +93,17 @@ static HostInfo create(org.apache.kafka.streams.state.HostInfo other) {
String host;
Integer port;

HostInfo() {
}

public void setHost(String host) {
this.host = host;
}

public void setPort(Integer port) {
this.port = port;
}

public String getHost() {
return host;
}
Expand All @@ -86,9 +121,20 @@ static TopicPartition create(org.apache.kafka.common.TopicPartition other) {
return topicPartition;
}

TopicPartition() {
}

String topic;
Integer partition;

public void setTopic(String topic) {
this.topic = topic;
}

public void setPartition(Integer partition) {
this.partition = partition;
}

public String getTopic() {
return topic;
}
Expand Down

0 comments on commit 0431f12

Please sign in to comment.