diff --git a/compat/operator/shareReplay.ts b/compat/operator/shareReplay.ts index c5a0ebf455..b13ab405b5 100644 --- a/compat/operator/shareReplay.ts +++ b/compat/operator/shareReplay.ts @@ -1,11 +1,17 @@ import { Observable, SchedulerLike } from 'rxjs'; import { shareReplay as higherOrder } from 'rxjs/operators'; +import { ShareReplayConfig } from 'rxjs/internal-compatibility'; /** * @method shareReplay * @owner Observable */ -export function shareReplay(this: Observable, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): +export function shareReplay(this: Observable, config: ShareReplayConfig): Observable; +export function shareReplay(this: Observable, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): Observable; +export function shareReplay(this: Observable, configOrBufferSize?: ShareReplayConfig | number, windowTime?: number, scheduler?: SchedulerLike): Observable { - return higherOrder(bufferSize, windowTime, scheduler)(this) as Observable; + if (configOrBufferSize && typeof configOrBufferSize === 'object') { + return higherOrder(configOrBufferSize as ShareReplayConfig)(this) as Observable; + } + return higherOrder(configOrBufferSize as number | undefined, windowTime, scheduler)(this) as Observable; } diff --git a/doc/README.md b/doc/README.md deleted file mode 100644 index b935917e7c..0000000000 --- a/doc/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# RxJS V5 official docs at http://reactivex.io/rxjs/manual/overview.html - -These files are not meant for reading directly in GitHub, they are just source code for generating the official page. You should find the docs at http://reactivex.io/rxjs/manual/overview.html, containing all the documentation. -Be aware, navigating to http://reactivex.io/rxjs will redirect you directly to the new V6 docs at https://rxjs-dev.firebaseapp.com. - -*EVERYTHING WITHIN THIS FOLDER IS RELATED TO RxJS 5 AND WILL NOT BE MAINTAINED. CHECK OUT THE [NEW DOCS](../docs_app/README.md)* \ No newline at end of file diff --git a/spec-dtslint/operators/shareReplay-spec.ts b/spec-dtslint/operators/shareReplay-spec.ts index 2eee8f9f35..d51dc1d37f 100644 --- a/spec-dtslint/operators/shareReplay-spec.ts +++ b/spec-dtslint/operators/shareReplay-spec.ts @@ -1,30 +1,30 @@ -import { of, asyncScheduler } from 'rxjs'; +import { of, asyncScheduler } from 'rxjs'; import { shareReplay } from 'rxjs/operators'; -it('should infer correctly', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay()); // $ExpectType Observable +it('should accept an individual bufferSize parameter', () => { + const o = of(1, 2, 3).pipe(shareReplay(1)); // $ExpectType Observable }); -it('should support a bufferSize', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay(6)); // $ExpectType Observable +it('should accept individual bufferSize and windowTime parameters', () => { + const o = of(1, 2, 3).pipe(shareReplay(1, 2)); // $ExpectType Observable }); -it('should support a windowTime', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4)); // $ExpectType Observable +it('should accept individual bufferSize, windowTime and scheduler parameters', () => { + const o3 = of(1, 2, 3).pipe(shareReplay(1, 2, asyncScheduler)); // $ExpectType Observable }); -it('should support a scheduler', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4, asyncScheduler)); // $ExpectType Observable +it('should accept a bufferSize config parameter', () => { + const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, refCount: true })); // $ExpectType Observable }); -it('should enforce type of bufferSize', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay('abc')); // $ExpectError +it('should accept bufferSize and windowTime config parameters', () => { + const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, refCount: true })); // $ExpectType Observable }); -it('should enforce type of windowTime', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 'abc')); // $ExpectError +it('should accept bufferSize, windowTime and scheduler config parameters', () => { + const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, scheduler: asyncScheduler, refCount: true })); // $ExpectType Observable }); -it('should enforce type of scheduler', () => { - const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 3, 'abc')); // $ExpectError +it('should require a refCount config parameter', () => { + const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1 })); // $ExpectError }); diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index ebe47d1330..3f1914e455 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -163,20 +163,6 @@ describe('shareReplay operator', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); - it('should not restart if refCount hits 0 due to unsubscriptions', () => { - const results: number[] = []; - const source = interval(10, rxTestScheduler).pipe( - take(10), - shareReplay(1) - ); - const subs = source.subscribe(x => results.push(x)); - rxTestScheduler.schedule(() => subs.unsubscribe(), 35); - rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54); - - rxTestScheduler.flush(); - expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]); - }); - it('when no windowTime is given ReplaySubject should be in _infiniteTimeWindow mode', () => { const spy = sinon.spy(rxTestScheduler, 'now'); @@ -187,6 +173,45 @@ describe('shareReplay operator', () => { expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called; }); + it('should not restart due to unsubscriptions if refCount is false', () => { + const source = cold('a-b-c-d-e-f-g-h-i-j'); + const sub1 = '^------!'; + const expected1 = 'a-b-c-d-'; + const sub2 = '-----------^-------'; + const expected2 = '-----------fg-h-i-j'; + + const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: false })); + + expectObservable(shared, sub1).toBe(expected1); + expectObservable(shared, sub2).toBe(expected2); + }); + + it('should restart due to unsubscriptions if refCount is true', () => { + const source = cold('a-b-c-d-e-f-g-h-i-j'); + const sub1 = '^------!'; + const expected1 = 'a-b-c-d-'; + const sub2 = '-----------^------------------'; + const expected2 = '-----------a-b-c-d-e-f-g-h-i-j'; + + const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: true })); + + expectObservable(shared, sub1).toBe(expected1); + expectObservable(shared, sub2).toBe(expected2); + }); + + it('should default to refCount being false', () => { + const source = cold('a-b-c-d-e-f-g-h-i-j'); + const sub1 = '^------!'; + const expected1 = 'a-b-c-d-'; + const sub2 = '-----------^-------'; + const expected2 = '-----------fg-h-i-j'; + + const shared = source.pipe(shareReplay(1)); + + expectObservable(shared, sub1).toBe(expected1); + expectObservable(shared, sub2).toBe(expected2); + }); + it('should not break lift() composability', (done: MochaDone) => { class MyCustomObservable extends Observable { lift(operator: Operator): Observable { diff --git a/src/internal-compatibility/index.ts b/src/internal-compatibility/index.ts index a8998475a0..c7b0058207 100644 --- a/src/internal-compatibility/index.ts +++ b/src/internal-compatibility/index.ts @@ -23,6 +23,7 @@ export { SubscribeOnObservable } from '../internal/observable/SubscribeOnObserva export { Timestamp } from '../internal/operators/timestamp'; export { TimeInterval } from '../internal/operators/timeInterval'; export { GroupedObservable } from '../internal/operators/groupBy'; +export { ShareReplayConfig } from '../internal/operators/shareReplay'; export { ThrottleConfig, defaultThrottleConfig } from '../internal/operators/throttle'; export { rxSubscriber } from '../internal/symbol/rxSubscriber'; diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index ec20a784a2..263b98f39d 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -4,6 +4,13 @@ import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { Subscriber } from '../Subscriber'; +export interface ShareReplayConfig { + bufferSize?: number; + windowTime?: number; + refCount: boolean; + scheduler?: SchedulerLike; +} + /** * Share source and replay specified number of emissions on subscription. * @@ -49,18 +56,36 @@ import { Subscriber } from '../Subscriber'; * @method shareReplay * @owner Observable */ +export function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; +export function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; export function shareReplay( - bufferSize: number = Number.POSITIVE_INFINITY, - windowTime: number = Number.POSITIVE_INFINITY, + configOrBufferSize?: ShareReplayConfig | number, + windowTime?: number, scheduler?: SchedulerLike ): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler)); + let config: ShareReplayConfig; + if (configOrBufferSize && typeof configOrBufferSize === 'object') { + config = configOrBufferSize as ShareReplayConfig; + } else { + config = { + bufferSize: configOrBufferSize as number | undefined, + windowTime, + refCount: false, + scheduler + }; + } + return (source: Observable) => source.lift(shareReplayOperator(config)); } -function shareReplayOperator(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) { - let subject: ReplaySubject; +function shareReplayOperator({ + bufferSize = Number.POSITIVE_INFINITY, + windowTime = Number.POSITIVE_INFINITY, + refCount: useRefCount, + scheduler +}: ShareReplayConfig) { + let subject: ReplaySubject | undefined; let refCount = 0; - let subscription: Subscription; + let subscription: Subscription | undefined; let hasError = false; let isComplete = false; @@ -83,13 +108,14 @@ function shareReplayOperator(bufferSize?: number, windowTime?: number, schedu } const innerSub = subject.subscribe(this); - - return () => { + this.add(() => { refCount--; innerSub.unsubscribe(); - if (subscription && refCount === 0 && isComplete) { + if (subscription && !isComplete && useRefCount && refCount === 0) { subscription.unsubscribe(); + subscription = undefined; + subject = undefined; } - }; + }); }; }