Skip to content

Commit f5d4c82

Browse files
committed
Change pipeline implementation
1 parent 6fb9f55 commit f5d4c82

File tree

2 files changed

+166
-211
lines changed

2 files changed

+166
-211
lines changed

src/Pipeline.php

Lines changed: 111 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -2,239 +2,225 @@
22

33
namespace Marquine\Etl;
44

5-
use IteratorAggregate;
5+
use Iterator;
6+
use Marquine\Etl\Loaders\Loader;
7+
use Marquine\Etl\Extractors\Extractor;
8+
use Marquine\Etl\Transformers\Transformer;
69

7-
class Pipeline
10+
class Pipeline implements Iterator
811
{
912
/**
1013
* The pipeline data flow.
1114
*
12-
* @var \IteratorAggregate
15+
* @var \Generator
1316
*/
1417
protected $flow;
1518

1619
/**
17-
* The array of tasks.
20+
* The maximum number of rows.
1821
*
19-
* @var array
22+
* @var int
2023
*/
21-
protected $tasks = [];
24+
protected $limit;
2225

2326
/**
24-
* Current row.
27+
* The number of rows to skip.
2528
*
2629
* @var int
2730
*/
28-
protected $current = 0;
31+
protected $skip;
2932

3033
/**
31-
* Total rows.
34+
* The iteration key.
3235
*
3336
* @var int
3437
*/
35-
protected $total = 0;
38+
protected $key;
3639

3740
/**
38-
* Maximum number of rows.
41+
* The current iteration row.
3942
*
40-
* @var int
43+
* @var \Marquine\Etl\Row
4144
*/
42-
protected $limit;
45+
protected $current;
4346

4447
/**
45-
* Number of rows to skip.
48+
* The etl extractor.
4649
*
47-
* @var int
50+
* @var \Marquine\Etl\Extractors\Extractor
4851
*/
49-
protected $skip;
52+
protected $extractor;
5053

5154
/**
52-
* Pre execution callbacks.
55+
* The array of steps for the pipeline.
5356
*
5457
* @var array
5558
*/
56-
protected $preExecutionCallbacks = [];
59+
protected $steps = [];
5760

5861
/**
59-
* Post execution callbacks.
62+
* Set the pipeline extractor.
6063
*
61-
* @var array
64+
* @param \Marquine\Etl\Extractors\Extractor $extractor
65+
* @return void
6266
*/
63-
protected $postExecutionCallbacks = [];
67+
public function extractor(Extractor $extractor)
68+
{
69+
$this->extractor = $extractor;
70+
}
6471

6572
/**
66-
* Set the pipeline flow.
73+
* Add a step to the pipeline.
6774
*
68-
* @param \IteratorAggregate $flow
75+
* @param \Marquine\EtlStep $step
6976
* @return void
7077
*/
71-
public function flow(IteratorAggregate $flow)
78+
public function pipe(Step $step)
7279
{
73-
$this->flow = $flow;
80+
$this->steps[] = $step;
7481
}
7582

7683
/**
77-
* Pipe a task.
84+
* Set the row limit.
7885
*
79-
* @param callable $task
80-
* @return $this
86+
* @param int $limit
87+
* @return void
8188
*/
82-
public function pipe(callable $task)
89+
public function limit($limit)
8390
{
84-
$this->tasks[] = $task;
85-
86-
return $this;
91+
$this->limit = $limit;
8792
}
8893

8994
/**
90-
* Get the pipeline data generator.
95+
* Set the number of rows to skip.
9196
*
92-
* @return \Generator
97+
* @param int $skip
98+
* @return void
9399
*/
94-
public function get()
100+
public function skip($skip)
95101
{
96-
$this->total = $this->getRowsCount();
97-
98-
foreach ($this->preExecutionCallbacks as $callback) {
99-
$callback();
100-
}
101-
102-
foreach ($this->flow as $index => $row) {
103-
if ($this->skip && $index < $this->skip) {
104-
continue;
105-
}
106-
107-
if ($this->limit && $this->current == $this->limit) {
108-
break;
109-
}
110-
111-
$this->current++;
112-
113-
$row = $this->runTasks($row);
114-
115-
if (!empty($row)) {
116-
yield $row;
117-
}
118-
}
119-
120-
foreach ($this->postExecutionCallbacks as $callback) {
121-
$callback();
122-
}
102+
$this->skip = $skip;
123103
}
124104

125105
/**
126-
* Get a sample row of the flow.
106+
* Get the current element.
127107
*
128-
* @return array
108+
* @return void
129109
*/
130-
public function sample()
110+
public function current()
131111
{
132-
return $this->flow->getIterator()->current();
112+
return $this->current->toArray();
133113
}
134114

135115
/**
136-
* Run tasks for the given row.
116+
* Move forward to next element.
137117
*
138-
* @param array $row
139-
* @return array
118+
* @return void
140119
*/
141-
protected function runTasks($row)
120+
public function next()
142121
{
143-
foreach ($this->tasks as $task) {
144-
$row = $task($row);
145-
146-
if (empty($row)) {
147-
break;
148-
}
149-
}
150-
151-
return $row;
122+
$this->key++;
123+
$this->flow->next();
152124
}
153125

154126
/**
155-
* Get the total rows count.
127+
* Get the key of the current element.
156128
*
157129
* @return int
158130
*/
159-
protected function getRowsCount()
131+
public function key()
160132
{
161-
$count = iterator_count($this->flow);
162-
163-
if ($this->skip) {
164-
$count -= $this->skip;
165-
}
166-
167-
if ($this->limit && $count > $this->limit) {
168-
$count = $this->limit;
169-
}
170-
171-
return $count;
133+
return $this->key;
172134
}
173135

174136
/**
175-
* Get the metadata.
137+
* Checks if current position is valid.
176138
*
177-
* @return array
139+
* @return bool
178140
*/
179-
public function metadata($attribute = null)
141+
public function valid()
180142
{
181-
$metadata = [
182-
'current' => $this->current,
183-
'total' => $this->total,
184-
];
143+
if (! $this->flow->valid() || $this->limitReached()) {
144+
$this->finalize();
145+
146+
return false;
147+
}
148+
149+
$this->current = $this->flow->current();
150+
151+
152+
foreach ($this->steps as $step) {
153+
if ($this->current->discarded()) {
154+
$this->key--;
155+
$this->next();
156+
157+
return $this->valid();
158+
}
159+
160+
if ($step instanceof Transformer) {
161+
$step->transform($this->current);
162+
}
163+
164+
if ($step instanceof Loader) {
165+
$step->load($this->current);
166+
}
167+
}
185168

186-
return $metadata[$attribute] ?? (object) $metadata;
169+
return true;
187170
}
188171

189172
/**
190-
* Set the maximum number of rows.
173+
* Rewind the Iterator to the first element.
191174
*
192-
* @param int $value
193-
* @return $this
175+
* @return void
194176
*/
195-
public function limit($value)
177+
public function rewind()
196178
{
197-
$this->limit = $value;
179+
$this->initialize();
180+
181+
$this->key = 0;
182+
$this->flow = $this->extractor->extract();
198183

199-
return $this;
184+
while ($this->flow->key() < $this->skip && $this->flow->valid()) {
185+
$this->flow->next();
186+
}
200187
}
201188

202189
/**
203-
* Set the number of rows to skip.
190+
* Check if the row limit was reached.
204191
*
205-
* @param int $value
206-
* @return $this
192+
* @return bool
207193
*/
208-
public function skip($value)
194+
protected function limitReached()
209195
{
210-
$this->skip = $value;
211-
212-
return $this;
196+
return $this->limit && $this->key() === $this->limit;
213197
}
214198

215199
/**
216-
* Register pre execution callback.
200+
* Initialize the steps.
217201
*
218-
* @param callable $callback
219-
* @return $this
202+
* @return void
220203
*/
221-
public function before(callable $callback)
204+
protected function initialize()
222205
{
223-
$this->preExecutionCallbacks[] = $callback;
206+
$this->extractor->initialize();
224207

225-
return $this;
208+
foreach ($this->steps as $step) {
209+
$step->initialize();
210+
}
226211
}
227212

228213
/**
229-
* Register post execution callback.
214+
* Finalize the steps.
230215
*
231-
* @param callable $callback
232-
* @return $this
216+
* @return void
233217
*/
234-
public function after(callable $callback)
218+
protected function finalize()
235219
{
236-
$this->postExecutionCallbacks[] = $callback;
220+
$this->extractor->finalize();
237221

238-
return $this;
222+
foreach ($this->steps as $step) {
223+
$step->finalize();
224+
}
239225
}
240226
}

0 commit comments

Comments
 (0)