Skip to content

Added multipart support #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"guzzle/parser": "~3.0",
"react/socket": "0.4.*",
"react/stream": "0.4.*",
"react/filesystem": "dev-master",
"evenement/evenement": "~2.0"
},
"autoload": {
Expand Down
62 changes: 61 additions & 1 deletion src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ class Request extends EventEmitter implements ReadableStreamInterface
private $readable = true;
private $method;
private $path;
private $post;
private $files;
private $query;
private $httpVersion;
private $headers;

// metadata, implicitly added externally
public $remoteAddress;

public function __construct($method, $path, $query = array(), $httpVersion = '1.1', $headers = array())
public function __construct($method, $path, $query = array(), $post = array(), $files = array(), $httpVersion = '1.1', $headers = array())
{
$this->method = $method;
$this->path = $path;
$this->query = $query;
$this->post = $post;
$this->files = $files;
$this->httpVersion = $httpVersion;
$this->headers = $headers;
}
Expand Down Expand Up @@ -53,11 +57,67 @@ public function getHeaders()
return $this->headers;
}

public function getHeader($name)
{
if(array_key_exists($name, $this->headers)) {
return $this->headers[$name];
}
return null;
}

public function setPost($post)
{
$this->post = $post;
}

public function addPost($key, $value)
{
$this->post[$key] = $value;
}

public function getPost()
{
return $this->post;
}

public function addFile($key, $name, $temp_name, $error, $size)
{
$this->files[$key] = [
'filename' => $name,
'tmp_name' => $temp_name,
'error' => $error,
'size' => $size
];
}

public function setFiles($files)
{
$this->files = $files;
}

public function getFiles()
{
return $this->files;
}

public function getContentType()
{
return $this->getHeader('Content-Type');
}

public function expectsContinue()
{
return isset($this->headers['Expect']) && '100-continue' === $this->headers['Expect'];
}

public function isMultiPart()
{
$type = $this->getContentType();
if ($type == null || 0 !== strpos($type, "multipart/form-data"))
return false;
return true;
}

public function isReadable()
{
return $this->readable;
Expand Down
57 changes: 0 additions & 57 deletions src/RequestHeaderParser.php

This file was deleted.

134 changes: 134 additions & 0 deletions src/RequestParser.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<?php

namespace React\Http;

use Evenement\EventEmitter;
use Guzzle\Parser\Message\MessageParser;

/**
* @event headers
* @event error
*/
class RequestParser extends EventEmitter
{
private $buffer = '';
private $maxSize = 4096;
private $parsingDone;

public function feed($data)
{
if (strlen($this->buffer) + strlen($data) > $this->maxSize) {
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded."), $this));

return;
}

$this->buffer .= $data;

if (false !== strpos($this->buffer, "\r\n\r\n")) {
$this->parseRequest($this->buffer);
}
}

public function parseRequest($data)
{
$this->parsingDone = false;
$parser = new MessageParser();
$parsed = $parser->parseRequest($data);

$parsedQuery = array();
if ($parsed['request_url']['query']) {
parse_str($parsed['request_url']['query'], $parsedQuery);
}

$request = new Request(
$parsed['method'],
$parsed['request_url']['path'],
$parsedQuery,
array(),
array(),
$parsed['version'],
$parsed['headers']
);

$request->on('parsed' , function() use($request) {
$this->emit('request', array($request));
$this->removeAllListeners();
});

if($request->isMultiPart()) {
$this->parseMultipart($request, $parsed['body']);
}
else {
$request->body = $parsed['body'];
$this->parsingDone = true;
}

if($this->parsingDone === true) {
$this->emit('request', array($request));
$this->removeAllListeners();
}
}

private function parseMultiPart($request, $body)
{
$boundary = $this->extractBoundary($request);

$blocks = $this->sliceBody($body, $boundary);
array_pop($blocks);

$file_count = 0;

foreach($blocks as $block) {
if(empty($block))
continue;
if (strpos($block, 'application/octet-stream') !== FALSE)
{
preg_match("/name=\"([^\"]*)\"; filename=\"([^\"]*)\".*stream[\n|\r]+([^\n\r].*)?$/s", $block, $matches);
$file_count++;
$temp_name = tempnam(sys_get_temp_dir(), 'Reactor');
$this->emit('trigger', array(function($loop) use($request, $temp_name, $file_count, $matches) {
\React\Filesystem\Filesystem::create($loop)->file($temp_name)->open('cw')->then(function ($stream) use($request, $temp_name, $file_count, $matches){
$stream->write($matches[3]);
$stream->close();
$request->addFile($matches[1], $matches[2], $temp_name, UPLOAD_ERR_OK, strlen($matches[3]));
$file_count--;
if($file_count === 0) {
$this->parsingDone = true;
$request->emit('parsed', array());
}
});
}));
}
else
{
preg_match('/name=\"([^\"]*)\"[\n|\r]+([^\n\r].*)?\r$/s', $block, $matches);
$request->addPost($matches[1], $matches[2]);
}
}

if($file_count === 0) {
$this->parsingDone = true;
}
}

private function sliceBody($body, $boundary)
{
$blocks = preg_split("/-+$boundary/", $body);

return $blocks;
}

private function extractBoundary($request)
{
$type = $request->getContentType();

preg_match('/boundary=(.*)$/', $type, $matches);

if(!count($matches)) {
throw new \Exception('Cannot extract boundary. Boundary missing.');
}

return $matches[1];
}
}
86 changes: 54 additions & 32 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,56 @@
class Server extends EventEmitter implements ServerInterface
{
private $io;
private $loop;

public function __construct(SocketServerInterface $io)
public function __construct($host = '0.0.0.0', $port = '8080', $loop = null)
{
$this->io = $io;

$this->io->on('connection', function (ConnectionInterface $conn) {
// TODO: http 1.1 keep-alive
// TODO: chunked transfer encoding (also for outgoing data)
// TODO: multipart parsing

$parser = new RequestHeaderParser();
$parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $parser) {
// attach remote ip to the request as metadata
$request->remoteAddress = $conn->getRemoteAddress();

$this->handleRequest($conn, $request, $bodyBuffer);

$conn->removeListener('data', array($parser, 'feed'));
$conn->on('end', function () use ($request) {
$request->emit('end');
});
$conn->on('data', function ($data) use ($request) {
$request->emit('data', array($data));
});
$request->on('pause', function () use ($conn) {
$conn->emit('pause');
});
$request->on('resume', function () use ($conn) {
$conn->emit('resume');
});
if ($loop === null)
$loop = \React\EventLoop\Factory::create();

$this->loop = $loop;

$this->io = new \React\Socket\Server($loop);
$this->io->listen($port, $host);

$this->io->on('connection', array($this, 'handleConnection'));
}

public function handleConnection(ConnectionInterface $conn)
{
// TODO: http 1.1 keep-alive
// TODO: chunked transfer encoding (also for outgoing data)

$parser = new RequestParser();
$parser->on('request', function (Request $request) use ($conn, $parser) {
// attach remote ip to the request as metadata
$request->remoteAddress = $conn->getRemoteAddress();

$this->handleRequest($conn, $request);

$conn->removeListener('data', array($parser, 'feed'));
$conn->on('end', function () use ($request) {
$request->emit('end');
});
$conn->on('data', function ($data) use ($request) {
$request->emit('data', array($data));
});
$request->on('pause', function () use ($conn) {
$conn->emit('pause');
});
$request->on('resume', function () use ($conn) {
$conn->emit('resume');
});
});

$conn->on('data', array($parser, 'feed'));
$parser->on('trigger', function($closure) {
$this->loop->nextTick($closure);
});

$conn->on('data', array($parser, 'feed'));
}

public function handleRequest(ConnectionInterface $conn, Request $request, $bodyBuffer)
public function handleRequest(ConnectionInterface $conn, Request $request)
{
$response = new Response($conn);
$response->on('close', array($request, 'close'));
Expand All @@ -56,8 +69,17 @@ public function handleRequest(ConnectionInterface $conn, Request $request, $body

return;
}

$this->emit('request', array($request, $response));
$request->emit('data', array($bodyBuffer));
}

public function run()
{
$this->loop->run();
}

public function stop()
{
$this->io->shutdown();
$this->loop->stop();
}
}
Loading