RxPHP

This is the development branch for RxPHP v2 and is not stable. For production, use v1 instead.

Reactive extensions for PHP. The reactive extensions for PHP are a set of libraries for composing asynchronous and event-based programs using observable collections and LINQ-style query operators.

Example

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$source->subscribe(
    function ($x) {
        echo 'Next: ', $x, PHP_EOL;
    },
    function (Exception $ex) {
        echo 'Error: ', $ex->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }
);

//Next: 1
//Next: 2
//Next: 3
//Next: 4
//Completed
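
The "LINQ-style query operators" mentioned above can be chained before subscribing. The following is a minimal sketch, not taken from the original README: it assumes an async-interop loop implementation is installed, wraps the stream in Loop::execute the same way the installation example does, and uses the `filter` and `map` operators. The `$results` array is a hypothetical helper added only so the emitted values can be inspected afterwards.

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use Interop\Async\Loop;

// Hypothetical helper: collects emitted values for later inspection.
$results = [];

Loop::execute(function () use (&$results) {
    Observable::fromArray([1, 2, 3, 4])
        // Keep only the even values.
        ->filter(function ($x) {
            return $x % 2 === 0;
        })
        // Multiply each remaining value by 10.
        ->map(function ($x) {
            return $x * 10;
        })
        ->subscribe(
            function ($x) use (&$results) {
                $results[] = $x;
                echo 'Next: ', $x, PHP_EOL;
            },
            function (\Throwable $ex) {
                echo 'Error: ', $ex->getMessage(), PHP_EOL;
            },
            function () {
                echo 'Completed', PHP_EOL;
            }
        );
});

//Next: 20
//Next: 40
//Completed
```

Each operator returns a new observable, so the source is left unchanged and nothing runs until `subscribe` is called.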

Try out the demos

$ git clone git@github.com:reactivex/RxPHP.git -b 2.x
$ cd RxPHP
$ composer install
$ php demo/interval/interval.php

Have fun running the demos in /demo.

Note: The demos are automatically run within Loop::execute. When using RxPHP in your own project, you'll need to install a loop implementation yourself (see Installation).

Installation

  1. Install one async-interop event loop implementation.

With ReactPHP:

$ composer require wyrihaximus/react-async-interop-loop

With amphp:

$ composer require amphp/loop:dev-master

With KoolKode:

$ composer require koolkode/async

  2. Install RxPHP using Composer.

$ composer require reactivex/rxphp:2.x-dev

  3. Write some code.

<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use Interop\Async\Loop;

Loop::execute(function () {

    Observable::interval(1000)
        ->take(5)
        ->flatMap(function ($i) {
            return Observable::of($i + 1);
        })
        ->subscribe(function ($e) {
            echo $e, PHP_EOL;
        });

});

Working with Promises

Some async PHP frameworks have yet to fully embrace the awesome power of observables. To help ease the transition, RxPHP has support for promise libraries that implement the async-interop promise specification.

Mixing a promise into an observable stream:

Observable::interval(1000)
    ->flatMap(function ($i) {
        return Observable::fromPromise(new Resolved($i));
    })
    ->subscribe(function ($v) {
        echo $v . PHP_EOL;
    });

Converting an observable into a promise (useful for libraries that use generators and coroutines):

$observable = Observable::interval(1000)
    ->take(10)
    ->toArray()
    ->map('json_encode');

$promise = $observable->toPromise();
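
To show what happens with the promise once it exists, here is a hedged sketch of consuming it. It assumes that `toPromise()` on this development branch returns a promise implementing the async-interop promise specification, whose `when()` method takes a callback receiving `($error, $value)`. That assumption is not confirmed by this README, and other promise libraries expose `then()` instead. The shorter interval and `take(3)` are illustrative choices, and `$json` is a hypothetical variable used only to capture the result.

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use Interop\Async\Loop;

// Hypothetical variable: holds the resolved value once the promise settles.
$json = null;

Loop::execute(function () use (&$json) {
    $promise = Observable::interval(100)
        ->take(3)
        ->toArray()
        ->map('json_encode')
        ->toPromise();

    // Assumption: async-interop promises expose when(), and the callback
    // receives the failure reason (or null) first and the value second.
    $promise->when(function ($error, $value) use (&$json) {
        if ($error !== null) {
            echo 'Error: ', $error->getMessage(), PHP_EOL;
            return;
        }

        $json = $value;
        echo 'Resolved: ', $value, PHP_EOL;
    });
});
```

The promise resolves only after the observable completes, which is why the stream is bounded with `take` before `toArray` collects its values.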

License

RxPHP is licensed under the MIT License. See the LICENSE file for details.
