Skip to content

Commit b408d8b

Browse files
committed
Adding EVENT_* constants to reader
Implementing separate parsing of header-row Implementing row-counter
1 parent 0e1d6c9 commit b408d8b

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

examples/country-codes-reader.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@
1212
include '../vendor/autoload.php';
1313

1414
$loop = Factory::create();
15-
$input = new Reader(new ReadableResourceStream(fopen('country-codes.csv', 'r'), $loop));
15+
16+
$start = microtime();
17+
$inputFd = fopen('country-codes.csv', 'rn');
18+
$end = microtime();
19+
$input = new Reader(new ReadableResourceStream($inputFd, $loop));
1620
$input->setDelimiter(",");
21+
$input->setParseHeader(false);
1722

1823
$input->on('data', function ($field) {
1924
echo $field[10] . PHP_EOL;

examples/writer.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,26 @@
55
use React\EventLoop\Timer\Timer;
66
use React\Stream\WritableResourceStream;
77

8+
chdir(__DIR__);
89
require '../vendor/autoload.php';
910

11+
// Preperation
1012
$loop = Factory::create();
11-
12-
$writer = new Writer(new WritableResourceStream(fopen('php://stdout', 'w'), $loop));
13+
$writer = new Writer(
14+
new WritableResourceStream(
15+
fopen('php://stdout', 'wn'),
16+
$loop
17+
)
18+
);
1319
$int = (object)0;
1420

21+
// Adding a timer to periodically print out some data
1522
$loop->addPeriodicTimer(1, function ($timer) use ($writer, $int) {
1623
/** @var Timer $timer */
1724
$writer->write([2 * $int->scalar, "fds " . 3 * $int->scalar . " a", 4 * $int->scalar]);
1825
$int->scalar++;
26+
27+
// Clean Stopping after printing 10 times
1928
if ($int->scalar > 10) {
2029
$writer->close();
2130
$timer->cancel();

src/Reader.php

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,35 @@
66
use Evenement\EventEmitterTrait;
77
use React\Stream\ReadableStreamInterface;
88
use React\Stream\Util;
9-
use React\Stream\WritableStreamInterface;
109

1110
class Reader implements EventEmitterInterface
1211
{
1312
use EventEmitterTrait;
1413

14+
const EVENT_HEADER = 'header';
15+
const EVENT_DATA = 'data';
16+
1517
/** @var ReadableStreamInterface $stream */
1618
protected $stream;
1719

1820
/** @var resource $buffer */
1921
protected $buffer;
2022
protected $paused = false;
23+
24+
/** @var null|array $header */
25+
protected $header;
26+
protected $parseHeader = true;
27+
protected $headerParsed = false;
28+
2129
protected $delimiter = ",";
2230
protected $enclosure = "\"";
2331
protected $escape = "\\";
2432

33+
protected $rowsParsed = 0;
34+
2535
/**
2636
* Reader constructor.
27-
* @param WritableStreamInterface $stream
37+
* @param ReadableStreamInterface $stream
2838
*/
2939
public function __construct(ReadableStreamInterface $stream)
3040
{
@@ -56,7 +66,17 @@ public function parseBuffer()
5666
$this->stream->isReadable() === false
5767
) {
5868
$start = ftell($this->buffer);
59-
$this->emit("data", [$field]);
69+
++$this->rowsParsed;
70+
if (
71+
$this->headerParsed === false &&
72+
$this->parseHeader === true
73+
) {
74+
$this->header = $field;
75+
$this->headerParsed = true;
76+
$this->emit("header", [$field]);
77+
} else {
78+
$this->emit("data", [$field]);
79+
}
6080
}
6181
}
6282

@@ -66,6 +86,11 @@ public function parseBuffer()
6686
fputs($this->buffer, $dataRemainig);
6787
}
6888

89+
public function setParseHeader($parseHeader)
90+
{
91+
$this->parseHeader = (bool)$parseHeader;
92+
}
93+
6994
public function isPaused()
7095
{
7196
return $this->paused;
@@ -96,6 +121,16 @@ public function close()
96121
$this->stream->close();
97122
}
98123

124+
public function getHeader()
125+
{
126+
return $this->header;
127+
}
128+
129+
public function getRowsParsed()
130+
{
131+
return $this->rowsParsed;
132+
}
133+
99134
public function setDelimiter($delimiter)
100135
{
101136
$this->delimiter = mb_substr($delimiter, 0, 1);

0 commit comments

Comments
 (0)