Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Permalink
[DECISION-264] Avro Serialization for example data
Browse files Browse the repository at this point in the history
  • Loading branch information
josepablofernandez committed Mar 28, 2016
1 parent 25dea12 commit 0fec6f5
Showing 1 changed file with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.stratio.decision.commons.avro.Action;
import com.stratio.decision.commons.avro.ColumnType;
import com.stratio.decision.commons.avro.InsertMessage;
import com.stratio.decision.commons.constants.InternalTopic;
import com.stratio.decision.commons.constants.STREAM_OPERATIONS;
import com.stratio.decision.commons.messages.ColumnNameTypeValue;
Expand All @@ -26,23 +29,31 @@
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Serializer;

public class HardwareEmulatorMain {

public static final String sensorDataStream = "sensor_grid";

public static void main(String[] args) {

int multiplicator = 1;
Producer<String, String> producer = null;
Producer<String, byte[]> avroProducer = null;

if (args != null && args.length > 1) {
multiplicator = Integer.valueOf(args[0]);
producer = new Producer<String, String>(createProducerConfig(args[1]));
avroProducer = new Producer<>(createProducerConfig(args[1]));
} else {
throw new RuntimeException(
"Usage: \n param 1 - data multiplier (default = 1) \n param 2 - kafka broker list");
Expand Down Expand Up @@ -90,45 +101,51 @@ public static void main(String[] args) {
System.out.println("TOTAL VALUES: " + totalValues);

ExecutorService es = Executors.newFixedThreadPool(10);
es.execute(new DataSender(producer, cpuValues, "cpu"));
es.execute(new DataSender(producer, memValues, "memory"));
es.execute(new DataSender(producer, diskUsageValues, "disk usage"));
es.execute(new DataSender(producer, memorySwapValues, "memory swap"));
es.execute(new DataSender(producer, threadsValues, "threads"));
es.execute(new DataSender(producer, runningValues, "running processes"));
es.execute(new DataSender(avroProducer, cpuValues, "cpu"));
es.execute(new DataSender(avroProducer, memValues, "memory"));
es.execute(new DataSender(avroProducer, diskUsageValues, "disk usage"));
es.execute(new DataSender(avroProducer, memorySwapValues, "memory swap"));
es.execute(new DataSender(avroProducer, threadsValues, "threads"));
es.execute(new DataSender(avroProducer, runningValues, "running processes"));

es.shutdown();
}

private static ProducerConfig createProducerConfig(String brokerList) {
Properties properties = new Properties();
properties.put("serializer.class", "kafka.serializer.StringEncoder");

properties.put("metadata.broker.list", brokerList);
properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

return new ProducerConfig(properties);

}

private static class DataSender implements Runnable {

private final Producer<String, String> producer;
private final Producer<String, byte[]> avroProducer;
private final List<Double> values;
private final String name;

public DataSender(Producer<String, String> producer, List<Double> values, String name) {
public DataSender(Producer<String, byte[]> avroProducer, List<Double> values, String name) {
super();
this.producer = producer;
this.avroProducer = avroProducer;
this.values = values;
this.name = name;
}

@Override
public void run() {
Gson gson = new Gson();

for (StratioStreamingMessage message : generateStratioStreamingMessages(values, name)) {
KeyedMessage<String, String> busMessage = new KeyedMessage<String, String>(
InternalTopic.TOPIC_DATA.getTopicName(), STREAM_OPERATIONS.MANIPULATION.INSERT,
gson.toJson(message));
producer.send(busMessage);

KeyedMessage<String, byte[]> busMessage = new KeyedMessage<>(
InternalTopic.TOPIC_DATA.getTopicName(), serializeStratioStreamingMessage(message));

avroProducer.send(busMessage);
}

}
Expand Down Expand Up @@ -157,5 +174,50 @@ private List<StratioStreamingMessage> generateStratioStreamingMessages(List<Doub
return result;
}

private byte[] serializeStratioStreamingMessage(StratioStreamingMessage message){

List<com.stratio.decision.commons.avro.ColumnType> columns = null;

if (message.getColumns() != null) {
columns = new java.util.ArrayList<>();
ColumnType c = null;

for (ColumnNameTypeValue messageColumn : message.getColumns()) {
c = new ColumnType(messageColumn.getColumn(), messageColumn.getValue().toString(),
messageColumn.getValue()
.getClass().getName());
columns.add(c);
}

}

InsertMessage insertMessage = new InsertMessage(message.getOperation(), message.getStreamName(), message
.getSession_id(), message.getTimestamp(), columns, null);

return getInsertMessageBytes(insertMessage);

}

private byte[] getInsertMessageBytes(InsertMessage insertMessage){

byte[] result = null;

ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
SpecificDatumWriter writer = new SpecificDatumWriter<InsertMessage>(InsertMessage.getClassSchema());

try {
writer.write(insertMessage, encoder);
encoder.flush();
out.close();
result = out.toByteArray();
}catch (IOException e){
return null;
}

return result;

}

}
}

0 comments on commit 0fec6f5

Please sign in to comment.