Skip to content

Commit

Permalink
Push experimental support for bulk vote forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Ichbinjoe committed Dec 10, 2018
1 parent 4cd2ce5 commit 590b761
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vexsoftware.votifier.forwarding;

import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import com.vexsoftware.votifier.NuVotifierBukkit;
import com.vexsoftware.votifier.model.Vote;
import com.vexsoftware.votifier.util.GsonInst;
Expand All @@ -10,6 +11,7 @@
import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.messaging.PluginMessageListener;

import java.io.CharArrayReader;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;

Expand Down Expand Up @@ -37,11 +39,13 @@ public void halt() {

@Override
public void onPluginMessageReceived(String s, Player player, byte[] bytes) {
try {
String message = new String(bytes, StandardCharsets.UTF_8);
JsonObject jsonObject = GsonInst.gson.fromJson(message, JsonObject.class);
Vote v = new Vote(jsonObject);
listener.onForward(v);
String message = new String(bytes, StandardCharsets.UTF_8);
try (JsonReader reader = new JsonReader(new CharArrayReader(message.toCharArray()))){
while (reader.hasNext()) {
JsonObject jsonObject = GsonInst.gson.fromJson(reader, JsonObject.class);
Vote v = new Vote(jsonObject);
listener.onForward(v);
}
} catch (Exception e) {
NuVotifierBukkit.getInstance().getLogger().log(Level.SEVERE, "There was an unknown error when processing a forwarded vote.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -44,7 +45,16 @@ public void forward(Vote v) {

protected boolean forwardSpecific(BackendServer connection, Vote vote) {
byte[] rawData = vote.serialize().toString().getBytes(StandardCharsets.UTF_8);
return connection.sendPluginMessage(channel, rawData);
return forwardSpecific(connection, rawData);
}

protected boolean forwardSpecific(BackendServer connection, Collection<Vote> votes) {
StringBuilder data = new StringBuilder();
for (Vote v : votes) {
data.append(v.serialize().toString());
}

return forwardSpecific(connection, data.toString().getBytes(StandardCharsets.UTF_8));
}

private boolean forwardSpecific(BackendServer connection, byte[] data) {
Expand Down Expand Up @@ -99,25 +109,24 @@ private void dumpVotesToServer(Collection<Vote> cachedVotes, BackendServer targe
plugin.getScheduler().delayedOnPool(() -> {
int evicted = 0;
Iterator<Vote> vi = cachedVotes.iterator();

Collection<Vote> chunk = new ArrayList<>(dumpRate);
while (vi.hasNext() && evicted < dumpRate) {
Vote v = vi.next();
if (forwardSpecific(target, v)) {
vi.remove();
evicted++;
} else {
// so since our forwarding failed, break like we are done
break;
}
chunk.add(vi.next());
vi.remove();
}

if (evicted >= dumpRate && !cachedVotes.isEmpty()) {
// if we evicted everything we could but still need to evict more
dumpVotesToServer(cachedVotes, target, identifier, evictedAlready + evicted, cb);
return;
}
if (forwardSpecific(target, chunk)) {
evicted += chunk.size();

// we either successfully
if (evicted >= dumpRate && !cachedVotes.isEmpty()) {
// if we evicted everything we could but still need to evict more
dumpVotesToServer(cachedVotes, target, identifier, evictedAlready + evicted, cb);
return;
}
} else {
// so since our forwarding failed, break like we are done
cachedVotes.addAll(chunk);
}

if (plugin.isDebug()) {
plugin.getPluginLogger().info("Successfully evicted " + (evictedAlready + evicted) + " votes to " + identifier + ".");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vexsoftware.votifier.sponge.forwarding;

import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import com.vexsoftware.votifier.model.Vote;
import com.vexsoftware.votifier.sponge.VotifierPlugin;
import com.vexsoftware.votifier.util.GsonInst;
Expand All @@ -11,6 +12,7 @@
import org.spongepowered.api.network.RawDataListener;
import org.spongepowered.api.network.RemoteConnection;

import java.io.CharArrayReader;
import java.nio.charset.StandardCharsets;

public class SpongePluginMessagingForwardingSink implements ForwardingVoteSink, RawDataListener {
Expand All @@ -33,12 +35,14 @@ public void halt() {

@Override
public void handlePayload(ChannelBuf channelBuf, RemoteConnection remoteConnection, Platform.Type type) {
try {
byte[] msgDirBuf = channelBuf.readBytes(channelBuf.available());
String message = new String(msgDirBuf, StandardCharsets.UTF_8);
JsonObject jsonObject = GsonInst.gson.fromJson(message, JsonObject.class);
Vote v = new Vote(jsonObject);
listener.onForward(v);
byte[] msgDirBuf = channelBuf.readBytes(channelBuf.available());
String message = new String(msgDirBuf, StandardCharsets.UTF_8);
try (JsonReader reader = new JsonReader(new CharArrayReader(message.toCharArray()))){
while (reader.hasNext()) {
JsonObject jsonObject = GsonInst.gson.fromJson(reader, JsonObject.class);
Vote v = new Vote(jsonObject);
listener.onForward(v);
}
} catch (Exception e) {
p.getLogger().error("There was an unknown error when processing a forwarded vote.", e);
}
Expand Down

0 comments on commit 590b761

Please sign in to comment.