Skip to content

Commit

Permalink
6 redis protocol (#20)
Browse files Browse the repository at this point in the history
* Starting Redis!
* Resp3 Parser
* Resp3 First communication incoming!
* Resp3 Proxying without subscriptions
* Resp3 Testing subscriptions
* Resp3 Recording subs
* [Resp3] Correct ordering of recording
* [Resp3] Deserialization
* [Resp3] Replay test
* [Resp3] Added runner
* [Resp3] Binary test
* [Resp3] Review readme
  • Loading branch information
kendarorg authored May 20, 2024
1 parent 170ff49 commit 4c6a1b7
Show file tree
Hide file tree
Showing 95 changed files with 3,126 additions and 205 deletions.
2 changes: 0 additions & 2 deletions .idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 19 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ several database wire protocol implementations:
* [RabbitMq/AMQP 0.9.1](protocol-amqp-091/README.md)
* Support for basic queue/publish/consume
* Channels multiplexing
* Redis ([in progress](https://github.com/kendarorg/the-protocol-master/tree/6-redis-protocol))
* [Redis](protocol-redis/README.md)
* Support for subscriptions
* Support for push parsing
* RESP2 and RESP3 supported out of the box

## If you like it Buy me a coffe :)

Expand Down Expand Up @@ -69,17 +72,21 @@ java -cp "ojdbc11.jar;protocol-runner.jar" \

<pre>
usage: runner
-l [arg] Select listening port
-p [arg] Select protocol (mysql/mongo/postgres/amqp091)
-pl Replay from log directory
-xc [arg] Select remote connection string
-xd [arg] Select remote log directory (you can set a {timestamp} value
-l <arg> [all] Select listening port
-p <arg> Select protocol (mysql/mongo/postgres/amqp091/redis)
-pl [all] Replay from log/replay directory
-t <arg> [all] Set timeout in seconds towards proxied system (default
30s)
-v <arg> [all] Log level (default ERROR)
-xc <arg> [all] Select remote connection string (for redis use
redis://host:port
-xd <arg> [all] Select log/replay directory (you can set a {timestamp}
value
that will be replaced with the current timestamp)
-xl [arg] Select remote login
-xw [arg] Select remote password
-v [arg] Log level (default ERROR)
-js [arg] [jdbc] Set schema
-jr [arg] [jdbc] Replace queries
-xl <arg> [mysql/mongo/postgres/amqp091] Select remote login
-xw <arg> [mysql/mongo/postgres/amqp091] Select remote password
-jr <arg> [jdbc] Replace queries
-js <arg> [jdbc] Set schema
</pre>

Inside the chosen directory you will find simple jsons containing all the data exchanged
Expand All @@ -92,7 +99,7 @@ The set schema is called in case the jdbc driver does not allow setting the sche
### Replace Queries

Specify a file containing "replacement queries" this is specially useful when running ... the runner
as postgres and contacting a different kind of database. Here can be inserted the replacements.
as postgres and contacting a different kind of database. Here can be inserted the replacements.

SPACE ARE IMPORTANT INSIDE THE QUERY. THEY MUST MATCH THE REAL ONE.
AND NO ";" SHOULD BE ADDED AT THE END OF QUERIES
Expand Down
5 changes: 0 additions & 5 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
## TODO

* Timeouts
* Run from command line
* Recording asynchronously
* Replaying

* Use the interrupt to generate "preformed" events for the state chart

STATE CHARTS:
Expand Down
7 changes: 6 additions & 1 deletion jacoco/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.kendar.protocol</groupId>
<artifactId>protocol-master</artifactId>
<version>1.3.2</version>
<version>1.3.3</version>
</parent>

<artifactId>jacoco</artifactId>
Expand All @@ -30,6 +30,11 @@
<artifactId>protocol-mysql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.kendar.protocol</groupId>
<artifactId>protocol-redis</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


Expand Down
7 changes: 5 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.kendar.protocol</groupId>
<artifactId>protocol-master</artifactId>
<version>1.3.2</version>
<version>1.3.3</version>
<packaging>pom</packaging>

<modules>
Expand All @@ -17,15 +17,17 @@
<module>protocol-mysql</module>
<module>protocol-runner</module>
<module>protocol-amqp-091</module>
<module>protocol-redis</module>
<module>jacoco</module>
<module>todo</module>
</modules>

<properties>
<project.version>1.3.2</project.version>
<project.version>1.3.3</project.version>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<testcontainers.version>1.19.1</testcontainers.version>
<testcontainers.redis.version>1.6.4</testcontainers.redis.version>
<junit.jupiter.version>RELEASE</junit.jupiter.version>
<mybatis.version>3.5.7</mybatis.version>
<mysql.version>8.2.0</mysql.version>
Expand All @@ -39,6 +41,7 @@
<jaxb.api.version>2.3.0</jaxb.api.version>
<commons.cli.version>1.6.0</commons.cli.version>
<amqp.client.version>5.20.0</amqp.client.version>
<jedis.version>5.1.2</jedis.version>
</properties>
<dependencies>
<!-- <dependency>-->
Expand Down
2 changes: 1 addition & 1 deletion protocol-amqp-091/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## AMQP 0.9.1 Protocol
# AMQP 0.9.1 Protocol

## Documentation used

Expand Down
2 changes: 1 addition & 1 deletion protocol-amqp-091/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>org.kendar.protocol</groupId>
<artifactId>protocol-master</artifactId>
<version>1.3.2</version>
<version>1.3.3</version>
</parent>

<artifactId>protocol-amqp-091</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public List<StorageItem<JsonNode, JsonNode>> readResponses(long afterIndex) {
for (var item : index.stream().filter(a -> a.getIndex() > afterIndex).collect(Collectors.toList())) {
if (item.getType().equalsIgnoreCase("RESPONSE")) {
var outItem = outItems.stream().filter(a -> a.getIndex() == item.getIndex()).findFirst();
if (!outItem.isEmpty()) {
if (outItem.isPresent()) {
result.add(outItem.get());
log.debug("[SERVER][CB] After: " + afterIndex + " Index: " + item.getIndex() + " Type: " +
outItem.get().getOutput().get("type").textValue());
Expand Down Expand Up @@ -168,8 +168,7 @@ protected Map<String, String> buildTag(StorageItem<JsonNode, JsonNode> item) {

@Override
protected List<StorageItem<JsonNode, JsonNode>> readAllItems() {
var res = super.readAllItems();
return res;
return super.readAllItems();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
public class AmqpProtoContext extends NetworkProtoContext {
private short channel = 1;

public AmqpProtoContext(ProtoDescriptor descriptor) {
super(descriptor);
}

@Override
public void disconnect(Object connection) {
ProxyConnection conn = ((ProxyConnection) getValue("CONNECTION"));
var sock = (ProxySocket) conn.getConnection();
sock.close();
}

public AmqpProtoContext(ProtoDescriptor descriptor) {
super(descriptor);
}

public short getChannel() {
return ++channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,21 @@ public void completed(Integer result, ByteBuffer attachment) {
stepsToInvoke = possible.executeEvent(be);
tempBuffer.truncate();
context.runSteps(stepsToInvoke, possible, be);
log.trace("[PROXY ][RX] Found(1): " + possible.getClass().getSimpleName());
log.trace("[PROXY ][RX][1]: " + possible.getClass().getSimpleName());
run = true;
break;
} else if (possible.canRunEvent(fre)) {
stepsToInvoke = possible.executeEvent(fre);
tempBuffer.truncate();
context.runSteps(stepsToInvoke, possible, fre);
log.trace("[PROXY ][RX] Found(1): " + possible.getClass().getSimpleName());
log.trace("[PROXY ][RX][2]: " + possible.getClass().getSimpleName());
run = true;
break;
}
}
if (!run && gf.canRun(be)) {
var event = gf.execute(be);
log.trace("[PROXY ][RX] Found(2): " + gf.getClass().getSimpleName());
log.trace("[PROXY ][RX][3]: " + gf.getClass().getSimpleName());
inputQueue.add(event);
tempBuffer.truncate();
run = true;
Expand Down Expand Up @@ -226,7 +226,7 @@ public void close() {
try {
channel.close();
} catch (IOException e) {

log.trace("Ignorable",e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import static org.kendar.protocol.states.ProtoState.iteratorOfRunnable;

public class ProxyedBehaviour {
private static JsonMapper mapper = new JsonMapper();
private static final JsonMapper mapper = new JsonMapper();

public static Iterator<ProtoStep> doStuff(Frame input,
NetworkProtoContext context, int channel,
Expand Down
2 changes: 1 addition & 1 deletion protocol-amqp-091/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<logger name="com.github" level="ERROR"/>
<logger name="org.kendar.amqp.v09.utils" level="INFO"/>
<logger name="org.testcontainers" level="ERROR"/>
<!-- <logger name="org.kendar.amqp.v09.utils.ProxySocket" level="TRACE"/>-->
<!-- <logger name="org.kendar.redis.utils.ProxySocket" level="TRACE"/>-->

<root level="ERROR">
<appender-ref ref="stdout"/>
Expand Down
2 changes: 1 addition & 1 deletion protocol-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.kendar.protocol</groupId>
<artifactId>protocol-master</artifactId>
<version>1.3.2</version>
<version>1.3.3</version>
</parent>

<artifactId>protocol-common</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions protocol-common/src/main/java/org/kendar/buffers/BBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ public byte[] getAll() {
return bytes;
}

public byte[] getRemaining() {
if (position < 0) return bytes;
var toCopy = new byte[bytes.length - position];
System.arraycopy(bytes, position, toCopy, 0, bytes.length - position);
return toCopy;
}

public byte[] toArray() {
return bytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void handleExceptionInternal(Exception ex) {
try {
client.close();
} catch (IOException e) {

log.trace("Ignorable",e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,58 +31,16 @@
*/
public abstract class ProtoContext {

private static Thread contextCleaner;
private static final ConcurrentHashMap<Integer, ProtoContext> contextsCache = new ConcurrentHashMap<>();
private static final AtomicInteger timeout = new AtomicInteger(30);
private static final Logger log = LoggerFactory.getLogger(ProtoContext.class);
private static final Thread contextCleaner;

static {
contextCleaner = new Thread(ProtoContext::contextsClean);
contextCleaner.start();
}

public static void setTimeout(int value){
timeout.set(value);
}


private static void contextsClean(){

while(true){
Sleeper.sleep(1000);
var fixedItemsList = new ArrayList<>(contextsCache.entrySet());
for(var item:fixedItemsList){
var now = getNow();
if(item.getValue().lastAccess.get()<(now+timeout.get())){
var context = item.getValue();
var contextConnection = context.getValue("CONNECTION");
if(contextConnection==null){
contextsCache.remove(item.getKey());
}

try {
context.disconnect(((ProxyConnection) contextConnection).getConnection());
log.debug("Disconnecting");
}catch (Exception ex){
log.trace("Error disconnecting",ex);
}
contextsCache.remove(item.getKey());
}else{
log.debug("keepalive");
}
}
}
}

public abstract void disconnect(Object connection);

protected AtomicLong lastAccess = new AtomicLong(getNow());

protected static long getNow() {
return System.currentTimeMillis() / 1000;
}

private static final Logger log = LoggerFactory.getLogger(ProtoContext.class);

/**
* Stores the variable relatives to the current instance execution
*/
Expand Down Expand Up @@ -110,6 +68,7 @@ protected static long getNow() {
* Exclusively lock the send operation
*/
private final Object sendLock = new Object();
protected final AtomicLong lastAccess = new AtomicLong(getNow());
/**
* Execution stack, this stores the current state
*/
Expand All @@ -122,12 +81,47 @@ protected static long getNow() {
* The last state used
*/
private ProtoState currentState;

public ProtoContext(ProtoDescriptor descriptor) {
this.contextId = ProtoDescriptor.getCounter("CONTEXT_ID");
this.descriptor = descriptor;
this.root = descriptor.getTaggedStates();
contextsCache.put(this.contextId,this);
contextsCache.put(this.contextId, this);
}

public static void setTimeout(int value) {
timeout.set(value);
}

private static void contextsClean() {

while (true) {
Sleeper.sleep(1000);
var fixedItemsList = new ArrayList<>(contextsCache.entrySet());
for (var item : fixedItemsList) {
var now = getNow();
if (item.getValue().lastAccess.get() < (now + timeout.get())) {
var context = item.getValue();
var contextConnection = context.getValue("CONNECTION");
if (contextConnection == null) {
contextsCache.remove(item.getKey());
}

try {
context.disconnect(((ProxyConnection) contextConnection).getConnection());
log.debug("Disconnecting");
} catch (Exception ex) {
log.trace("Error disconnecting", ex);
}
contextsCache.remove(item.getKey());
} else {
log.debug("keepalive");
}
}
}
}

protected static long getNow() {
return System.currentTimeMillis() / 1000;
}

/**
Expand Down Expand Up @@ -172,6 +166,8 @@ private static ProtoState retrieveExecutableState(ProtoState candidate, BaseEven
return result;
}

public abstract void disconnect(Object connection);

/**
* The instance id
*/
Expand Down
Loading

0 comments on commit 4c6a1b7

Please sign in to comment.