Skip to content

Commit

Permalink
MFH
Browse files Browse the repository at this point in the history
  • Loading branch information
thekid committed Apr 4, 2020
2 parents ddac88d + b5e5f7a commit 2899b3e
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 81 deletions.
26 changes: 24 additions & 2 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,32 @@ stomp ChangeLog

## ?.?.? / ????-??-??

## 10.0.1 / 2020-04-04
## 10.1.2 / 2020-04-04

* Made compatible with XP 10 - @thekid

## 10.1.1 / 2019-01-28

* Fixed buffering issue which would lead to `recvFrame()` not returning
sent STOMP frames in certain situations
(@thekid)

## 10.1.0 / 2019-01-27

* Added `Message::getHeader()` to access a header by its name - @thekid
* Fixed `Destination` instances' string representations - @thekid
* Changed `Connection::connect()` to return the connection itself,
enabling a fluent programming style.
(@thekid)
* Added accessor for underlying socket to `peer.stomp.Connection` class
to support `select()`ing on it.
(@thekid)
* Fixed value encoding in headers, see "Value Encoding" in specification:
https://stomp.github.io/stomp-specification-1.2.html#Value_Encoding
(@thekid)
* Fixed reading frames with `content-length:0` - @thekid
>>>>>>> b5e5f7a80d640a7169eeea7eb38c3a0056bcafb2
## 10.0.0 / 2018-08-24

* Made compatible with `xp-framework/logging` version 9.0.0 - @thekid
Expand Down Expand Up @@ -77,7 +99,7 @@ stomp ChangeLog

## 7.0.1 / 2015-02-12

* Changed dependency to use XP ~6.0 (instead of dev-master) - @thekid
* Changed dependency to use `XP ~6.0` (instead of dev-master) - @thekid

## 7.0.0 / 2015-01-11

Expand Down
30 changes: 10 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ Examples
A message producer

```php
use peer\stomp\Connection;
use peer\stomp\SendableMessage;
use peer\URL;
use peer\stomp\{Connection, SendableMessage};

$conn= new Connection(new URL('stomp://localhost:61613/'));
$conn= new Connection('stomp://localhost:61613/');
$conn->connect();

$conn->getDestination('/queue/producer')->send(
Expand All @@ -37,14 +35,12 @@ $conn->getDestination('/queue/producer')->send(
A simple message consumer (subscriber):

```php
use peer\stomp\Connection;
use peer\stomp\Subscription;
use peer\URL;
use peer\stomp\{Connection, Subscription};

$conn= new Connection(new URL('stomp://localhost:61613/'));
$conn= new Connection('stomp://localhost:61613/');
$conn->connect();

$sub= $conn->subscribeTo(new Subscription('/queue/producer', function($message) {
$conn->subscribeTo(new Subscription('/queue/producer', function($message) {
Console::writeLine('---> Received message: ', $message);
$message->ack();
}));
Expand All @@ -56,20 +52,15 @@ $conn->consume();
A consumer with a broker network may connect to any host when available:

```php
use peer\stomp\Connection;
use peer\stomp\Subscription;
use peer\stomp\Failover;
use peer\URL;
use peer\stomp\{Connection, Subscription, Failover};

$conn= new Connection(Failover::using([
new URL('stomp://one.example.com:61613/'),
new URL('stomp://two.example.com:61613/')
])->byRandom());
$nodes= ['stomp://one.example.com:61613/', 'stomp://two.example.com:61613/'];

// Connect randomly to one or the other
$conn= new Connection(Failover::using($nodes)->byRandom());
$conn->connect();

$sub= $conn->subscribeTo(new Subscription('/queue/producer', function($message) {
$conn->subscribeTo(new Subscription('/queue/producer', function($message) {
Console::writeLine('---> Received message: ', $message);
$message->ack();
}));
Expand All @@ -82,12 +73,11 @@ $conn->consume();
### The connection URL
The URL specifies the options how and where to connect:

* `protocol` should be `stomp` or `stomp+ssl` (not implemented yet)
* `protocol` should be `stomp` or `stomp+ssl` (*SSL not implemented yet*)
* `host` is the hostname to connect
* `port` is the port to connect (default: 61613)
* `user`, `pass` can be given in the URL and will be used for authentication
* Supported parameters:
* `log` - pass a log category to log protocol debug output (eg: `?log=default`)
* `vhost` - virtual host name, since STOMP 1.1 (eg. `?vhost=example.com`)
* `versions` - to specify list of supported versions (eg. `?versions=1.0,1.1`); default is to support 1.0, 1.1

6 changes: 0 additions & 6 deletions TODO.md

This file was deleted.

47 changes: 20 additions & 27 deletions src/main/php/peer/stomp/Connection.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use peer\stomp\frame\MessageFrame;
use peer\stomp\frame\ReceiptFrame;
use util\Objects;
use util\log\Logger;
use util\log\Traceable;

/**
Expand All @@ -36,6 +37,7 @@ class Connection implements Traceable {
protected $socket = null;
protected $in = null;
protected $out = null;
protected $buffered = null;
protected $subscriptions = [];
protected $cat = null;

Expand Down Expand Up @@ -74,6 +76,9 @@ public function __construct($url) {
});
}

/** @return ?peer.Socket */
public function socket() { return $this->socket; }

private static function urlFrom($thing) {
if ($thing instanceof URL) {
return $thing;
Expand Down Expand Up @@ -121,45 +126,31 @@ private function debug(... $args) {
* @return peer.stomp.frame.Frame or null
*/
public function recvFrame($timeout= 0.2) {
if (null === $this->buffered) {
$this->buffered= typeof($this->in)->getField('buf')->setAccessible(true);
}

// Check whether we can read, before we actually read...
if ($this->socket instanceof Socket && !$this->socket->canRead($timeout)) {
$this->debug('<<<', '0 bytes - reading no frame.');
return null;
if ('' === trim($this->buffered->get($this->in))) {
if ($this->socket instanceof Socket && !$this->socket->canRead($timeout)) {
$this->debug('<<<', '0 bytes - reading no frame.');
return null;
}
}

// Swallow any empty newlines on the socket, these are used for heartbeat purposes
do {
$line= $this->in->readLine();
if (null === $line) {
$this->_disconnect();
throw new ServerDisconnected('Got disconnected from '.$this->socket->toString());
}
} while ('' === $line);
$this->debug('<<<', 'Have "'.trim($line).'" command');

if (0 == strlen($line)) throw new ProtocolException('Expected frame token, got '.Objects::stringOf($line));

$frame= self::$frames->loadClass(ucfirst(strtolower(trim($line))).'Frame')->newInstance();
$this->debug('<<<', 'Have "'.$line.'" command');
$frame= self::$frames->loadClass(ucfirst(strtolower($line)).'Frame')->newInstance();
$frame->setTrace($this->cat);
$frame->fromWire($this->in);

// According to the STOMP protocol, the NUL ("\0") delimiter may be followed
// by any number of EOL ("\n") characters. Read them here but be careful not
// to read across past a socket's current stream end!
// FIXME: This conflicts with heart-beating, we might be swallowing that here
// but not reacting correctly in other places!
$c= '';
while (
($this->socket instanceof Socket ? $this->socket->canRead(0.01) : $this->in->getStream()->available()) &&
"\n" === ($c= $this->in->read(1))
) {
// Skip
$this->debug('~ ate a byte: '.Objects::stringOf($c));
}

$f= typeof($this->in)->getField('buf')->setAccessible(true);
$f->set($this->in, $c.$f->get($this->in));

return $frame;
}

Expand Down Expand Up @@ -200,6 +191,7 @@ protected function _connect(URL $url) {

$this->in= new StringReader(new SocketInputStream($this->socket));
$this->out= new StringWriter(new SocketOutputStream($this->socket));
$this->buffered= null;
}

/**
Expand All @@ -209,6 +201,7 @@ protected function _connect(URL $url) {
protected function _disconnect() {
$this->out= null;
$this->in= null;
$this->buffered= null;
$this->socket->close();
}

Expand Down Expand Up @@ -248,7 +241,7 @@ private function _sendAuthenticateFrame(URL $url, $timeout) {
* Connect to server with given username and password
*
* @param float $timeout Defaults to 2 seconds
* @return bool
* @return self
* @throws peer.AuthenticationException if login failed
*/
public function connect($timeout= null) {
Expand All @@ -261,7 +254,7 @@ public function connect($timeout= null) {
return true;
}));

return true;
return $this;
}

/**
Expand Down
24 changes: 20 additions & 4 deletions src/main/php/peer/stomp/Destination.class.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
<?php namespace peer\stomp;

use lang\Value;
use util\Objects;

/**
* STOMP Destination
*/
class Destination {
class Destination implements Value {
protected $name = null;
protected $conn = null;

Expand Down Expand Up @@ -48,10 +49,25 @@ public function send(SendableMessage $message) {
}

/**
* Retrieve string representation
*
* @return string
* Compare
*
* @param var $value
* @return int
*/
public function compareTo($value) {
if ($value instanceof self && $this->conn === $value->conn) {
return strcmp($this->name, $value->name);
} else {
return 1;
}
}

/** @return string */
public function hashCode() {
return '@'.$this->name;
}

/** @return string */
public function toString() {
return $this->name.' -> '.Objects::stringOf($this->conn, ' ');
}
Expand Down
12 changes: 11 additions & 1 deletion src/main/php/peer/stomp/Message.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,20 @@ public function addHeader($name, $value) {
$this->customHeader[$name]= $value;
}

/**
* Get header
*
* @param string $key
* @return ?string
*/
public function getHeader($key) {
return isset($this->customHeader[$key]) ? $this->customHeader[$key] : null;
}

/**
* Retrieve all headers
*
* @return <string,string>[]
* @return [:string]
*/
public function getHeaders() {
return $this->customHeader;
Expand Down
21 changes: 15 additions & 6 deletions src/main/php/peer/stomp/frame/Frame.class.php
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
<?php namespace peer\stomp\frame;

use lang\Value;
use peer\stomp\Header;
use util\log\Traceable;

/**
* Abstract frame base class
*
* @test xp://peer.stomp.unittest.StompFrameTest
* @test xp://peer.stomp.unittest.FrameFromWireTest
* @test xp://peer.stomp.unittest.FrameToWireTest
*/
abstract class Frame implements \lang\Value, \util\log\Traceable {
abstract class Frame implements Value, Traceable {
protected $headers = [];
protected $body = null;

Expand Down Expand Up @@ -141,13 +145,13 @@ public function setBody($data) {
*/
public function fromWire(\io\streams\InputStreamReader $in) {

// Read headers
// Read headers. See https://stomp.github.io/stomp-specification-1.2.html#Value_Encoding
$line= $in->readLine();
while (0 != strlen($line)) {
while (0 !== strlen($line)) {
$this->debug('<<<', $line);

list($key, $value)= explode(':', $line, 2);
$this->addHeader($key, $value);
$this->addHeader($key, strtr($value, ['\\\\' => '\\', '\c' => ':', '\r' => "\r", '\n' => "\n"]));

// Next line
$line= $in->readLine();
Expand All @@ -159,7 +163,12 @@ public function fromWire(\io\streams\InputStreamReader $in) {

// If content-length is given, read that many bytes as body from
// stream and assert that it is followed by a chr(0) byte.
$data= $in->read($this->getHeader(Header::CONTENTLENGTH));
$length= (int)$this->getHeader(Header::CONTENTLENGTH);
if ($length > 0) {
$data= $in->read($length);
} else {
$data= null;
}

if ("\0" != $in->read(1)) throw new \peer\ProtocolException(
'Expected chr(0) after frame w/ given content-length'
Expand Down Expand Up @@ -189,7 +198,7 @@ public function write(\io\streams\OutputStreamWriter $out) {
$out->write($this->command()."\n");

foreach ($this->getHeaders() as $key => $value) {
$out->write($key.':'.$value."\n");
$out->write($key.':'.strtr($value, ['\\' => '\\\\', ':' => '\c', "\r" => '\r', "\n" => '\n'])."\n");
}

$out->write("\n".$this->getBody().chr(0));
Expand Down
8 changes: 4 additions & 4 deletions src/test/php/peer/stomp/unittest/BaseTest.class.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<?php namespace peer\stomp\unittest;

use peer\stomp\Connection;
use peer\URL;
use io\streams\StringReader;
use io\streams\StringWriter;
use io\streams\MemoryInputStream;
use io\streams\MemoryOutputStream;
use io\streams\StringReader;
use io\streams\StringWriter;
use peer\URL;
use peer\stomp\Connection;

abstract class BaseTest extends \unittest\TestCase {
protected $fixture= null;
Expand Down
Loading

0 comments on commit 2899b3e

Please sign in to comment.