This is a library like Rx.NET for asynchronous or event based programs, based on the concept of OutputRange.
The operators' name is based on std.algorithm and ReactiveX.
import rx;
import std.conv : to;
import std.range : iota, put;
void main()
{
// create own source of int
auto subject = new SubjectObject!int;
// define result array
string[] result;
// define pipeline and subscribe
// sequence: source -> filter by even -> map to string -> join to result
auto disposable = subject.filter!(n => n % 2 == 0).map!(o => to!string(o))
.doSubscribe!(text => result ~= text);
// set unsubscribe on exit
// it is not necessary in this simple example,
// but in many cases you should call dispose to prevent memory leaks.
scope (exit)
disposable.dispose();
// put values to source.
put(subject, iota(10));
// result is like this
assert(result == ["0", "2", "4", "6", "8"]);
}
And more examples or Documents
Setting dependencies in dub.json
{
...
"dependencies": {
"rx": "~>0.10.0"
}
}
or dub.sdl
dependency "rx" version="~>0.10.0"
All operators are written using template and struct for optimization. this example is a binary interface like std.range.interfaces.
//module rx.disposable
interface Disposable
{
void dispose();
}
//module rx.observer
interface Observer(E) : OutputRange!E
{
//void put(E obj); //inherits from OutputRange!E
void completed();
void failure(Exception e);
}
//module rx.observable
interface Observable(E)
{
alias ElementType = E;
Disposable subscribe(Observer!E observer);
}
Supported compilers are dmd
and ldc
that latest 3 versions.
This library is under the MIT License.
Some code is borrowed from Rx.NET.
Issue and PullRequest are welcome! 😃
Refer to CONTRIBUTING.md for details.
git clone https://github.com/lempiji/rx
cd rx
dub test
Use https://github.com/adamdruppe/adrdox
- generic observable factory
- create, start, timer, interval
- more subjects
- publish, connectable
- more algorithms
- window, multicast
- more test
- more documents