Skip to content

Commit

Permalink
[amqp091] Completed but transactions
Browse files Browse the repository at this point in the history
[common] Uniform json mapper
[common] Docs
[common] Logging connection
  • Loading branch information
kendarorg committed Feb 29, 2024
1 parent 30865d8 commit d60bcee
Show file tree
Hide file tree
Showing 150 changed files with 2,237 additions and 1,660 deletions.
5 changes: 4 additions & 1 deletion .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

![](protocolmaster_s.gif)

The "protocol-master" is a command line tool (usable for any language) with (if you want to embed it)
The "protocol-master" is a command line tool (usable for any language) with (if you want to embed it)
a set of java libraries, aimed at various tasks (and an executable jar if you want to try it quickly)

* Create a state machine able to interpret a generic wire protocol, handling
Expand Down Expand Up @@ -30,9 +30,9 @@ several database wire protocol implementations:
* Simple authentication (could add an "auth provider")
* [MongoDB](protocol-mongo/README.md)
* Basic authentication
* [RabbitMq/AMQP 0.9.1](protocol-amqp-091/README.md)
* [RabbitMq/AMQP 0.9.1](protocol-amqp-091/README.md)
* Support for basic queue/publish/consume
* Channels multiplexing
* Channels multiplexing
* Redis (soon)

## If you like it Buy me a coffe :)
Expand Down
10 changes: 9 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@
* Recording asynchronously
* Replaying

STATE CHARTS: https://github.com/klangfarbe/UML-Statechart-Framework-for-Java
* Use the interrupt to generate "preformed" events for the state chart

STATE CHARTS:

* https://www.ascii-code.com/
* https://github.com/klangfarbe/UML-Statechart-Framework-for-Java
* https://github.com/klangfarbe/UML-Statechart-Framework-for-Java
* https://www.graphviz.org/
* https://en.wikipedia.org/wiki/DOT_(graph_description_language)
26 changes: 21 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<module>protocol-runner</module>
<module>protocol-amqp-091</module>
<module>jacoco</module>
<module>todo</module>
</modules>

<properties>
Expand All @@ -38,24 +39,38 @@
<jaxb.api.version>2.3.0</jaxb.api.version>
<commons.cli.version>1.6.0</commons.cli.version>
<amqp.client.version>5.20.0</amqp.client.version>
<logback.version>1.4.8</logback.version>
<slf4j.version>2.0.7</slf4j.version>
</properties>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-api</artifactId>-->
<!-- <version>2.0.7</version>-->

<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- <version>2.0.7</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>log4j-api</artifactId>-->
<!-- <version>2.22.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
Expand Down Expand Up @@ -105,6 +120,7 @@
<id>central2</id>
<name>Central Repository2</name>
<url>https://repo.maven.apache.org/maven2</url>
<!-- https://repo.maven.apache.org/maven2 -->
<layout>default</layout>
<snapshots>
<enabled>false</enabled>
Expand Down
3 changes: 1 addition & 2 deletions protocol-amqp-091/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@

* https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf (page 31)
* https://docs.vmware.com/en/VMware-RabbitMQ-for-Kubernetes/1/rmq/amqp-wireshark.html
* https://dzone.com/refcardz/amqp-essentials (amqp 1.0)
* https://github.com/cloudamqp/amqproxy
* https://crystal-lang.org/api/1.11.2
* https://github.com/wireshark/wireshark/blob/master/epan/dissectors/packet-amqp.c
* https://github.com/bloomberg/amqpprox/tree/main/libamqpprox
* https://stackoverflow.com/questions/18403623/rabbitmq-amqp-basicproperties-builder-values
* https://www.ascii-code.com/


* v0.9.1 vs 0.10.0
Expand All @@ -20,3 +18,4 @@

amqp0-9-1.pdf, section 4.2.3
The size is an INTEGER not a long
Actually transactions are not supported
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public class AmqpFileStorage extends BaseFileStorage<JsonNode, JsonNode> impleme
private final List<StorageItem<JsonNode, JsonNode>> inMemoryDb = new ArrayList<>();
private final List<StorageItem<JsonNode, JsonNode>> compareData = new ArrayList<>();
private final List<StorageItem<JsonNode, JsonNode>> outItems = new ArrayList<>();
private boolean initialized = false;
private final Object lockObject = new Object();
private final Object responseLockObject = new Object();
private boolean initialized = false;

public AmqpFileStorage(String targetDir) {
super(targetDir);
Expand All @@ -42,7 +42,7 @@ public StorageItem<JsonNode, JsonNode> read(JsonNode node, String type) {
synchronized (lockObject) {
var item = inMemoryDb.stream()
.filter(a -> type.equalsIgnoreCase(a.getType()) &&
a.getCaller().equalsIgnoreCase("AMQP")).findFirst();
a.getCaller().equalsIgnoreCase("AMQP")).findFirst();
if (item.isPresent()) {
inMemoryDb.remove(item.get());
return item.get();
Expand Down Expand Up @@ -73,7 +73,7 @@ public List<StorageItem<JsonNode, JsonNode>> readResponses(long afterIndex) {
var result = new ArrayList<StorageItem<JsonNode, JsonNode>>();
for (var item : compareData.stream().filter(a -> a.getIndex() > afterIndex).collect(Collectors.toList())) {
if (item.getType().equalsIgnoreCase("RESPONSE")) {
if(outItems.contains(item)) {
if (outItems.contains(item)) {
result.add(item);
log.trace("[SERVER][CB] After: " + afterIndex + " Index: " + item.getIndex() + " Type: " + item.getOutput().get("type").textValue());
outItems.remove(item);
Expand Down
101 changes: 49 additions & 52 deletions protocol-amqp-091/src/main/java/org/kendar/amqp/v09/AmqpProtocol.java
Original file line number Diff line number Diff line change
@@ -1,103 +1,100 @@
package org.kendar.amqp.v09;

import org.kendar.amqp.v09.executor.AmqpProtoContext;
import org.kendar.amqp.v09.fsm.AmqpFrameTranslator;
import org.kendar.amqp.v09.fsm.ProtocolHeader;
import org.kendar.amqp.v09.fsm.events.AmqpFrame;
import org.kendar.amqp.v09.messages.frames.BodyFrame;
import org.kendar.amqp.v09.messages.frames.HeaderFrame;
import org.kendar.amqp.v09.messages.frames.HearthBeatFrame;
import org.kendar.amqp.v09.messages.methods.basic.BasicAck;
import org.kendar.amqp.v09.messages.methods.basic.BasicConsume;
import org.kendar.amqp.v09.messages.methods.basic.BasicPublish;
import org.kendar.amqp.v09.messages.methods.basic.*;
import org.kendar.amqp.v09.messages.methods.channel.ChannelClose;
import org.kendar.amqp.v09.messages.methods.channel.ChannelOpen;
import org.kendar.amqp.v09.messages.methods.connection.ConnectionClose;
import org.kendar.amqp.v09.messages.methods.connection.ConnectionOpen;
import org.kendar.amqp.v09.messages.methods.connection.ConnectionStartOk;
import org.kendar.amqp.v09.messages.methods.connection.ConnectionTuneOk;
import org.kendar.amqp.v09.messages.methods.queue.QueueDeclare;
import org.kendar.amqp.v09.messages.methods.exchange.ExchangeBind;
import org.kendar.amqp.v09.messages.methods.exchange.ExchangeDeclare;
import org.kendar.amqp.v09.messages.methods.exchange.ExchangeDelete;
import org.kendar.amqp.v09.messages.methods.exchange.ExchangeUnbind;
import org.kendar.amqp.v09.messages.methods.queue.*;
import org.kendar.protocol.context.NetworkProtoContext;
import org.kendar.protocol.context.ProtoContext;
import org.kendar.protocol.context.Tag;
import org.kendar.protocol.descriptor.NetworkProtoDescriptor;
import org.kendar.protocol.descriptor.ProtoDescriptor;
import org.kendar.protocol.events.BytesEvent;
import org.kendar.protocol.states.special.ProtoStateSequence;
import org.kendar.protocol.states.special.ProtoStateSwitchCase;
import org.kendar.protocol.states.special.ProtoStateWhile;
import org.kendar.protocol.states.special.Tagged;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class AmqpProtocol extends NetworkProtoDescriptor {

private static final boolean IS_BIG_ENDIAN = true;
private static final int PORT = 5672;
public static ConcurrentHashMap<Integer, NetworkProtoContext> consumeContext;
public static AtomicInteger consumeIdCounter;
private static final int PORT = 5672;
private int port = PORT;

public AmqpProtocol() {
private AmqpProtocol() {
consumeContext = new ConcurrentHashMap<>();
consumeIdCounter = new AtomicInteger(1);

}

public AmqpProtocol(int port) {
consumeContext = new ConcurrentHashMap<>();
consumeIdCounter = new AtomicInteger(1);
this();
this.port = port;
}

@Override
protected void initializeProtocol() {
addInterruptState(new HearthBeatFrame());

// initialize(
// new ProtoStateSequence(
// new ProtocolHeader(BytesEvent.class),
// new ConnectionStartOk(BytesEvent.class),
// new ConnectionTuneOk(BytesEvent.class),
// new ConnectionOpen(BytesEvent.class),
// new ProtoStateWhile(
// new ChannelOpen(BytesEvent.class),
// new ProtoStateSequence(
// new QueueDeclare(BytesEvent.class),
// new ProtoStateSwitchCase(
// new ProtoStateWhile(
// new BasicPublish(BytesEvent.class),
// new HeaderFrame(BytesEvent.class),
// new ProtoStateWhile(
// new BodyFrame(BytesEvent.class)
// )
// ),
// new ProtoStateWhile(
// new BasicConsume(BytesEvent.class),
// new BasicAck(BytesEvent.class)
// )
// ).asOptional()
// ).asOptional(),
// new ChannelClose(BytesEvent.class)
// ).asOptional(),
// new ConnectionClose(BytesEvent.class)
// )
// );
addInterruptState(new AmqpFrameTranslator(BytesEvent.class));

initialize(
new ProtoStateSequence(
new ProtocolHeader(BytesEvent.class),
new ConnectionStartOk(BytesEvent.class),
new ConnectionTuneOk(BytesEvent.class),
new ConnectionOpen(BytesEvent.class),
new ProtoStateWhile(
new ProtoStateSwitchCase(
new ChannelOpen(BytesEvent.class),
new ChannelClose(BytesEvent.class),
new QueueDeclare(BytesEvent.class),
new BasicPublish(BytesEvent.class),
new HeaderFrame(BytesEvent.class),
new BodyFrame(BytesEvent.class),
new BasicConsume(BytesEvent.class),
new BasicAck(BytesEvent.class)
new ConnectionStartOk(AmqpFrame.class),
new ConnectionTuneOk(AmqpFrame.class),
new ConnectionOpen(AmqpFrame.class),
new Tagged(
Tag.ofKeys("CHANNEL"),
new ProtoStateSequence(
new ChannelOpen(AmqpFrame.class),
new ProtoStateWhile(
new ProtoStateSwitchCase(
new QueueDeclare(AmqpFrame.class),
new QueueBind(AmqpFrame.class),
new QueueUnbind(AmqpFrame.class),
new QueuePurge(AmqpFrame.class),
new QueueDelete(AmqpFrame.class),
new ExchangeDeclare(AmqpFrame.class),
new ExchangeBind(AmqpFrame.class),
new ExchangeUnbind(AmqpFrame.class),
new ExchangeDelete(AmqpFrame.class),
new BasicConsume(AmqpFrame.class),
new BasicCancel(AmqpFrame.class),
new BasicGet(AmqpFrame.class),
new ProtoStateSequence(
new BasicPublish(AmqpFrame.class),
new HeaderFrame(AmqpFrame.class),
new BodyFrame(AmqpFrame.class)
),
new BasicAck(AmqpFrame.class),
new BasicNack(AmqpFrame.class),
new Reject(AmqpFrame.class)
)
),
new ChannelClose(AmqpFrame.class)
)
),
new ConnectionClose(BytesEvent.class)
new ConnectionClose(AmqpFrame.class)
)
);
}
Expand Down
Loading

0 comments on commit d60bcee

Please sign in to comment.