Skip to content

Commit

Permalink
docs(subject): Subject documentation for new docs app. (#4108)
Browse files Browse the repository at this point in the history
* docs(subject): Subject documentation for new docs app.

* Updated subject.md from old docs for consistency with the new API
* Copied and linked this to the new docs app

* docs(subject): Updating format and style

* Combine logs with code example output
* Add blockquote-like informal style to new docs

* docs(subject): minor updates for clarity / proper linking
  • Loading branch information
ajcrites authored and benlesh committed Sep 10, 2018
1 parent 2704490 commit 7a9ffb5
Show file tree
Hide file tree
Showing 4 changed files with 474 additions and 120 deletions.
234 changes: 115 additions & 119 deletions doc/subject.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,53 @@ Internally to the Subject, `subscribe` does not invoke a new execution that deli

In the example below, we have two Observers attached to a Subject, and we feed some values to the Subject:

```js
var subject = new Rx.Subject();
```ts
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);
```

With the following output on the console:

```none
observerA: 1
observerB: 1
observerA: 2
observerB: 2
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
```

Since a Subject is an Observer, this also means you may provide a Subject as the argument to the `subscribe` of any Observable, like the example below shows:

```js
var subject = new Rx.Subject();
```ts
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

var observable = Rx.Observable.from([1, 2, 3]);
const observable = from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject
```

Which executes as:

```none
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
```

With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers.
Expand All @@ -75,17 +73,20 @@ A "multicasted Observable" passes notifications through a Subject which may have

Under the hood, this is how the `multicast` operator works: Observers subscribe to an underlying Subject, and the Subject subscribes to the source Observable. The following example is similar to the previous example which used `observable.subscribe(subject)`:

```js
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.pipe(multicast(subject));
```ts
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

// This is, under the hood, `source.subscribe(subject)`:
Expand Down Expand Up @@ -115,22 +116,25 @@ Consider the following example where subscriptions occur as outlined by this lis

To achieve that with explicit calls to `connect()`, we write the following code:

```js
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.pipe(multicast(subject));
var subscription1, subscription2, subscriptionConnect;
```ts
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

Expand All @@ -152,23 +156,26 @@ If we wish to avoid explicit calls to `connect()`, we can use ConnectableObserva

Below is an example:

```js
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.pipe(multicast(subject), refCount());
var subscription1, subscription2;
```ts
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

Expand All @@ -183,19 +190,16 @@ setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
```

Which executes with the output:

```none
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
```

The `refCount()` method only exists on ConnectableObservable, and it returns an `Observable`, not another ConnectableObservable.
Expand All @@ -208,32 +212,30 @@ One of the variants of Subjects is the `BehaviorSubject`, which has a notion of

In the following example, the BehaviorSubject is initialized with the value `0` which the first Observer receives when it subscribes. The second Observer receives the value `2` even though it subscribed after the value `2` was sent.

```js
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
```ts
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);
```

With output:

```none
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
```

## ReplaySubject
Expand All @@ -244,11 +246,12 @@ A `ReplaySubject` is similar to a `BehaviorSubject` in that it can send old valu

When creating a `ReplaySubject`, you can specify how many values to replay:

```js
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
```ts
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
Expand All @@ -257,71 +260,67 @@ subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
```

With output:

```none
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
```

You can also specify a *window time* in milliseconds, besides of the buffer size, to determine how old the recorded values can be. In the following example we use a large buffer size of `100`, but a window time parameter of just `500` milliseconds.

<!-- skip-example -->
```js
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
```ts
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});

var i = 1;
let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
```

With the following output where the second Observer gets events `3`, `4` and `5` that happened in the last `500` milliseconds prior to its subscription:

```none
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
```

## AsyncSubject

The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes.

```js
var subject = new Rx.AsyncSubject();
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
Expand All @@ -330,18 +329,15 @@ subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();
```

With output:

```none
observerA: 5
observerB: 5
// Logs:
// observerA: 5
// observerB: 5
```

The AsyncSubject is similar to the [`last()`](../class/es6/Observable.js~Observable.html#instance-method-last) operator, in that it waits for the `complete` notification in order to deliver a single value.
Loading

0 comments on commit 7a9ffb5

Please sign in to comment.