Skip to content

Commit

Permalink
feat(shareReplay): add config parameter (#4059)
Browse files Browse the repository at this point in the history
* test(shareReplay): add failing/conflicting test

* feat(shareReplay): add config parameter

Closes #3336

* chore(shareReplay): use config for all parameters

* chore(shareReplay): deprecate non-config signature

* chore(shareReplay): add compat signature

* test(dtslint): add shareReplay

* chore(shareReplay): undeprecate signature

* test(shareReplay): use marble tests

* chore(shareReplay): remove compat deprecation
  • Loading branch information
cartant authored and benlesh committed Jan 30, 2019
1 parent a388578 commit 0fd8707
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 47 deletions.
10 changes: 8 additions & 2 deletions compat/operator/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { Observable, SchedulerLike } from 'rxjs';
import { shareReplay as higherOrder } from 'rxjs/operators';
import { ShareReplayConfig } from 'rxjs/internal-compatibility';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike):
export function shareReplay<T>(this: Observable<T>, config: ShareReplayConfig): Observable<T>;
export function shareReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): Observable<T>;
export function shareReplay<T>(this: Observable<T>, configOrBufferSize?: ShareReplayConfig | number, windowTime?: number, scheduler?: SchedulerLike):
Observable<T> {
return higherOrder(bufferSize, windowTime, scheduler)(this) as Observable<T>;
if (configOrBufferSize && typeof configOrBufferSize === 'object') {
return higherOrder(configOrBufferSize as ShareReplayConfig)(this) as Observable<T>;
}
return higherOrder(configOrBufferSize as number | undefined, windowTime, scheduler)(this) as Observable<T>;
}
6 changes: 0 additions & 6 deletions doc/README.md

This file was deleted.

30 changes: 15 additions & 15 deletions spec-dtslint/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import { of, asyncScheduler } from 'rxjs';
import { of, asyncScheduler } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

it('should infer correctly', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay()); // $ExpectType Observable<string>
it('should accept an individual bufferSize parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay(1)); // $ExpectType Observable<number>
});

it('should support a bufferSize', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6)); // $ExpectType Observable<string>
it('should accept individual bufferSize and windowTime parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay(1, 2)); // $ExpectType Observable<number>
});

it('should support a windowTime', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4)); // $ExpectType Observable<string>
it('should accept individual bufferSize, windowTime and scheduler parameters', () => {
const o3 = of(1, 2, 3).pipe(shareReplay(1, 2, asyncScheduler)); // $ExpectType Observable<number>
});

it('should support a scheduler', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4, asyncScheduler)); // $ExpectType Observable<string>
it('should accept a bufferSize config parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of bufferSize', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay('abc')); // $ExpectError
it('should accept bufferSize and windowTime config parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of windowTime', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 'abc')); // $ExpectError
it('should accept bufferSize, windowTime and scheduler config parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, scheduler: asyncScheduler, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of scheduler', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 3, 'abc')); // $ExpectError
it('should require a refCount config parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1 })); // $ExpectError
});
53 changes: 39 additions & 14 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,6 @@ describe('shareReplay operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not restart if refCount hits 0 due to unsubscriptions', () => {
const results: number[] = [];
const source = interval(10, rxTestScheduler).pipe(
take(10),
shareReplay(1)
);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54);

rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('when no windowTime is given ReplaySubject should be in _infiniteTimeWindow mode', () => {
const spy = sinon.spy(rxTestScheduler, 'now');

Expand All @@ -187,6 +173,45 @@ describe('shareReplay operator', () => {
expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called;
});

it('should not restart due to unsubscriptions if refCount is false', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^-------';
const expected2 = '-----------fg-h-i-j';

const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: false }));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should restart due to unsubscriptions if refCount is true', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^------------------';
const expected2 = '-----------a-b-c-d-e-f-g-h-i-j';

const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: true }));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should default to refCount being false', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^-------';
const expected2 = '-----------fg-h-i-j';

const shared = source.pipe(shareReplay(1));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should not break lift() composability', (done: MochaDone) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
Expand Down
1 change: 1 addition & 0 deletions src/internal-compatibility/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export { SubscribeOnObservable } from '../internal/observable/SubscribeOnObserva
export { Timestamp } from '../internal/operators/timestamp';
export { TimeInterval } from '../internal/operators/timeInterval';
export { GroupedObservable } from '../internal/operators/groupBy';
export { ShareReplayConfig } from '../internal/operators/shareReplay';
export { ThrottleConfig, defaultThrottleConfig } from '../internal/operators/throttle';

export { rxSubscriber } from '../internal/symbol/rxSubscriber';
Expand Down
46 changes: 36 additions & 10 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';

export interface ShareReplayConfig {
bufferSize?: number;
windowTime?: number;
refCount: boolean;
scheduler?: SchedulerLike;
}

/**
* Share source and replay specified number of emissions on subscription.
*
Expand Down Expand Up @@ -49,18 +56,36 @@ import { Subscriber } from '../Subscriber';
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
configOrBufferSize?: ShareReplayConfig | number,
windowTime?: number,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
let config: ShareReplayConfig;
if (configOrBufferSize && typeof configOrBufferSize === 'object') {
config = configOrBufferSize as ShareReplayConfig;
} else {
config = {
bufferSize: configOrBufferSize as number | undefined,
windowTime,
refCount: false,
scheduler
};
}
return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}

function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
let subject: ReplaySubject<T>;
function shareReplayOperator<T>({
bufferSize = Number.POSITIVE_INFINITY,
windowTime = Number.POSITIVE_INFINITY,
refCount: useRefCount,
scheduler
}: ShareReplayConfig) {
let subject: ReplaySubject<T> | undefined;
let refCount = 0;
let subscription: Subscription;
let subscription: Subscription | undefined;
let hasError = false;
let isComplete = false;

Expand All @@ -83,13 +108,14 @@ function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, schedu
}

const innerSub = subject.subscribe(this);

return () => {
this.add(() => {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && isComplete) {
if (subscription && !isComplete && useRefCount && refCount === 0) {
subscription.unsubscribe();
subscription = undefined;
subject = undefined;
}
};
});
};
}

0 comments on commit 0fd8707

Please sign in to comment.