diff --git a/README.md b/README.md index cc6bb040..c4c35eab 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,33 @@ Learning RxJava for Android by example This is a repository with real-world useful examples of using RxJava with Android. [It usually will be in a constant state of "Work in Progress" (WIP)](http://blog.kaush.co/2014/09/15/learning-rxjava-with-android-by-example/). -I also gave a talk at a local meetup about warming up to RxJava here. Here's a link to the [video and slides](https://newcircle.com/s/post/1744/2015/06/29/learning-rxjava-for-android-by-example). +I've also been giving talks about Learning Rx using many of the examples listed in this repo. + +* [Learning RxJava For Android by Example : Part 1](https://www.youtube.com/watch?v=k3D0cWyNno4) \[[slides](https://speakerdeck.com/kaushikgopal/learning-rxjava-for-android-by-example)\] (SF Android Meetup 2015) +* [Learning Rx by Example : Part 2](https://vimeo.com/190922794) \[[slides](https://speakerdeck.com/kaushikgopal/learning-rx-by-example-2)\] (Øredev 2016) ## Examples: -### Concurrency using schedulers +1. [Background work & concurrency (using Schedulers)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#1-background-work--concurrency-using-schedulers) +2. [Accumulate calls (using buffer)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#2-accumulate-calls-using-buffer) +3. [Instant/Auto searching text listeners (using Subjects & debounce)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#3-instantauto-searching-text-listeners-using-subjects--debounce) +4. [Networking with Retrofit & RxJava (using zip, flatmap)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#4-networking-with-retrofit--rxjava-using-zip-flatmap) +5. [Two-way data binding for TextViews (using PublishSubject)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#5-two-way-data-binding-for-textviews-using-publishsubject) +6. [Simple and Advanced polling (using interval and repeatWhen)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#6-simple-and-advanced-polling-using-interval-and-repeatwhen) +7. [Simple and Advanced exponential backoff (using delay and retryWhen)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#7-simple-and-advanced-exponential-backoff-using-delay-and-retrywhen) +8. [Form validation (using combineLatest)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#8-form-validation-using-combinelatest) +9. [Pseudo caching : retrieve data first from a cache, then a network call (using concat, concatEager, merge or publish)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#9-pseudo-caching--retrieve-data-first-from-a-cache-then-a-network-call-using-concat-concateager-merge-or-publish) +10. [Simple timing demos (using timer, interval or delay)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#10-simple-timing-demos-using-timer-interval-and-delay) +11. [RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#11-rxbus--event-bus-using-rxjava-using-rxrelay-never-terminating-subjects-and-debouncedbuffer) +12. [Persist data on Activity rotations (using Subjects and retained Fragments)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#12-persist-data-on-activity-rotations-using-subjects-and-retained-fragments) +13. [Networking with Volley](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#13-networking-with-volley) +14. [Pagination with Rx (using Subjects)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#14-pagination-with-rx-using-subjects) +15. [Orchestrating Observable: make parallel network calls, then combine the result into a single data point (using flatmap & zip)](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/README.md#15-orchestrating-observable-make-parallel-network-calls-then-combine-the-result-into-a-single-data-point-using-flatmap--zip) +16. [Simple Timeout example (using timeout)]() + +## Description + +### 1. Background work & concurrency (using Schedulers) A common requirement is to offload lengthy heavy I/O intensive operations to a background thread (non-UI thread) and feed the results back to the UI/main thread, on completion. This is a demo of how long-running operations can be offloaded to a background thread. After the operation is done, we resume back on the main thread. All using RxJava! Think of this as a replacement to AsyncTasks. @@ -15,7 +37,7 @@ The long operation is simulated by a blocking Thread.sleep call (since this is d To really see this example shine. Hit the button multiple times and see how the button click (which is a UI operation) is never blocked because the long operation only runs in the background. -### Accumulate calls (buffer) +### 2. Accumulate calls (using buffer) This is a demo of how events can be accumulated using the "buffer" operation. @@ -27,7 +49,7 @@ Note: If you're looking for a more foolproof solution that accumulates "continuous" taps vs just the number of taps within a time span, look at the [EventBus Demo](https://github.com/kaushikgopal/Android-RxJava/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom3Fragment.java) where a combo of the `publish` and `buffer` operators is used. For a more detailed explanation, you can also have a look at this [blog post](http://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/). -### Instant/Auto searching (subject + debounce) +### 3. Instant/Auto searching text listeners (using Subjects & debounce) This is a demo of how events can be swallowed in a way that only the last one is respected. A typical example of this is instant search result boxes. As you type the word "Bruce Lee", you don't want to execute searches for B, Br, Bru, Bruce, Bruce, Bruce L ... etc. But rather intelligently wait for a couple of moments, make sure the user has finished typing the whole word, and then shoot out a single call for "Bruce Lee". @@ -35,38 +57,19 @@ As you type in the input box, it will not shoot out log messages at every single This is the debounce/throttleWithTimeout method in RxJava. -### Retrofit and RxJava (zip, flatmap) +### 4. Networking with Retrofit & RxJava (using zip, flatmap) [Retrofit from Square](http://square.github.io/retrofit/) is an amazing library that helps with easy networking (even if you haven't made the jump to RxJava just yet, you really should check it out). It works even better with RxJava and these are examples hitting the GitHub API, taken straight up from the android demigod-developer Jake Wharton's talk at Netflix. You can [watch the talk](https://www.youtube.com/watch?v=aEuNBk1b5OE#t=2480) at this link. Incidentally, my motivation to use RxJava was from attending this talk at Netflix. -Since it was a presentation, Jake only put up the most important code snippets in [his slides](https://speakerdeck.com/jakewharton/2014-1). Also he uses Java 8 in them, so I flushed those examples out in ~~good~~ old Java 6. (Note: you're most likely to hit the GitHub API quota pretty fast so send in an OAuth-token as a parameter if you want to keep running these examples often). - -### Volley Demo - -[Volley](http://developer.android.com/training/volley/index.html) is another networking library introduced by [Google at IO '13](https://www.youtube.com/watch?v=yhv8l9F44qo). A kind citizen of github contributed this example so we know how to integrate Volley with RxJava. - - -### Orchestrating Observables. Make parallel network calls, then combine the result into a single data point (flatmap + zip) - -The below ascii diagram expresses the intention of our next example with panache. f1,f2,f3,f4,f5 are essentially network calls that when made, give back a result that's needed for a future calculation. +(Note: you're most likely to hit the GitHub API quota pretty fast so send in an OAuth-token as a parameter if you want to keep running these examples often). - - (flatmap) - f1 ___________________ f3 _______ - (flatmap) | (zip) - f2 ___________________ f4 _______| ___________ final output - \ | - \____________ f5 _______| - -The code for this example has already been written by one Mr.skehlet in the interwebs. Head over to [the gist](https://gist.github.com/skehlet/9418379) for the code. It's written in pure Java (6) so it's pretty comprehensible if you've understood the previous examples. I'll flush it out here again when time permits or I've run out of other compelling examples. - -### Double binding with TextViews +### 5. Two-way data binding for TextViews (using PublishSubject) Auto-updating views are a pretty cool thing. If you've dealt with Angular JS before, they have a pretty nifty concept called "two-way data binding", so when an HTML element is bound to a model/entity object, it constantly "listens" to changes on that entity and auto-updates its state based on the model. Using the technique in this example, you could potentially use a pattern like the [Presentation View Model pattern](http://martinfowler.com/eaaDev/PresentationModel.html) with great ease. While the example here is pretty rudimentary, the technique used to achieve the double binding using a `Publish Subject` is much more interesting. -### Polling with Schedulers +### 6. Simple and Advanced polling (using interval and repeatWhen) This is an example of polling using RxJava Schedulers. This is useful in cases, where you want to constantly poll a server and possibly get new data. The network call is "simulated" so it forces a delay before return a resultant string. @@ -81,15 +84,33 @@ Instead of using a RetryWithDelay, we use a RepeatWithDelay here. To understand An alternative approach to delayed polling without the use of `repeatWhen` would be using chained nested delay observables. See [startExecutingWithExponentialBackoffDelay in the ExponentialBackOffFragment example](https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/fragments/ExponentialBackoffFragment.java#L111). -### RxBus - An event bus using RxJava + DebouncedBuffer +### 7. Simple and Advanced exponential backoff (using delay and retryWhen) -Have a look at the accompanying blog posts for details on this demo: +[Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) is a strategy where based on feedback from a certain output, we alter the rate of a process (usually reducing the number of retries or increasing the wait time before retrying or re-executing a certain process). -1. [Implementing an event bus with RxJava](http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/) -2. [DebouncedBuffer used for the fancier variant of the demo](http://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/) -3. [share/publish/refcount](http://blog.kaush.co/2015/01/21/rxjava-tip-for-the-day-share-publish-refcount-and-all-that-jazz/) +The concept makes more sense with examples. RxJava makes it (relatively) simple to implement such a strategy. My thanks to [Mike](https://twitter.com/m_evans10) for suggesting the idea. -### Form validation - using [`.combineLatest`](http://reactivex.io/documentation/operators/combinelatest.html) +#### Retry (if error) with exponential backoff + +Say you have a network failure. A sensible strategy would be to NOT keep retrying your network call every 1 second. It would be smart instead (nay... elegant!) to retry with increasing delays. So you try at second 1 to execute the network call, no dice? try after 10 seconds... negatory? try after 20 seconds, no cookie? try after 1 minute. If this thing is still failing, you got to give up on the network yo! + +We simulate this behaviour using RxJava with the [`retryWhen` operator](http://reactivex.io/documentation/operators/retry.html). + +`RetryWithDelay` code snippet courtesy: + +* http://stackoverflow.com/a/25292833/159825 +* Another excellent implementation via @[sddamico](https://github.com/sddamico) : https://gist.github.com/sddamico/c45d7cdabc41e663bea1 + +Also look at the [Polling example](https://github.com/kaushikgopal/RxJava-Android-Samples#polling-with-schedulers) where we use a very similar Exponential backoff mechanism. + +#### "Repeat" with exponential backoff + +Another variant of the exponential backoff strategy is to execute an operation for a given number of times but with delayed intervals. So you execute a certain operation 1 second from now, then you execute it again 10 seconds from now, then you execute the operation 20 seconds from now. After a grand total of 3 times you stop executing. + +Simulating this behavior is actually way more simpler than the prevoius retry mechanism. You can use a variant of the `delay` operator to achieve this. + + +### 8. Form validation (using [`.combineLatest`](http://reactivex.io/documentation/operators/combinelatest.html)) Thanks to Dan Lew for giving me this idea in the [fragmented podcast - episode #5](http://fragmentedpodcast.com/episodes/4/) (around the 4:30 mark). @@ -101,7 +122,8 @@ Note that the `Func3` function that checks for validity, kicks in only after ALL The value of this technique becomes more apparent when you have more number of input fields in a form. Handling it otherwise with a bunch of booleans makes the code cluttered and kind of difficult to follow. But using `.combineLatest` all that logic is concentrated in a nice compact block of code (I still use booleans but that was to make the example more readable). -### Retrieve data first from a cache, then a network call + +### 9. Pseudo caching : retrieve data first from a cache, then a network call (using concat, concatEager, merge or publish) We have two source Observables: a disk (fast) cache and a network (fresh) call. Typically the disk Observable is much faster than the network Observable. But in order to demonstrate the working, we've also used a fake "slower" disk cache just to see how the operators behave. @@ -126,7 +148,7 @@ To solve this problem you can use merge in combination with the super nifty `pub Previously, I was using the `merge` operator but overcoming the problem of results being overwritten by monitoring the "resultAge". See the old `PseudoCacheMergeFragment` example if you're curious to see this old implementation. -### Simple Timing demos using timer/interval/delay +### 10. Simple timing demos (using timer, interval and delay) This is a super simple and straightforward example which shows you how to use RxJava's `timer`, `interval` and `delay` operators to handle a bunch of cases where you want to run a task at specific intervals. Basically say NO to Android `TimerTask`s. @@ -138,32 +160,15 @@ Cases demonstrated here: 4. run a task constantly every 3s, but after running it 5 times, terminate automatically 5. run a task A, pause for sometime, then execute Task B, then terminate -### Exponential backoff +### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer) -[Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) is a strategy where based on feedback from a certain output, we alter the rate of a process (usually reducing the number of retries or increasing the wait time before retrying or re-executing a certain process). +There are accompanying blog posts that do a much better job of explaining the details on this demo: -The concept makes more sense with examples. RxJava makes it (relatively) simple to implement such a strategy. My thanks to [Mike](https://twitter.com/m_evans10) for suggesting the idea. - -#### Retry (if error) with exponential backoff - -Say you have a network failure. A sensible strategy would be to NOT keep retrying your network call every 1 second. It would be smart instead (nay... elegant!) to retry with increasing delays. So you try at second 1 to execute the network call, no dice? try after 10 seconds... negatory? try after 20 seconds, no cookie? try after 1 minute. If this thing is still failing, you got to give up on the network yo! - -We simulate this behaviour using RxJava with the [`retryWhen` operator](http://reactivex.io/documentation/operators/retry.html). - -`RetryWithDelay` code snippet courtesy: - -* http://stackoverflow.com/a/25292833/159825 -* Another excellent implementation via @[sddamico](https://github.com/sddamico) : https://gist.github.com/sddamico/c45d7cdabc41e663bea1 - -Also look at the [Polling example](https://github.com/kaushikgopal/RxJava-Android-Samples#polling-with-schedulers) where we use a very similar Exponential backoff mechanism. - -#### "Repeat" with exponential backoff - -Another variant of the exponential backoff strategy is to execute an operation for a given number of times but with delayed intervals. So you execute a certain operation 1 second from now, then you execute it again 10 seconds from now, then you execute the operation 20 seconds from now. After a grand total of 3 times you stop executing. - -Simulating this behavior is actually way more simpler than the prevoius retry mechanism. You can use a variant of the `delay` operator to achieve this. +1. [Implementing an event bus with RxJava](http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/) +2. [DebouncedBuffer used for the fancier variant of the demo](http://blog.kaush.co/2015/01/05/debouncedbuffer-with-rxjava/) +3. [share/publish/refcount](http://blog.kaush.co/2015/01/21/rxjava-tip-for-the-day-share-publish-refcount-and-all-that-jazz/) -### Rotation Persist +### 12. Persist data on Activity rotations (using Subjects and retained Fragments) A common question that's asked when using RxJava in Android is, "how do i resume the work of an observable if a configuration change occurs (activity rotation, language locale change etc.)?". @@ -177,7 +182,12 @@ I have since rewritten this example using an alternative approach. While the [`C I wrote [another blog post](https://tech.instacart.com/how-to-think-about-subjects-part-1/) on how to think about Subjects where I go into some specifics. -### Pagination + +### 13. Networking with Volley + +[Volley](http://developer.android.com/training/volley/index.html) is another networking library introduced by [Google at IO '13](https://www.youtube.com/watch?v=yhv8l9F44qo). A kind citizen of github contributed this example so we know how to integrate Volley with RxJava. + +### 14. Pagination with Rx (using Subjects) I leverage the simple use of a Subject here. Honestly, if you don't have your items coming down via an `Observable` already (like through Retrofit or a network request), there's no good reason to use Rx and complicate things. @@ -191,10 +201,34 @@ Here are some other fancy implementations (while i enjoyed reading them, i didn' * [Eugene's very comprehensive Pagination sample](https://github.com/matzuk/PaginationSample) * [Recursive Paging example](http://stackoverflow.com/questions/28047272/handle-paging-with-rxjava) +### 15. Orchestrating Observable: make parallel network calls, then combine the result into a single data point (using flatmap & zip) + +The below ascii diagram expresses the intention of our next example with panache. f1,f2,f3,f4,f5 are essentially network calls that when made, give back a result that's needed for a future calculation. + + + (flatmap) + f1 ___________________ f3 _______ + (flatmap) | (zip) + f2 ___________________ f4 _______| ___________ final output + \ | + \____________ f5 _______| + +The code for this example has already been written by one Mr.skehlet in the interwebs. Head over to [the gist](https://gist.github.com/skehlet/9418379) for the code. It's written in pure Java (6) so it's pretty comprehensible if you've understood the previous examples. I'll flush it out here again when time permits or I've run out of other compelling examples. + +### 16. Simple Timeout example (using timeout) + +This is a simple example demonstrating the use of the `.timeout` operator. Button 1 will complete the task before the timeout constraint, while Button 2 will force a timeout error. + +Notice how we can provide a custom Observable that indicates how to react under a timeout Exception. + +## Rx 2.x + +All the examples here have been migrated to use RxJava 2.X. -## Work in Progress: +* Have a look at [PR #83 to see the diff of changes between RxJava 1 and 2](https://github.com/kaushikgopal/RxJava-Android-Samples/pull/83/files) +* [What's different in Rx 2.x](https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0) -Examples that I would like to have here, but haven't found the time yet to flush out. +We use [David Karnok's Interop library](https://github.com/akarnokd/RxJava2Interop) in some cases as certain libraries like RxBindings, RxRelays, RxJava-Math etc. have not been ported yet to 2.x. ## Contributing: diff --git a/app/build.gradle b/app/build.gradle index 23136ce0..9d4306aa 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -29,23 +29,34 @@ dependencies { compile "com.android.support:appcompat-v7:${supportLibVersion}" compile "com.android.support:recyclerview-v7:${supportLibVersion}" - compile 'io.reactivex:rxandroid:1.2.0' - // Because RxAndroid releases are few and far between, it is recommended you also - // explicitly depend on RxJava's latest version for bug fixes and new features. - compile 'io.reactivex:rxjava:1.1.4' - compile 'io.reactivex:rxjava-math:1.0.0' compile 'com.github.kaushikgopal:CoreTextUtils:c703fa12b6' - compile 'com.jakewharton.rxbinding:rxbinding:0.2.0' compile 'com.jakewharton:butterknife:7.0.1' - compile 'com.jakewharton.rxrelay:rxrelay:1.2.0' compile 'com.jakewharton.timber:timber:2.4.2' compile "com.squareup.retrofit2:retrofit:${retrofitVersion}" - compile "com.squareup.retrofit2:adapter-rxjava:${retrofitVersion}" compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}" compile "com.squareup.okhttp3:okhttp:${okhttpVersion}" compile "com.squareup.okhttp3:okhttp-urlconnection:${okhttpVersion}" compile 'com.mcxiaoke.volley:library:1.0.19' + // ---------------------------------- + // Rx dependencies + + compile 'io.reactivex.rxjava2:rxjava:2.0.1' + + // Because RxAndroid releases are few and far between, it is recommended you also + // explicitly depend on RxJava's latest version for bug fixes and new features. + compile 'io.reactivex.rxjava2:rxandroid:2.0.1' + + // libs like rxbinding + rxjava-math haven't been ported to RxJava 2.x yet, so this helps + compile "com.github.akarnokd:rxjava2-interop:0.6.1" + + compile 'io.reactivex:rxjava-math:1.0.0' + compile 'com.jakewharton.rxrelay:rxrelay:1.2.0' + compile 'com.jakewharton.rxbinding:rxbinding:0.2.0' + compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0' + + // ---------------------------------- + debugCompile 'com.squareup.leakcanary:leakcanary-android:1.3' releaseCompile 'com.squareup.leakcanary:leakcanary-android-no-op:1.3' } @@ -71,4 +82,8 @@ android { sourceCompatibility JavaVersion.VERSION_1_8 targetCompatibility JavaVersion.VERSION_1_8 } + + packagingOptions { + pickFirst 'META-INF/rxjava.properties' + } } \ No newline at end of file diff --git a/app/src/main/java/com/morihacky/android/rxjava/MainActivity.java b/app/src/main/java/com/morihacky/android/rxjava/MainActivity.java index 41bc79ae..59715261 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/MainActivity.java +++ b/app/src/main/java/com/morihacky/android/rxjava/MainActivity.java @@ -13,15 +13,6 @@ public class MainActivity private RxBus _rxBus = null; - // This is better done with a DI Library like Dagger - public RxBus getRxBusSingleton() { - if (_rxBus == null) { - _rxBus = new RxBus(); - } - - return _rxBus; - } - @Override public void onBackPressed() { super.onBackPressed(); @@ -34,24 +25,37 @@ protected void onCreate(Bundle savedInstanceState) { if (savedInstanceState == null) { getSupportFragmentManager().beginTransaction() - .replace(android.R.id.content, new MainFragment(), this.toString()) - .commit(); + .replace(android.R.id.content, new MainFragment(), this.toString()) + .commit(); } } + // This is better done with a DI Library like Dagger + public RxBus getRxBusSingleton() { + if (_rxBus == null) { + _rxBus = new RxBus(); + } + + return _rxBus; + } + private void _removeWorkerFragments() { - Fragment frag = getSupportFragmentManager()// - .findFragmentByTag(RotationPersist1WorkerFragment.class.getName()); + Fragment frag = getSupportFragmentManager().findFragmentByTag(RotationPersist1WorkerFragment.class.getName()); if (frag != null) { - getSupportFragmentManager().beginTransaction().remove(frag).commit(); + getSupportFragmentManager() + .beginTransaction() + .remove(frag) + .commit(); } - frag = getSupportFragmentManager()// - .findFragmentByTag(RotationPersist2WorkerFragment.class.getName()); + frag = getSupportFragmentManager().findFragmentByTag(RotationPersist2WorkerFragment.class.getName()); if (frag != null) { - getSupportFragmentManager().beginTransaction().remove(frag).commit(); + getSupportFragmentManager() + .beginTransaction() + .remove(frag) + .commit(); } } } \ No newline at end of file diff --git a/app/src/main/java/com/morihacky/android/rxjava/MyApp.java b/app/src/main/java/com/morihacky/android/rxjava/MyApp.java index 2a8fdc17..899b56d6 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/MyApp.java +++ b/app/src/main/java/com/morihacky/android/rxjava/MyApp.java @@ -1,7 +1,6 @@ package com.morihacky.android.rxjava; import android.app.Application; - import com.morihacky.android.rxjava.volley.MyVolley; import com.squareup.leakcanary.LeakCanary; import com.squareup.leakcanary.RefWatcher; @@ -28,6 +27,9 @@ public void onCreate() { _instance = (MyApp) getApplicationContext(); _refWatcher = LeakCanary.install(this); + // for better RxJava debugging + //RxJavaHooks.enableAssemblyTracking(); + // Initialize Volley MyVolley.init(this); diff --git a/app/src/main/java/com/morihacky/android/rxjava/RxUtils.java b/app/src/main/java/com/morihacky/android/rxjava/RxUtils.java deleted file mode 100644 index c32d0537..00000000 --- a/app/src/main/java/com/morihacky/android/rxjava/RxUtils.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.morihacky.android.rxjava; - -import rx.Subscription; - -public class RxUtils { - - public static void unsubscribeIfNotNull(Subscription subscription) { - if (subscription != null) { - subscription.unsubscribe(); - } - } -} diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/BufferDemoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/BufferDemoFragment.java index d52e3571..445aa735 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/BufferDemoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/BufferDemoFragment.java @@ -20,9 +20,10 @@ import butterknife.Bind; import butterknife.ButterKnife; -import rx.Observer; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; +import hu.akarnokd.rxjava.interop.RxJavaInterop; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.Disposable; +import io.reactivex.observers.DisposableObserver; import timber.log.Timber; /** @@ -48,18 +49,18 @@ public class BufferDemoFragment private LogAdapter _adapter; private List _logs; - private Subscription _subscription; + private Disposable _disposable; @Override public void onResume() { super.onResume(); - _subscription = _getBufferedSubscription(); + _disposable = _getBufferedDisposable(); } @Override public void onPause() { super.onPause(); - _subscription.unsubscribe(); + _disposable.dispose(); } @Override @@ -85,8 +86,8 @@ public View onCreateView(LayoutInflater inflater, // ----------------------------------------------------------------------------------- // Main Rx entities - private Subscription _getBufferedSubscription() { - return RxView.clickEvents(_tapBtn) + private Disposable _getBufferedDisposable() { + return RxJavaInterop.toV2Observable(RxView.clickEvents(_tapBtn)) .map(onClickEvent -> { Timber.d("--------- GOT A TAP"); _log("GOT A TAP"); @@ -94,10 +95,10 @@ private Subscription _getBufferedSubscription() { }) .buffer(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Observer>() { + .subscribeWith(new DisposableObserver>() { @Override - public void onCompleted() { + public void onComplete() { // fyi: you'll never reach here Timber.d("----- onCompleted"); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/ConcurrencyWithSchedulersDemoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/ConcurrencyWithSchedulersDemoFragment.java index dacfaa0b..f97194d8 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/ConcurrencyWithSchedulersDemoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/ConcurrencyWithSchedulersDemoFragment.java @@ -15,14 +15,13 @@ import butterknife.ButterKnife; import butterknife.OnClick; import com.morihacky.android.rxjava.R; +import io.reactivex.Observable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import java.util.ArrayList; import java.util.List; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; -import rx.subscriptions.CompositeSubscription; import timber.log.Timber; public class ConcurrencyWithSchedulersDemoFragment @@ -33,13 +32,13 @@ public class ConcurrencyWithSchedulersDemoFragment private LogAdapter _adapter; private List _logs; - private CompositeSubscription _subscriptions = new CompositeSubscription(); + private CompositeDisposable _disposables = new CompositeDisposable(); @Override public void onDestroy() { super.onDestroy(); ButterKnife.unbind(this); - _subscriptions.clear(); + _disposables.clear(); } @Override @@ -63,12 +62,14 @@ public void startLongOperation() { _progress.setVisibility(View.VISIBLE); _log("Button Clicked"); - Subscription s = _getObservable()// + DisposableObserver d = _getDisposableObserver(); + + _getObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(_getObserver()); // Observer + .subscribe(d); - _subscriptions.add(s); + _disposables.add(d); } private Observable _getObservable() { @@ -86,11 +87,11 @@ private Observable _getObservable() { * 2. onError * 3. onNext */ - private Observer _getObserver() { - return new Observer() { + private DisposableObserver _getDisposableObserver() { + return new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { _log("On complete"); _progress.setVisibility(View.INVISIBLE); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/DebounceSearchEmitterFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/DebounceSearchEmitterFragment.java index aa105020..9d2a3ba6 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/DebounceSearchEmitterFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/DebounceSearchEmitterFragment.java @@ -23,9 +23,10 @@ import butterknife.Bind; import butterknife.ButterKnife; import butterknife.OnClick; -import rx.Observer; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; +import hu.akarnokd.rxjava.interop.RxJavaInterop; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.Disposable; +import io.reactivex.observers.DisposableObserver; import timber.log.Timber; import static co.kaush.core.util.CoreNullnessUtils.isNotNullOrEmpty; @@ -40,12 +41,12 @@ public class DebounceSearchEmitterFragment private LogAdapter _adapter; private List _logs; - private Subscription _subscription; + private Disposable _disposable; @Override public void onDestroy() { super.onDestroy(); - _subscription.unsubscribe(); + _disposable.dispose(); ButterKnife.unbind(this); } @@ -70,20 +71,20 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { super.onActivityCreated(savedInstanceState); _setupLogger(); - _subscription = RxTextView.textChangeEvents(_inputSearchText) + _disposable = RxJavaInterop.toV2Observable(RxTextView.textChangeEvents(_inputSearchText)) .debounce(400, TimeUnit.MILLISECONDS)// default Scheduler is Computation .filter(changes -> isNotNullOrEmpty(_inputSearchText.getText().toString())) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(_getSearchObserver()); + .subscribeWith(_getSearchObserver()); } // ----------------------------------------------------------------------------------- // Main Rx entities - private Observer _getSearchObserver() { - return new Observer() { + private DisposableObserver _getSearchObserver() { + return new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("--------- onComplete"); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/DoubleBindingTextViewFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/DoubleBindingTextViewFragment.java index a79fb2b5..d17b1e3c 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/DoubleBindingTextViewFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/DoubleBindingTextViewFragment.java @@ -7,14 +7,13 @@ import android.view.ViewGroup; import android.widget.EditText; import android.widget.TextView; - -import com.morihacky.android.rxjava.R; - import butterknife.Bind; import butterknife.ButterKnife; import butterknife.OnTextChanged; -import rx.Subscription; -import rx.subjects.PublishSubject; +import com.morihacky.android.rxjava.R; +import io.reactivex.disposables.Disposable; +import io.reactivex.processors.PublishProcessor; + import static android.text.TextUtils.isEmpty; @@ -25,8 +24,8 @@ public class DoubleBindingTextViewFragment @Bind(R.id.double_binding_num2) EditText _number2; @Bind(R.id.double_binding_result) TextView _result; - Subscription _subscription; - PublishSubject _resultEmitterSubject; + Disposable _disposable; + PublishProcessor _resultEmitterSubject; @Override public View onCreateView(LayoutInflater inflater, @@ -35,12 +34,11 @@ public View onCreateView(LayoutInflater inflater, View layout = inflater.inflate(R.layout.fragment_double_binding_textview, container, false); ButterKnife.bind(this, layout); - _resultEmitterSubject = PublishSubject.create(); - _subscription = _resultEmitterSubject// - .asObservable()// - .subscribe(aFloat -> { - _result.setText(String.valueOf(aFloat)); - }); + _resultEmitterSubject = PublishProcessor.create(); + + _disposable = _resultEmitterSubject.subscribe(aFloat -> { + _result.setText(String.valueOf(aFloat)); + }); onNumberChanged(); _number2.requestFocus(); @@ -67,7 +65,7 @@ public void onNumberChanged() { @Override public void onDestroyView() { super.onDestroyView(); - _subscription.unsubscribe(); + _disposable.dispose(); ButterKnife.unbind(this); } } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/ExponentialBackoffFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/ExponentialBackoffFragment.java index 8d60342b..9b6de603 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/ExponentialBackoffFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/ExponentialBackoffFragment.java @@ -7,25 +7,24 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.wiring.LogAdapter; - +import hu.akarnokd.rxjava.interop.RxJavaInterop; +import io.reactivex.Flowable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.functions.Function; +import io.reactivex.subscribers.DisposableSubscriber; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.Subscriber; -import rx.functions.Func1; +import org.reactivestreams.Publisher; import rx.observables.MathObservable; -import rx.subscriptions.CompositeSubscription; import timber.log.Timber; + import static android.os.Looper.getMainLooper; public class ExponentialBackoffFragment @@ -33,16 +32,9 @@ public class ExponentialBackoffFragment @Bind(R.id.list_threading_log) ListView _logList; private LogAdapter _adapter; + private CompositeDisposable _disposables = new CompositeDisposable(); private List _logs; - private CompositeSubscription _subscriptions = new CompositeSubscription(); - - @Override - public void onActivityCreated(@Nullable Bundle savedInstanceState) { - super.onActivityCreated(savedInstanceState); - _setupLogger(); - } - @Override public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @@ -52,11 +44,17 @@ public View onCreateView(LayoutInflater inflater, return layout; } + @Override + public void onActivityCreated(@Nullable Bundle savedInstanceState) { + super.onActivityCreated(savedInstanceState); + _setupLogger(); + } + @Override public void onPause() { super.onPause(); - _subscriptions.clear(); + _disposables.clear(); } @Override @@ -72,27 +70,30 @@ public void startRetryingWithExponentialBackoffStrategy() { _logs = new ArrayList<>(); _adapter.clear(); - _subscriptions.add(// - Observable// - .error(new RuntimeException("testing")) // always fails - .retryWhen(new RetryWithDelay(5, 1000)) // notice this is called only onError (onNext values sent are ignored) - .doOnSubscribe(() -> _log("Attempting the impossible 5 times in intervals of 1s"))// - .subscribe(new Observer() { - @Override - public void onCompleted() { - Timber.d("on Completed"); - } - - @Override - public void onError(Throwable e) { - _log("Error: I give up!"); - } - - @Override - public void onNext(Object aVoid) { - Timber.d("on Next"); - } - })); + DisposableSubscriber disposableSubscriber = new DisposableSubscriber() { + @Override + public void onNext(Object aVoid) { + Timber.d("on Next"); + } + + @Override + public void onComplete() { + Timber.d("on Completed"); + } + + @Override + public void onError(Throwable e) { + _log("Error: I give up!"); + } + }; + + Flowable.error(new RuntimeException("testing")) // always fails + .retryWhen(new RetryWithDelay(5, 1000)) // notice this is called only onError (onNext + // values sent are ignored) + .doOnSubscribe(subscription -> _log("Attempting the impossible 5 times in intervals of 1s")) + .subscribe(disposableSubscriber); + + _disposables.add(disposableSubscriber); } @OnClick(R.id.btn_eb_delay) @@ -101,41 +102,41 @@ public void startExecutingWithExponentialBackoffDelay() { _logs = new ArrayList<>(); _adapter.clear(); - _subscriptions.add(// - - Observable.range(1, 4)// - .delay(integer -> { - // Rx-y way of doing the Fibonnaci :P - return MathObservable// - .sumInteger(Observable.range(1, integer)) - .flatMap(targetSecondDelay -> Observable.just(integer) - .delay(targetSecondDelay, TimeUnit.SECONDS)); - })// - .doOnSubscribe(() -> - _log(String.format("Execute 4 tasks with delay - time now: [xx:%02d]", - _getSecondHand())))// - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - Timber.d("onCompleted"); - _log("Completed"); - } - - @Override - public void onError(Throwable e) { - Timber.d(e, "arrrr. Error"); - _log("Error"); - } - - @Override - public void onNext(Integer integer) { - Timber.d("executing Task %d [xx:%02d]", integer, _getSecondHand()); - _log(String.format("executing Task %d [xx:%02d]", - integer, - _getSecondHand())); - - } - })); + DisposableSubscriber disposableSubscriber = new DisposableSubscriber() { + @Override + public void onNext(Integer integer) { + Timber.d("executing Task %d [xx:%02d]", integer, _getSecondHand()); + _log(String.format("executing Task %d [xx:%02d]", integer, _getSecondHand())); + } + + @Override + public void onError(Throwable e) { + Timber.d(e, "arrrr. Error"); + _log("Error"); + } + + @Override + public void onComplete() { + Timber.d("onCompleted"); + _log("Completed"); + } + }; + + Flowable + .range(1, 4) + .delay(integer -> { + // Rx-y way of doing the Fibonnaci :P + return RxJavaInterop + .toV2Flowable(MathObservable.sumInteger(rx.Observable.range(1, integer))) + .flatMap(targetSecondDelay -> Flowable + .just(integer) + .delay(targetSecondDelay, TimeUnit.SECONDS)); + }) + .doOnSubscribe(s -> _log(String.format("Execute 4 tasks with delay - time now: [xx:%02d]", + _getSecondHand()))) + .subscribe(disposableSubscriber); + + _disposables.add(disposableSubscriber); } // ----------------------------------------------------------------------------------- @@ -176,7 +177,7 @@ private void _log(String logMsg) { //public static class RetryWithDelay public class RetryWithDelay - implements Func1, Observable> { + implements Function, Publisher> { private final int _maxRetries; private final int _retryDelayMillis; @@ -193,14 +194,14 @@ public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { // only onNext triggers a re-subscription (onError + onComplete kills it) @Override - public Observable call(Observable inputObservable) { + public Publisher apply(Flowable inputObservable) { // it is critical to use inputObservable in the chain for the result // ignoring it and doing your own thing will break the sequence - return inputObservable.flatMap(new Func1>() { + return inputObservable.flatMap(new Function>() { @Override - public Observable call(Throwable throwable) { + public Publisher apply(Throwable throwable) { if (++_retryCount < _maxRetries) { // When this Observable calls onNext, the original @@ -209,15 +210,14 @@ public Observable call(Throwable throwable) { Timber.d("Retrying in %d ms", _retryCount * _retryDelayMillis); _log(String.format("Retrying in %d ms", _retryCount * _retryDelayMillis)); - return Observable.timer(_retryCount * _retryDelayMillis, - TimeUnit.MILLISECONDS); + return Flowable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS); } Timber.d("Argh! i give up"); // Max retries hit. Pass an error so the chain is forcibly completed // only onNext triggers a re-subscription (onError + onComplete kills it) - return Observable.error(throwable); + return Flowable.error(throwable); } }); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/FormValidationCombineLatestFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/FormValidationCombineLatestFragment.java index 1a3157bf..3a985fcc 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/FormValidationCombineLatestFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/FormValidationCombineLatestFragment.java @@ -7,17 +7,16 @@ import android.view.ViewGroup; import android.widget.EditText; import android.widget.TextView; - -import com.jakewharton.rxbinding.widget.RxTextView; -import com.morihacky.android.rxjava.R; - import butterknife.Bind; import butterknife.ButterKnife; -import rx.Observable; -import rx.Observer; -import rx.Subscription; +import com.jakewharton.rxbinding.widget.RxTextView; +import com.morihacky.android.rxjava.R; +import hu.akarnokd.rxjava.interop.RxJavaInterop; +import io.reactivex.Flowable; +import io.reactivex.subscribers.DisposableSubscriber; import timber.log.Timber; + import static android.text.TextUtils.isEmpty; import static android.util.Patterns.EMAIL_ADDRESS; @@ -29,24 +28,27 @@ public class FormValidationCombineLatestFragment @Bind(R.id.demo_combl_password) EditText _password; @Bind(R.id.demo_combl_num) EditText _number; - private Observable _emailChangeObservable; - private Observable _passwordChangeObservable; - private Observable _numberChangeObservable; - - private Subscription _subscription = null; + private DisposableSubscriber _disposableObserver = null; + private Flowable _emailChangeObservable; + private Flowable _numberChangeObservable; + private Flowable _passwordChangeObservable; @Override public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) { - View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest, - container, - false); + View layout = inflater.inflate(R.layout.fragment_form_validation_comb_latest, container, false); ButterKnife.bind(this, layout); - _emailChangeObservable = RxTextView.textChanges(_email).skip(1); - _passwordChangeObservable = RxTextView.textChanges(_password).skip(1); - _numberChangeObservable = RxTextView.textChanges(_number).skip(1); + _emailChangeObservable = RxJavaInterop.toV2Flowable(RxTextView + .textChanges(_email) + .skip(1)); + _passwordChangeObservable = RxJavaInterop.toV2Flowable(RxTextView + .textChanges(_password) + .skip(1)); + _numberChangeObservable = RxJavaInterop.toV2Flowable(RxTextView + .textChanges(_number) + .skip(1)); _combineLatestEvents(); @@ -57,57 +59,63 @@ public View onCreateView(LayoutInflater inflater, public void onDestroyView() { super.onDestroyView(); ButterKnife.unbind(this); - _subscription.unsubscribe(); + _disposableObserver.dispose(); } private void _combineLatestEvents() { - _subscription = Observable.combineLatest(_emailChangeObservable, - _passwordChangeObservable, - _numberChangeObservable, - (newEmail, newPassword, newNumber) -> { - - boolean emailValid = !isEmpty(newEmail) && - EMAIL_ADDRESS.matcher(newEmail).matches(); - if (!emailValid) { - _email.setError("Invalid Email!"); - } - - boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8; - if (!passValid) { - _password.setError("Invalid Password!"); - } - - boolean numValid = !isEmpty(newNumber); - if (numValid) { - int num = Integer.parseInt(newNumber.toString()); - numValid = num > 0 && num <= 100; - } - if (!numValid) { - _number.setError("Invalid Number!"); - } - - return emailValid && passValid && numValid; - - })// - .subscribe(new Observer() { - @Override - public void onCompleted() { - Timber.d("completed"); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "there was an error"); - } - - @Override - public void onNext(Boolean formValid) { - if (formValid) { - _btnValidIndicator.setBackgroundColor(getResources().getColor(R.color.blue)); - } else { - _btnValidIndicator.setBackgroundColor(getResources().getColor(R.color.gray)); - } - } - }); + + _disposableObserver = new DisposableSubscriber() { + @Override + public void onNext(Boolean formValid) { + if (formValid) { + _btnValidIndicator.setBackgroundColor(getResources().getColor(R.color.blue)); + } + else { + _btnValidIndicator.setBackgroundColor(getResources().getColor(R.color.gray)); + } + } + + @Override + public void onError(Throwable e) { + Timber.e(e, "there was an error"); + } + + @Override + public void onComplete() { + Timber.d("completed"); + } + }; + + Flowable + .combineLatest(_emailChangeObservable, + _passwordChangeObservable, + _numberChangeObservable, + (newEmail, newPassword, newNumber) -> { + + boolean emailValid = !isEmpty(newEmail) && + EMAIL_ADDRESS + .matcher(newEmail) + .matches(); + if (!emailValid) { + _email.setError("Invalid Email!"); + } + + boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8; + if (!passValid) { + _password.setError("Invalid Password!"); + } + + boolean numValid = !isEmpty(newNumber); + if (numValid) { + int num = Integer.parseInt(newNumber.toString()); + numValid = num > 0 && num <= 100; + } + if (!numValid) { + _number.setError("Invalid Number!"); + } + + return emailValid && passValid && numValid; + }) + .subscribe(_disposableObserver); } } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java index 02ed2046..07b974a2 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java @@ -82,6 +82,11 @@ void demoTimerIntervalDelays() { clickedOn(new TimingDemoFragment()); } + @OnClick(R.id.btn_demo_timeout) + void demoTimeout() { + clickedOn(new TimeoutDemoFragment()); + } + @OnClick(R.id.btn_demo_exponential_backoff) void demoExponentialBackoff() { clickedOn(new ExponentialBackoffFragment()); diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.java index 730b3e01..21f61fe5 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/PlaygroundFragment.java @@ -11,22 +11,17 @@ import android.widget.ArrayAdapter; import android.widget.ListView; import android.widget.ProgressBar; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; -import com.morihacky.android.rxjava.RxUtils; - +import io.reactivex.Observable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.schedulers.Schedulers; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; import timber.log.Timber; public class PlaygroundFragment @@ -36,21 +31,13 @@ public class PlaygroundFragment @Bind(R.id.list_threading_log) ListView _logsList; private LogAdapter _adapter; - private List _logs; - private Subscription _subscription; private int _attempt = 0; + private List _logs; @Override public void onDestroy() { super.onDestroy(); ButterKnife.unbind(this); - RxUtils.unsubscribeIfNotNull(_subscription); - } - - @Override - public void onActivityCreated(@Nullable Bundle savedInstanceState) { - super.onActivityCreated(savedInstanceState); - _setupLogger(); } @Override @@ -62,41 +49,42 @@ public View onCreateView(LayoutInflater inflater, return layout; } + @Override + public void onActivityCreated(@Nullable Bundle savedInstanceState) { + super.onActivityCreated(savedInstanceState); + _setupLogger(); + } + @OnClick(R.id.btn_start_operation) public void startOperation() { _logs.clear(); _log("Button Clicked"); - _subscription =// - Observable// - .from(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"))// -// .from(Arrays.asList("a", "b", "c", "d"))// - .flatMap(s ->// - Observable// - .just(s)// - .onBackpressureBuffer(3) - .flatMap(s1 -> { - - _log(s1 + "start"); - - if (s1.equalsIgnoreCase("b") && _attempt < 5) { - _attempt++; - return Observable.error(new Throwable("b can't be processed ("+(_attempt - 1)+")")); - } - - if (s1.equalsIgnoreCase("c") || s1.equalsIgnoreCase("f")) { - return Observable.just(s1); - } else { - return Observable.timer(2, TimeUnit.SECONDS).map(l -> s1); - } - }) - .retryWhen(source -> source.delay(8, TimeUnit.SECONDS)) - /*, 3*/) - .doOnNext(s -> _log(s + "stop")) - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribe(); + Observable.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"))// + .flatMap(s1 -> { + + _log(s1 + "start"); + + if (s1.equalsIgnoreCase("b") && _attempt < 5) { + _attempt++; + return Observable.error(new Throwable("b can't be processed (" + (_attempt - 1) + ")")); + } + + if (s1.equalsIgnoreCase("c") || s1.equalsIgnoreCase("f")) { + return Observable.just(s1); + } + else { + return Observable + .timer(2, TimeUnit.SECONDS) + .map(l -> s1); + } + }) + .retryWhen(source -> source.delay(8, TimeUnit.SECONDS)) + .doOnNext(s -> _log(s + "stop")) + .subscribeOn(Schedulers.io()) + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(); } // ----------------------------------------------------------------------------------- @@ -118,7 +106,8 @@ private void _log(String logMsg) { _logs.add(0, logMsg + " (main thread) "); _adapter.clear(); _adapter.addAll(_logs); - } else { + } + else { _logs.add(0, logMsg + " (NOT main thread) "); // You can only do below stuff on main thread. diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PollingFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/PollingFragment.java index 3f4bb82a..9f4398a9 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PollingFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/PollingFragment.java @@ -10,20 +10,19 @@ import android.view.ViewGroup; import android.widget.ArrayAdapter; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; - +import io.reactivex.Flowable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Function; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.functions.Func1; -import rx.subscriptions.CompositeSubscription; +import org.reactivestreams.Publisher; import timber.log.Timber; public class PollingFragment @@ -36,23 +35,15 @@ public class PollingFragment @Bind(R.id.list_threading_log) ListView _logsList; private LogAdapter _adapter; - private List _logs; - - private CompositeSubscription _subscriptions; private int _counter = 0; - + private CompositeDisposable _disposables; + private List _logs; @Override public void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); - _subscriptions = new CompositeSubscription(); - } - - @Override - public void onActivityCreated(@Nullable Bundle savedInstanceState) { - super.onActivityCreated(savedInstanceState); - _setupLogger(); + _disposables = new CompositeDisposable(); } @Override @@ -64,10 +55,16 @@ public View onCreateView(LayoutInflater inflater, return layout; } + @Override + public void onActivityCreated(@Nullable Bundle savedInstanceState) { + super.onActivityCreated(savedInstanceState); + _setupLogger(); + } + @Override public void onDestroy() { super.onDestroy(); - _subscriptions.unsubscribe(); + _disposables.clear(); ButterKnife.unbind(this); } @@ -76,17 +73,21 @@ public void onStartSimplePollingClicked() { final int pollCount = POLL_COUNT; - _subscriptions.add(// - Observable.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS) - .map(this::_doNetworkCallAndGetStringResult)// - .take(pollCount) - .doOnSubscribe(() -> - _log(String.format("Start simple polling - %s", _counter))) - .subscribe(taskName -> { - _log(String.format(Locale.US, "Executing polled task [%s] now time : [xx:%02d]", - taskName, _getSecondHand())); - }) - ); + Disposable d = Flowable + .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS) + .map(this::_doNetworkCallAndGetStringResult) + .take(pollCount) + .doOnSubscribe(subscription -> { + _log(String.format("Start simple polling - %s", _counter)); + }) + .subscribe(taskName -> { + _log(String.format(Locale.US, + "Executing polled task [%s] now time : [xx:%02d]", + taskName, + _getSecondHand())); + }); + + _disposables.add(d); } @OnClick(R.id.btn_start_increasingly_delayed_polling) @@ -96,19 +97,15 @@ public void onStartIncreasinglyDelayedPolling() { final int pollingInterval = POLLING_INTERVAL; final int pollCount = POLL_COUNT; - _log(String.format(Locale.US, "Start increasingly delayed polling now time: [xx:%02d]", - _getSecondHand())); - - _subscriptions.add(// - Observable.just(1) - .repeatWhen(new RepeatWithDelay(pollCount, pollingInterval)) - .subscribe(o -> { - _log(String.format(Locale.US, "Executing polled task now time : [xx:%02d]", - _getSecondHand())); - }, e -> { - Timber.d(e, "arrrr. Error"); - }) - ); + _log(String.format(Locale.US, "Start increasingly delayed polling now time: [xx:%02d]", _getSecondHand())); + + _disposables.add(Flowable + .just(1L) + .repeatWhen(new RepeatWithDelay(pollCount, pollingInterval)) + .subscribe(o -> _log(String.format(Locale.US, + "Executing polled task now time : [xx:%02d]", + _getSecondHand())), + e -> Timber.d(e, "arrrr. Error"))); } // ----------------------------------------------------------------------------------- @@ -127,7 +124,8 @@ private String _doNetworkCallAndGetStringResult(long attempt) { // randomly make one event super long so we test that the repeat logic waits // and accounts for this. Thread.sleep(9000); - } else { + } + else { Thread.sleep(3000); } @@ -153,7 +151,8 @@ private void _log(String logMsg) { _logs.add(0, logMsg + " (main thread) "); _adapter.clear(); _adapter.addAll(_logs); - } else { + } + else { _logs.add(0, logMsg + " (NOT main thread) "); // You can only do below stuff on main thread. @@ -177,7 +176,7 @@ private boolean _isCurrentlyOnMainThread() { //public static class RepeatWithDelay public class RepeatWithDelay - implements Func1, Observable> { + implements Function, Publisher> { private final int _repeatLimit; private final int _pollingInterval; @@ -193,28 +192,24 @@ public class RepeatWithDelay // only onNext triggers a re-subscription @Override - public Observable call(Observable inputObservable) { - + public Publisher apply(Flowable inputFlowable) throws Exception { // it is critical to use inputObservable in the chain for the result // ignoring it and doing your own thing will break the sequence - return inputObservable.flatMap(new Func1>() { + return inputFlowable.flatMap(new Function>() { @Override - public Observable call(Void blah) { - - + public Publisher apply(Object o) throws Exception { if (_repeatCount >= _repeatLimit) { // terminate the sequence cause we reached the limit _log("Completing sequence"); - return Observable.empty(); + return Flowable.empty(); } // since we don't get an input // we store state in this handler to tell us the point of time we're firing _repeatCount++; - return Observable.timer(_repeatCount * _pollingInterval, - TimeUnit.MILLISECONDS); + return Flowable.timer(_repeatCount * _pollingInterval, TimeUnit.MILLISECONDS); } }); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheConcatFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheConcatFragment.java deleted file mode 100644 index 98972ed7..00000000 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheConcatFragment.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.morihacky.android.rxjava.fragments; - -import android.os.Bundle; -import android.support.annotation.Nullable; -import android.view.LayoutInflater; -import android.view.View; -import android.view.ViewGroup; -import android.widget.ArrayAdapter; -import android.widget.ListView; -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import com.morihacky.android.rxjava.R; -import com.morihacky.android.rxjava.retrofit.Contributor; -import com.morihacky.android.rxjava.retrofit.GithubApi; -import com.morihacky.android.rxjava.retrofit.GithubService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import rx.Observable; -import rx.Subscriber; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; -import timber.log.Timber; - -public class PseudoCacheConcatFragment - extends BaseFragment { - - @Bind(R.id.log_list) ListView _resultList; - - private HashMap _contributionMap = null; - private ArrayAdapter _adapter; - - @Override - public View onCreateView(LayoutInflater inflater, - @Nullable ViewGroup container, - @Nullable Bundle savedInstanceState) { - View layout = inflater.inflate(R.layout.fragment_pseudo_cache_concat, container, false); - ButterKnife.bind(this, layout); - _initializeCache(); - return layout; - } - - @Override - public void onDestroyView() { - super.onDestroyView(); - ButterKnife.unbind(this); - } - - @OnClick(R.id.btn_start_pseudo_cache) - public void onDemoPseudoCacheClicked() { - _adapter = new ArrayAdapter<>(getActivity(), - R.layout.item_log, - R.id.item_log, - new ArrayList<>()); - - _resultList.setAdapter(_adapter); - _initializeCache(); - - Observable.concatEager(_getCachedData(), _getFreshData()) - .subscribeOn(Schedulers.io()) - .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - Timber.d("done loading all data"); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "arr something went wrong"); - } - - @Override - public void onNext(Contributor contributor) { - _contributionMap.put(contributor.login, contributor.contributions); - _adapter.clear(); - _adapter.addAll(getListStringFromMap()); - } - }); - } - - private List getListStringFromMap() { - List list = new ArrayList<>(); - - for (String username : _contributionMap.keySet()) { - String rowLog = String.format("%s [%d]", username, _contributionMap.get(username)); - list.add(rowLog); - } - - return list; - } - - private Observable _getCachedData() { - - List list = new ArrayList<>(); - - for (String username : _contributionMap.keySet()) { - Contributor c = new Contributor(); - c.login = username; - c.contributions = _contributionMap.get(username); - list.add(c); - } - - return Observable.from(list); - } - - private Observable _getFreshData() { - String githubToken = getResources().getString(R.string.github_oauth_token); - GithubApi githubService = GithubService.createGithubService(githubToken); - return githubService.contributors("square", "retrofit") - .flatMap(Observable::from); - } - - private void _initializeCache() { - _contributionMap = new HashMap<>(); - _contributionMap.put("JakeWharton", 0l); - _contributionMap.put("pforhan", 0l); - _contributionMap.put("edenman", 0l); - _contributionMap.put("swankjesse", 0l); - _contributionMap.put("bruceLee", 0l); - } -} diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheFragment.java index 9049d3f3..e4d3de1a 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheFragment.java @@ -10,22 +10,25 @@ import android.widget.ArrayAdapter; import android.widget.ListView; import android.widget.TextView; -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; + import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.retrofit.Contributor; import com.morihacky.android.rxjava.retrofit.GithubApi; import com.morihacky.android.rxjava.retrofit.GithubService; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscriber; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; + +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; +import io.reactivex.Observable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import timber.log.Timber; public class PseudoCacheFragment @@ -61,9 +64,9 @@ public void onConcatBtnClicked() { Observable.concat(getSlowCachedDiskData(), getFreshNetworkData()) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -86,12 +89,16 @@ public void onConcatEagerBtnClicked() { infoText.setText(R.string.msg_pseudoCache_demoInfo_concatEager); wireupDemo(); - Observable.concatEager(getSlowCachedDiskData(), getFreshNetworkData()) + List> observables = new ArrayList<>(2); + observables.add(getSlowCachedDiskData()); + observables.add(getFreshNetworkData()); + + Observable.concatEager(observables) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -118,9 +125,9 @@ public void onMergeBtnClicked() { Observable.merge(getCachedDiskData(), getFreshNetworkData()) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -146,9 +153,9 @@ public void onMergeSlowBtnClicked() { Observable.merge(getSlowCachedDiskData(), getFreshNetworkData()) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -177,9 +184,9 @@ public void onMergeOptimizedBtnClicked() { getCachedDiskData().takeUntil(network))) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -208,9 +215,9 @@ public void onMergeOptimizedWithSlowDiskBtnClicked() { getSlowCachedDiskData().takeUntil(network))) .subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -259,10 +266,10 @@ private Observable getCachedDiskData() { list.add(c); } - return Observable.from(list)// - .doOnSubscribe(() -> new Handler(Looper.getMainLooper())// + return Observable.fromIterable(list)// + .doOnSubscribe((data) -> new Handler(Looper.getMainLooper())// .post(() -> adapterSubscriptionInfo.add("(disk) cache subscribed")))// - .doOnCompleted(() -> new Handler(Looper.getMainLooper())// + .doOnComplete(() -> new Handler(Looper.getMainLooper())// .post(() -> adapterSubscriptionInfo.add("(disk) cache completed"))); } @@ -271,10 +278,10 @@ private Observable getFreshNetworkData() { GithubApi githubService = GithubService.createGithubService(githubToken); return githubService.contributors("square", "retrofit") - .flatMap(Observable::from) - .doOnSubscribe(() -> new Handler(Looper.getMainLooper())// + .flatMap(Observable::fromIterable) + .doOnSubscribe((data) -> new Handler(Looper.getMainLooper())// .post(() -> adapterSubscriptionInfo.add("(network) subscribed")))// - .doOnCompleted(() -> new Handler(Looper.getMainLooper())// + .doOnComplete(() -> new Handler(Looper.getMainLooper())// .post(() -> adapterSubscriptionInfo.add("(network) completed"))); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheMergeFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheMergeFragment.java index 6fbaca18..ec2d49e5 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheMergeFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/PseudoCacheMergeFragment.java @@ -8,20 +8,23 @@ import android.view.ViewGroup; import android.widget.ArrayAdapter; import android.widget.ListView; -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; + import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.retrofit.Contributor; import com.morihacky.android.rxjava.retrofit.GithubApi; import com.morihacky.android.rxjava.retrofit.GithubService; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import rx.Observable; -import rx.Subscriber; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; + +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; +import io.reactivex.Observable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import timber.log.Timber; public class PseudoCacheMergeFragment @@ -59,9 +62,9 @@ public void onDemoPseudoCacheClicked() { Observable.merge(_getCachedData(), _getFreshData()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber>() { + .subscribe(new DisposableObserver>() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("done loading all data"); } @@ -114,7 +117,7 @@ private Observable> _getCachedData() { list.add(dataWithAgePair); } - return Observable.from(list); + return Observable.fromIterable(list); } private Observable> _getFreshData() { @@ -122,7 +125,7 @@ private Observable> _getFreshData() { GithubApi githubService = GithubService.createGithubService(githubToken); return githubService.contributors("square", "retrofit") - .flatMap(Observable::from) + .flatMap(Observable::fromIterable) .map(contributor -> new Pair<>(contributor, System.currentTimeMillis())); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitAsyncTaskDeathFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitAsyncTaskDeathFragment.java index 5b42e099..180462c9 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitAsyncTaskDeathFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitAsyncTaskDeathFragment.java @@ -21,9 +21,9 @@ import butterknife.Bind; import butterknife.ButterKnife; import butterknife.OnClick; -import rx.Observer; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import static java.lang.String.format; @@ -89,9 +89,9 @@ protected void onPostExecute(User user) { _githubService.user(_username.getText().toString()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Observer() { + .subscribe(new DisposableObserver() { @Override - public void onCompleted() { + public void onComplete() { } @Override diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitFragment.java index 4a0a31bb..96875d60 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RetrofitFragment.java @@ -23,12 +23,11 @@ import butterknife.Bind; import butterknife.ButterKnife; import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.Subscriber; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; -import rx.subscriptions.CompositeSubscription; +import io.reactivex.Observable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import timber.log.Timber; import static android.text.TextUtils.isEmpty; @@ -43,7 +42,7 @@ public class RetrofitFragment private ArrayAdapter _adapter; private GithubApi _githubService; - private CompositeSubscription _subscriptions; + private CompositeDisposable _disposables; @Override public void onCreate(Bundle savedInstanceState) { @@ -51,7 +50,7 @@ public void onCreate(Bundle savedInstanceState) { String githubToken = getResources().getString(R.string.github_oauth_token); _githubService = GithubService.createGithubService(githubToken); - _subscriptions = new CompositeSubscription(); + _disposables = new CompositeDisposable(); } @Override @@ -78,20 +77,21 @@ public void onDestroyView() { @Override public void onDestroy() { super.onDestroy(); - _subscriptions.unsubscribe(); + _disposables.dispose(); } @OnClick(R.id.btn_demo_retrofit_contributors) public void onListContributorsClicked() { _adapter.clear(); - _subscriptions.add(// + _disposables.add(// _githubService.contributors(_username.getText().toString(), _repo.getText().toString()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Observer>() { + .subscribeWith(new DisposableObserver>() { + @Override - public void onCompleted() { + public void onComplete() { Timber.d("Retrofit call 1 completed"); } @@ -121,8 +121,8 @@ public void onNext(List contributors) { public void onListContributorsWithFullUserInfoClicked() { _adapter.clear(); - _subscriptions.add(_githubService.contributors(_username.getText().toString(), _repo.getText().toString()) - .flatMap(Observable::from) + _disposables.add(_githubService.contributors(_username.getText().toString(), _repo.getText().toString()) + .flatMap(Observable::fromIterable) .flatMap(contributor -> { Observable _userObservable = _githubService.user(contributor.login) .filter(user -> !isEmpty(user.name) && !isEmpty(user.email)); @@ -133,9 +133,9 @@ public void onListContributorsWithFullUserInfoClicked() { }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Subscriber() { + .subscribeWith(new DisposableObserver>() { @Override - public void onCompleted() { + public void onComplete() { Timber.d("Retrofit call 2 completed "); } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1Fragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1Fragment.java index dd48d1b8..ec990a13 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1Fragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1Fragment.java @@ -8,22 +8,19 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.wiring.LogAdapter; - +import io.reactivex.Flowable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.subscribers.DisposableSubscriber; import java.util.ArrayList; import java.util.List; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observer; -import rx.functions.Action0; -import rx.observables.ConnectableObservable; -import rx.subscriptions.CompositeSubscription; import timber.log.Timber; + import static android.os.Looper.getMainLooper; public class RotationPersist1Fragment @@ -37,7 +34,7 @@ public class RotationPersist1Fragment private LogAdapter _adapter; private List _logs; - private CompositeSubscription _subscriptions = new CompositeSubscription(); + private CompositeDisposable _disposables = new CompositeDisposable(); // ----------------------------------------------------------------------------------- @@ -47,7 +44,7 @@ public void startOperationFromWorkerFrag() { _adapter.clear(); FragmentManager fm = getActivity().getSupportFragmentManager(); - RotationPersist1WorkerFragment frag =// + RotationPersist1WorkerFragment frag = (RotationPersist1WorkerFragment) fm.findFragmentByTag(FRAG_TAG); if (frag == null) { @@ -59,32 +56,33 @@ public void startOperationFromWorkerFrag() { } @Override - public void observeResults(ConnectableObservable intsObservable) { - - _subscriptions.add(// - intsObservable.doOnSubscribe(new Action0() { - @Override - public void call() { - _log("Subscribing to intsObservable"); - } - }).subscribe(new Observer() { - @Override - public void onCompleted() { - _log("Observable is complete"); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "Error in worker demo frag observable"); - _log("Dang! something went wrong."); - } - - @Override - public void onNext(Integer integer) { - _log(String.format("Worker frag spits out - %d", integer)); - } - })); - + public void observeResults(Flowable intsFlowable) { + + DisposableSubscriber d = new DisposableSubscriber() { + @Override + public void onNext(Integer integer) { + _log(String.format("Worker frag spits out - %d", integer)); + } + + @Override + public void onError(Throwable e) { + Timber.e(e, "Error in worker demo frag observable"); + _log("Dang! something went wrong."); + } + + @Override + public void onComplete() { + _log("Observable is complete"); + } + }; + + intsFlowable + .doOnSubscribe(subscription -> { + _log("Subscribing to intsObservable"); + }) + .subscribe(d); + + _disposables.add(d); } // ----------------------------------------------------------------------------------- @@ -109,7 +107,7 @@ public View onCreateView(LayoutInflater inflater, @Override public void onPause() { super.onPause(); - _subscriptions.clear(); + _disposables.clear(); } @Override diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1WorkerFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1WorkerFragment.java index 2d3ad1b0..34b479fa 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1WorkerFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist1WorkerFragment.java @@ -3,22 +3,19 @@ import android.content.Context; import android.os.Bundle; import android.support.v4.app.Fragment; - import com.morihacky.android.rxjava.MainActivity; - +import io.reactivex.Flowable; +import io.reactivex.disposables.Disposable; +import io.reactivex.flowables.ConnectableFlowable; import java.util.List; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscription; -import rx.observables.ConnectableObservable; - public class RotationPersist1WorkerFragment extends Fragment { private IAmYourMaster _masterFrag; - private ConnectableObservable _storedIntsObservable; - private Subscription _storedIntsSubscription; + private ConnectableFlowable _storedIntsFlowable; + private Disposable _storedIntsDisposable; /** * Hold a reference to the activity -> caller fragment @@ -29,7 +26,9 @@ public class RotationPersist1WorkerFragment public void onAttach(Context context) { super.onAttach(context); - List frags = ((MainActivity) context).getSupportFragmentManager().getFragments(); + List frags = ((MainActivity) context) + .getSupportFragmentManager() + .getFragments(); for (Fragment f : frags) { if (f instanceof IAmYourMaster) { _masterFrag = (IAmYourMaster) f; @@ -51,27 +50,17 @@ public void onCreate(Bundle savedInstanceState) { // Retain this fragment across configuration changes. setRetainInstance(true); - if (_storedIntsObservable != null) { + if (_storedIntsFlowable != null) { return; } - Observable intsObservable =// - Observable.interval(1, TimeUnit.SECONDS)// - .map(Long::intValue)// - .take(20); - - // ----------------------------------------------------------------------------------- - // Making our observable "HOT" for the purpose of the demo. + Flowable intsObservable = Flowable + .interval(1, TimeUnit.SECONDS) + .map(Long::intValue) + .take(20); - //_intsObservable = _intsObservable.share(); - _storedIntsObservable = intsObservable.replay(); - - _storedIntsSubscription = _storedIntsObservable.connect(); - - // Do not do this in production! - // `.share` is "warm" not "hot" - // the below forceful subscription fakes the heat - //_intsObservable.subscribe(); + _storedIntsFlowable = intsObservable.publish(); + _storedIntsDisposable = _storedIntsFlowable.connect(); } /** @@ -80,7 +69,13 @@ public void onCreate(Bundle savedInstanceState) { @Override public void onResume() { super.onResume(); - _masterFrag.observeResults(_storedIntsObservable); + _masterFrag.observeResults(_storedIntsFlowable); + } + + @Override + public void onDestroy() { + super.onDestroy(); + _storedIntsDisposable.dispose(); } /** @@ -93,13 +88,7 @@ public void onDetach() { _masterFrag = null; } - @Override - public void onDestroy() { - super.onDestroy(); - _storedIntsSubscription.unsubscribe(); - } - public interface IAmYourMaster { - void observeResults(ConnectableObservable intsObservable); + void observeResults(Flowable intsObservable); } } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2Fragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2Fragment.java index ebbd88fc..cdf5d244 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2Fragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2Fragment.java @@ -8,21 +8,19 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.wiring.LogAdapter; - +import io.reactivex.Flowable; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.subscribers.DisposableSubscriber; import java.util.ArrayList; import java.util.List; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.subscriptions.CompositeSubscription; import timber.log.Timber; + import static android.os.Looper.getMainLooper; public class RotationPersist2Fragment @@ -36,7 +34,7 @@ public class RotationPersist2Fragment private LogAdapter _adapter; private List _logs; - private CompositeSubscription _subscriptions = new CompositeSubscription(); + private CompositeDisposable _disposables = new CompositeDisposable(); // ----------------------------------------------------------------------------------- @@ -46,42 +44,49 @@ public void startOperationFromWorkerFrag() { _adapter.clear(); FragmentManager fm = getActivity().getSupportFragmentManager(); - RotationPersist2WorkerFragment frag =// - (RotationPersist2WorkerFragment) fm.findFragmentByTag(FRAG_TAG); + RotationPersist2WorkerFragment frag = (RotationPersist2WorkerFragment) fm.findFragmentByTag(FRAG_TAG); if (frag == null) { frag = new RotationPersist2WorkerFragment(); - fm.beginTransaction().add(frag, FRAG_TAG).commit(); - } else { + fm + .beginTransaction() + .add(frag, FRAG_TAG) + .commit(); + } + else { Timber.d("Worker frag already spawned"); } } @Override - public void setStream(Observable intStream) { - - _subscriptions.add(// - intStream.doOnSubscribe(() -> _log("Subscribing to intsObservable")) - .subscribe(new Observer() { - @Override - public void onCompleted() { - _log("Observable is complete"); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "Error in worker demo frag observable"); - _log("Dang! something went wrong."); - } - - @Override - public void onNext(Integer integer) { - _log(String.format("Worker frag spits out - %d", integer)); - } - })); + public void setStream(Flowable intStream) { + DisposableSubscriber d = new DisposableSubscriber() { + @Override + public void onNext(Integer integer) { + _log(String.format("Worker frag spits out - %d", integer)); + } + + @Override + public void onError(Throwable e) { + Timber.e(e, "Error in worker demo frag observable"); + _log("Dang! something went wrong."); + } + + @Override + public void onComplete() { + _log("Observable is complete"); + } + }; + + intStream + .doOnSubscribe(subscription -> _log("Subscribing to intsObservable")) + .subscribe(d); + + _disposables.add(d); + } - // ----------------------------------------------------------------------------------- +// ----------------------------------------------------------------------------------- // Boilerplate // ----------------------------------------------------------------------------------- @@ -103,7 +108,7 @@ public View onCreateView(LayoutInflater inflater, @Override public void onPause() { super.onPause(); - _subscriptions.clear(); + _disposables.clear(); } private void _setupLogger() { diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2WorkerFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2WorkerFragment.java index 9b82f881..353a8faa 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2WorkerFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/RotationPersist2WorkerFragment.java @@ -1,25 +1,21 @@ package com.morihacky.android.rxjava.fragments; -import android.app.Activity; +import android.content.Context; import android.os.Bundle; import android.support.v4.app.Fragment; - import com.morihacky.android.rxjava.MainActivity; - +import io.reactivex.Flowable; +import io.reactivex.processors.PublishProcessor; import java.util.List; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscription; -import rx.subjects.PublishSubject; -import rx.subjects.Subject; - public class RotationPersist2WorkerFragment extends Fragment { + private PublishProcessor _intStream; + private PublishProcessor _lifeCycleStream; + private IAmYourMaster _masterFrag; - private Subscription _storedIntsSubscription; - private Subject _intStream = PublishSubject.create(); /** * Since we're holding a reference to the Master a.k.a Activity/Master Frag @@ -28,10 +24,12 @@ public class RotationPersist2WorkerFragment * See {@link MainActivity#onBackPressed()} */ @Override - public void onAttach(Activity activity) { - super.onAttach(activity); + public void onAttach(Context context) { + super.onAttach(context); - List frags = ((MainActivity) activity).getSupportFragmentManager().getFragments(); + List frags = ((MainActivity) context) + .getSupportFragmentManager() + .getFragments(); for (Fragment f : frags) { if (f instanceof IAmYourMaster) { _masterFrag = (IAmYourMaster) f; @@ -50,14 +48,20 @@ public void onAttach(Activity activity) { public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); + _intStream = PublishProcessor.create(); + _lifeCycleStream = PublishProcessor.create(); + // Retain this fragment across configuration changes. setRetainInstance(true); - _storedIntsSubscription = - Observable.interval(1, TimeUnit.SECONDS) - .map(Long::intValue) - .take(20) - .subscribe(_intStream); + _intStream.takeUntil(_lifeCycleStream); + + Flowable + .interval(1, TimeUnit.SECONDS) + .map(Long::intValue) + .take(20) + .subscribe(_intStream); + } /** @@ -66,7 +70,13 @@ public void onCreate(Bundle savedInstanceState) { @Override public void onResume() { super.onResume(); - _masterFrag.setStream(_intStream.asObservable()); + _masterFrag.setStream(_intStream); + } + + @Override + public void onDestroy() { + super.onDestroy(); + _lifeCycleStream.onComplete(); } /** @@ -79,13 +89,7 @@ public void onDetach() { _masterFrag = null; } - @Override - public void onDestroy() { - super.onDestroy(); - _storedIntsSubscription.unsubscribe(); - } - public interface IAmYourMaster { - void setStream(Observable intStream); + void setStream(Flowable intStream); } } diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/TimeoutDemoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/TimeoutDemoFragment.java index 03e86031..d70dd669 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/TimeoutDemoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/TimeoutDemoFragment.java @@ -8,24 +8,20 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; -import com.morihacky.android.rxjava.RxUtils; import com.morihacky.android.rxjava.wiring.LogAdapter; - +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.observers.DisposableObserver; +import io.reactivex.schedulers.Schedulers; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.Subscriber; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; import timber.log.Timber; public class TimeoutDemoFragment @@ -34,20 +30,18 @@ public class TimeoutDemoFragment @Bind(R.id.list_threading_log) ListView _logsList; private LogAdapter _adapter; + private DisposableObserver _disposable; private List _logs; - private Subscription _subscription; - @Override public void onDestroy() { super.onDestroy(); - RxUtils.unsubscribeIfNotNull(_subscription); - } - @Override - public void onActivityCreated(@Nullable Bundle savedInstanceState) { - super.onActivityCreated(savedInstanceState); - _setupLogger(); + if (_disposable == null) { + return; + } + + _disposable.dispose(); } @Override @@ -59,88 +53,97 @@ public View onCreateView(LayoutInflater inflater, return layout; } + @Override + public void onActivityCreated(@Nullable Bundle savedInstanceState) { + super.onActivityCreated(savedInstanceState); + _setupLogger(); + } + @OnClick(R.id.btn_demo_timeout_1_2s) public void onStart2sTask() { - _subscription = _getObservableTask_2sToComplete()// - .observeOn(AndroidSchedulers.mainThread())// - .subscribe(_getEventCompletionObserver()); + _disposable = _getEventCompletionObserver(); + + _getObservableTask_2sToComplete() + .timeout(3, TimeUnit.SECONDS) + .subscribeOn(Schedulers.computation()) + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(_disposable); } @OnClick(R.id.btn_demo_timeout_1_5s) public void onStart5sTask() { - _subscription = _getObservableFor5sTask()// - .timeout(2, TimeUnit.SECONDS, _getTimeoutObservable()) + _disposable = _getEventCompletionObserver(); + + _getObservableTask_5sToComplete() + .timeout(3, TimeUnit.SECONDS, _onTimeoutObservable()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(_getEventCompletionObserver()); + .subscribe(_disposable); } // ----------------------------------------------------------------------------------- // Main Rx entities - private Observable _getObservableFor5sTask() { - return Observable.create(new Observable.OnSubscribe() { - + private Observable _getObservableTask_5sToComplete() { + return Observable.create(new ObservableOnSubscribe() { @Override - public void call(Subscriber subscriber) { + public void subscribe(ObservableEmitter subscriber) throws Exception { _log(String.format("Starting a 5s task")); subscriber.onNext("5 s"); try { - Thread.sleep(1200); + Thread.sleep(5_000); } catch (InterruptedException e) { e.printStackTrace(); } - subscriber.onCompleted(); + subscriber.onComplete(); } }); } private Observable _getObservableTask_2sToComplete() { - return Observable.create(new Observable.OnSubscribe() { - - @Override - public void call(Subscriber subscriber) { - _log(String.format("Starting a 2s task")); - subscriber.onNext("2 s"); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - subscriber.onCompleted(); - } - }).subscribeOn(Schedulers.computation()).timeout(3, TimeUnit.SECONDS); + return Observable + .create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter subscriber) throws Exception { + _log(String.format("Starting a 2s task")); + subscriber.onNext("2 s"); + try { + Thread.sleep(2_000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + subscriber.onComplete(); + } + }); } - private Observable _getTimeoutObservable() { - return Observable.create(new Observable.OnSubscribe() { + private Observable _onTimeoutObservable() { + return Observable.create(new ObservableOnSubscribe() { @Override - public void call(Subscriber subscriber) { + public void subscribe(ObservableEmitter subscriber) throws Exception { _log("Timing out this task ..."); - subscriber.onCompleted(); + subscriber.onError(new Throwable("Timeout Error")); } }); } - private Observer _getEventCompletionObserver() { - return new Observer() { - + private DisposableObserver _getEventCompletionObserver() { + return new DisposableObserver() { @Override - public void onCompleted() { - _log(String.format("task was completed")); + public void onNext(String taskType) { + _log(String.format("onNext %s task", taskType)); } @Override public void onError(Throwable e) { _log(String.format("Dang a task timeout")); - onCompleted(); Timber.e(e, "Timeout Demo exception"); } @Override - public void onNext(String taskType) { - _log(String.format("onNext %s task", taskType)); + public void onComplete() { + _log(String.format("task was completed")); } }; } @@ -160,7 +163,8 @@ private void _log(String logMsg) { _logs.add(0, logMsg + " (main thread) "); _adapter.clear(); _adapter.addAll(_logs); - } else { + } + else { _logs.add(0, logMsg + " (NOT main thread) "); // You can only do below stuff on main thread. diff --git a/app/src/main/java/com/morihacky/android/rxjava/fragments/TimingDemoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/fragments/TimingDemoFragment.java index 649c5f77..8e8458b7 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/fragments/TimingDemoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/fragments/TimingDemoFragment.java @@ -7,26 +7,23 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.morihacky.android.rxjava.R; -import com.morihacky.android.rxjava.RxUtils; import com.morihacky.android.rxjava.wiring.LogAdapter; - +import io.reactivex.Flowable; +import io.reactivex.subscribers.DefaultSubscriber; +import io.reactivex.subscribers.DisposableSubscriber; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.concurrent.TimeUnit; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.functions.Action1; import timber.log.Timber; + import static android.os.Looper.getMainLooper; import static android.os.Looper.myLooper; @@ -38,14 +35,8 @@ public class TimingDemoFragment private LogAdapter _adapter; private List _logs; - private Subscription _subscription1 = null; - private Subscription _subscription2 = null; - - @Override - public void onActivityCreated(@Nullable Bundle savedInstanceState) { - super.onActivityCreated(savedInstanceState); - _setupLogger(); - } + private DisposableSubscriber _subscriber1; + private DisposableSubscriber _subscriber2; @Override public View onCreateView(LayoutInflater inflater, @@ -56,13 +47,17 @@ public View onCreateView(LayoutInflater inflater, return layout; } + @Override + public void onActivityCreated(@Nullable Bundle savedInstanceState) { + super.onActivityCreated(savedInstanceState); + _setupLogger(); + } + @Override public void onDestroyView() { super.onDestroyView(); ButterKnife.unbind(this); - RxUtils.unsubscribeIfNotNull(_subscription1); - RxUtils.unsubscribeIfNotNull(_subscription2); } // ----------------------------------------------------------------------------------- @@ -70,96 +65,101 @@ public void onDestroyView() { public void btn1_RunSingleTaskAfter2s() { _log(String.format("A1 [%s] --- BTN click", _getCurrentTimestamp())); - Observable.timer(2, TimeUnit.SECONDS)// - //.just(1).delay(2, TimeUnit.SECONDS)// - .subscribe(new Observer() { - @Override - public void onCompleted() { - _log(String.format("A1 [%s] XXX COMPLETE", _getCurrentTimestamp())); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "something went wrong in TimingDemoFragment example"); - } - - @Override - public void onNext(Long number) { - _log(String.format("A1 [%s] NEXT", _getCurrentTimestamp())); - } - }); + Flowable.timer(2, TimeUnit.SECONDS)// + .subscribe(new DefaultSubscriber() { + @Override + public void onNext(Long number) { + _log(String.format("A1 [%s] NEXT", _getCurrentTimestamp())); + } + + @Override + public void onError(Throwable e) { + Timber.e(e, "something went wrong in TimingDemoFragment example"); + } + + @Override + public void onComplete() { + _log(String.format("A1 [%s] XXX COMPLETE", _getCurrentTimestamp())); + } + }); } @OnClick(R.id.btn_demo_timing_2) public void btn2_RunTask_IntervalOf1s() { - if (_subscription1 != null && !_subscription1.isUnsubscribed()) { - _subscription1.unsubscribe(); + if (_subscriber1 != null && !_subscriber1.isDisposed()) { + _subscriber1.dispose(); _log(String.format("B2 [%s] XXX BTN KILLED", _getCurrentTimestamp())); return; } _log(String.format("B2 [%s] --- BTN click", _getCurrentTimestamp())); - _subscription1 = Observable// - .interval(1, TimeUnit.SECONDS)// - .subscribe(new Observer() { - @Override - public void onCompleted() { - _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp())); - } - - @Override - public void onError(Throwable e) { - Timber.e(e, "something went wrong in TimingDemoFragment example"); - } - - @Override - public void onNext(Long number) { - _log(String.format("B2 [%s] NEXT", _getCurrentTimestamp())); - } - }); + _subscriber1 = new DisposableSubscriber() { + @Override + public void onComplete() { + _log(String.format("B2 [%s] XXXX COMPLETE", _getCurrentTimestamp())); + } + + @Override + public void onError(Throwable e) { + Timber.e(e, "something went wrong in TimingDemoFragment example"); + } + + @Override + public void onNext(Long number) { + _log(String.format("B2 [%s] NEXT", _getCurrentTimestamp())); + } + }; + + Flowable + .interval(1, TimeUnit.SECONDS) + .subscribe(_subscriber1); } @OnClick(R.id.btn_demo_timing_3) public void btn3_RunTask_IntervalOf1s_StartImmediately() { - if (_subscription2 != null && !_subscription2.isUnsubscribed()) { - _subscription2.unsubscribe(); + if (_subscriber2 != null && !_subscriber2.isDisposed()) { + _subscriber2.dispose(); _log(String.format("C3 [%s] XXX BTN KILLED", _getCurrentTimestamp())); return; } _log(String.format("C3 [%s] --- BTN click", _getCurrentTimestamp())); - _subscription2 = Observable// - .interval(0, 1, TimeUnit.SECONDS)// - .subscribe(new Observer() { - @Override - public void onCompleted() { - _log(String.format("C3 [%s] XXXX COMPLETE", _getCurrentTimestamp())); - } + _subscriber2 = new DisposableSubscriber() { + @Override + public void onNext(Long number) { + _log(String.format("C3 [%s] NEXT", _getCurrentTimestamp())); + } - @Override - public void onError(Throwable e) { - Timber.e(e, "something went wrong in TimingDemoFragment example"); - } + @Override + public void onComplete() { + _log(String.format("C3 [%s] XXXX COMPLETE", _getCurrentTimestamp())); + } - @Override - public void onNext(Long number) { - _log(String.format("C3 [%s] NEXT", _getCurrentTimestamp())); - } - }); + @Override + public void onError(Throwable e) { + Timber.e(e, "something went wrong in TimingDemoFragment example"); + } + + }; + + Flowable + .interval(0, 1, TimeUnit.SECONDS) + .subscribe(_subscriber2); } @OnClick(R.id.btn_demo_timing_4) public void btn4_RunTask5Times_IntervalOf3s() { _log(String.format("D4 [%s] --- BTN click", _getCurrentTimestamp())); - Observable// - .interval(3, TimeUnit.SECONDS).take(5)// - .subscribe(new Observer() { + Flowable + .interval(3, TimeUnit.SECONDS) + .take(5) + .subscribe(new DefaultSubscriber() { @Override - public void onCompleted() { - _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp())); + public void onNext(Long number) { + _log(String.format("D4 [%s] NEXT", _getCurrentTimestamp())); } @Override @@ -168,9 +168,10 @@ public void onError(Throwable e) { } @Override - public void onNext(Long number) { - _log(String.format("D4 [%s] NEXT", _getCurrentTimestamp())); + public void onComplete() { + _log(String.format("D4 [%s] XXX COMPLETE", _getCurrentTimestamp())); } + }); } @@ -178,23 +179,16 @@ public void onNext(Long number) { public void btn5_RunTask5Times_IntervalOf3s() { _log(String.format("D5 [%s] --- BTN click", _getCurrentTimestamp())); - Observable.just("Do task A right away") - .doOnNext(new Action1() { - @Override - public void call(String input) { - _log(String.format("D5 %s [%s]", input, _getCurrentTimestamp())); - } - }) + Flowable + .just("Do task A right away") + .doOnNext(input -> _log(String.format("D5 %s [%s]", input, _getCurrentTimestamp()))) .delay(1, TimeUnit.SECONDS) - .doOnNext(new Action1() { - @Override - public void call(String oldInput) { - _log(String.format("D5 %s [%s]", "Doing Task B after a delay", _getCurrentTimestamp())); - } - }) - .subscribe(new Observer() { + .doOnNext(oldInput -> _log(String.format("D5 %s [%s]", + "Doing Task B after a delay", + _getCurrentTimestamp()))) + .subscribe(new DefaultSubscriber() { @Override - public void onCompleted() { + public void onComplete() { _log(String.format("D5 [%s] XXX COMPLETE", _getCurrentTimestamp())); } @@ -236,7 +230,6 @@ private void _log(String logMsg) { } private String _getCurrentTimestamp() { - return new SimpleDateFormat("k:m:s:S a").format(new Date()); + return new SimpleDateFormat("k:m:s:S a", Locale.getDefault()).format(new Date()); } - } diff --git a/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationAutoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationAutoFragment.java index 17ee72f2..a506c629 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationAutoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationAutoFragment.java @@ -14,14 +14,14 @@ import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; import com.morihacky.android.rxjava.rxbus.RxBus; +import io.reactivex.Flowable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; +import io.reactivex.processors.PublishProcessor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; -import rx.subjects.PublishSubject; -import rx.subscriptions.CompositeSubscription; public class PaginationAutoFragment extends BaseFragment { @@ -31,9 +31,9 @@ public class PaginationAutoFragment private PaginationAutoAdapter _adapter; private RxBus _bus; - private PublishSubject _paginator; + private CompositeDisposable _disposables; + private PublishProcessor _paginator; private boolean _requestUnderWay = false; - private CompositeSubscription _subscriptions; @Override public View onCreateView(LayoutInflater inflater, @@ -57,49 +57,51 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { _adapter = new PaginationAutoAdapter(_bus); _pagingList.setAdapter(_adapter); - _paginator = PublishSubject.create(); + _paginator = PublishProcessor.create(); } @Override public void onStart() { super.onStart(); - _subscriptions = new CompositeSubscription(); - - Subscription s2 =// - _paginator.onBackpressureDrop() - .doOnNext(i -> { - _requestUnderWay = true; - _progressBar.setVisibility(View.VISIBLE); - }) - .concatMap(this::_itemsFromNetworkCall) - .observeOn(AndroidSchedulers.mainThread()) - .map(items -> { - _adapter.addItems(items); - _adapter.notifyDataSetChanged(); - return null; - }) - .doOnNext(i -> { - _requestUnderWay = false; - _progressBar.setVisibility(View.INVISIBLE); - }) - .subscribe(); + _disposables = new CompositeDisposable(); + + Disposable d2 = _paginator + .onBackpressureDrop() + .doOnNext(i -> { + _requestUnderWay = true; + _progressBar.setVisibility(View.VISIBLE); + }) + .concatMap(this::_itemsFromNetworkCall) + .observeOn(AndroidSchedulers.mainThread()) + .map(items -> { + _adapter.addItems(items); + _adapter.notifyDataSetChanged(); + + return items; + }) + .doOnNext(i -> { + _requestUnderWay = false; + _progressBar.setVisibility(View.INVISIBLE); + }) + .subscribe(); // I'm using an RxBus purely to hear from a nested button click // we don't really need Rx for this part. it's just easy ¯\_(ツ)_/¯ - Subscription s1 =// - _bus.asObservable()// - .filter(o -> !_requestUnderWay)// - .subscribe(event -> { - if (event instanceof PaginationAutoAdapter.PageEvent) { - // trigger the paginator for the next event - int nextPage = _adapter.getItemCount(); - _paginator.onNext(nextPage); - } - }); + Disposable d1 = _bus + .asFlowable() + .filter(o -> !_requestUnderWay) + .subscribe(event -> { + if (event instanceof PaginationAutoAdapter.PageEvent) { + + // trigger the paginator for the next event + int nextPage = _adapter.getItemCount(); + _paginator.onNext(nextPage); + } + }); - _subscriptions.add(s1); - _subscriptions.add(s2); + _disposables.add(d1); + _disposables.add(d2); _paginator.onNext(0); } @@ -107,15 +109,16 @@ public void onStart() { @Override public void onStop() { super.onStop(); - _subscriptions.clear(); + _disposables.clear(); } /** * Fake Observable that simulates a network call and then sends down a list of items */ - private Observable> _itemsFromNetworkCall(int pageStart) { - return Observable.just(true)// - .observeOn(AndroidSchedulers.mainThread())// + private Flowable> _itemsFromNetworkCall(int pageStart) { + return Flowable + .just(true) + .observeOn(AndroidSchedulers.mainThread()) .delay(2, TimeUnit.SECONDS) .map(dummy -> { List items = new ArrayList<>(); diff --git a/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationFragment.java b/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationFragment.java index 5973e024..7569b3c9 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/pagination/PaginationFragment.java @@ -8,33 +8,30 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ProgressBar; - +import butterknife.Bind; +import butterknife.ButterKnife; import com.morihacky.android.rxjava.MainActivity; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; import com.morihacky.android.rxjava.rxbus.RxBus; - +import io.reactivex.Flowable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; +import io.reactivex.processors.PublishProcessor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import butterknife.Bind; -import butterknife.ButterKnife; -import rx.Observable; -import rx.Subscription; -import rx.android.schedulers.AndroidSchedulers; -import rx.subjects.PublishSubject; -import rx.subscriptions.CompositeSubscription; - -public class PaginationFragment extends BaseFragment { +public class PaginationFragment + extends BaseFragment { @Bind(R.id.list_paging) RecyclerView _pagingList; @Bind(R.id.progress_paging) ProgressBar _progressBar; - - private CompositeSubscription _subscriptions; private PaginationAdapter _adapter; private RxBus _bus; - private PublishSubject _paginator; + private CompositeDisposable _disposables; + private PublishProcessor _paginator; @Override public void onActivityCreated(@Nullable Bundle savedInstanceState) { @@ -49,56 +46,60 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { _adapter = new PaginationAdapter(_bus); _pagingList.setAdapter(_adapter); - _paginator = PublishSubject.create(); + _paginator = PublishProcessor.create(); } @Override public void onStart() { super.onStart(); - _subscriptions = new CompositeSubscription(); + _disposables = new CompositeDisposable(); - Subscription s2 =// - _paginator// - .onBackpressureDrop()// - .concatMap(nextPage -> _itemsFromNetworkCall(nextPage + 1, 10))// - .observeOn(AndroidSchedulers.mainThread()).map(items -> { + Disposable d2 = _paginator + .onBackpressureDrop() + .concatMap(nextPage -> _itemsFromNetworkCall(nextPage + 1, 10)) + .observeOn(AndroidSchedulers.mainThread()) + .map(items -> { int start = _adapter.getItemCount() - 1; _adapter.addItems(items); _adapter.notifyItemRangeInserted(start, 10); _progressBar.setVisibility(View.INVISIBLE); - return null; - })// - .subscribe(); + + return items; + }) + .subscribe(); // I'm using an Rxbus purely to hear from a nested button click // we don't really need Rx for this part. it's just easy ¯\_(ツ)_/¯ - Subscription s1 = _bus.asObservable().subscribe(event -> { - if (event instanceof PaginationAdapter.ItemBtnViewHolder.PageEvent) { + Disposable d1 = _bus + .asFlowable() + .subscribe(event -> { + if (event instanceof PaginationAdapter.ItemBtnViewHolder.PageEvent) { - // trigger the paginator for the next event - int nextPage = _adapter.getItemCount() - 1; - _paginator.onNext(nextPage); + // trigger the paginator for the next event + int nextPage = _adapter.getItemCount() - 1; + _paginator.onNext(nextPage); - } - }); + } + }); - _subscriptions.add(s1); - _subscriptions.add(s2); + _disposables.add(d1); + _disposables.add(d2); } @Override public void onStop() { super.onStop(); - _subscriptions.clear(); + _disposables.clear(); } /** * Fake Observable that simulates a network call and then sends down a list of items */ - private Observable> _itemsFromNetworkCall(int start, int count) { - return Observable.just(true) + private Flowable> _itemsFromNetworkCall(int start, int count) { + return Flowable + .just(true) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(dummy -> _progressBar.setVisibility(View.VISIBLE)) .delay(2, TimeUnit.SECONDS) @@ -111,7 +112,6 @@ private Observable> _itemsFromNetworkCall(int start, int count) { }); } - // ----------------------------------------------------------------------------------- // WIRING up the views required for this example @@ -123,6 +123,4 @@ public View onCreateView(LayoutInflater inflater, ButterKnife.bind(this, layout); return layout; } - - } diff --git a/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubApi.java b/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubApi.java index d65cd92a..d42c2ef9 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubApi.java +++ b/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubApi.java @@ -2,9 +2,9 @@ import java.util.List; +import io.reactivex.Observable; import retrofit2.http.GET; import retrofit2.http.Path; -import rx.Observable; public interface GithubApi { diff --git a/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubService.java b/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubService.java index 0a8d33d6..d7f24a88 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubService.java +++ b/app/src/main/java/com/morihacky/android/rxjava/retrofit/GithubService.java @@ -2,11 +2,11 @@ import android.text.TextUtils; +import com.jakewharton.retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory; + import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.Response; import retrofit2.Retrofit; -import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory; import retrofit2.converter.gson.GsonConverterFactory; import static java.lang.String.format; @@ -17,7 +17,7 @@ private GithubService() { } public static GithubApi createGithubService(final String githubToken) { - Retrofit.Builder builder = new Retrofit.Builder().addCallAdapterFactory(RxJavaCallAdapterFactory.create()) + Retrofit.Builder builder = new Retrofit.Builder().addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .addConverterFactory(GsonConverterFactory.create()) .baseUrl("https://api.github.com"); diff --git a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java index 545239e6..a953e20f 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java +++ b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java @@ -2,8 +2,8 @@ import com.jakewharton.rxrelay.PublishRelay; import com.jakewharton.rxrelay.Relay; - -import rx.Observable; +import hu.akarnokd.rxjava.interop.RxJavaInterop; +import io.reactivex.Flowable; /** * courtesy: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf @@ -16,8 +16,9 @@ public void send(Object o) { _bus.call(o); } - public Observable asObservable() { - return _bus.asObservable(); + public Flowable asFlowable() { + // this won't be necessary after https://github.com/JakeWharton/RxRelay/pull/20 is complete + return RxJavaInterop.toV2Flowable(_bus.asObservable()); } public boolean hasObservers() { diff --git a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom1Fragment.java b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom1Fragment.java index 439888f4..4ebb3200 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom1Fragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom1Fragment.java @@ -7,21 +7,19 @@ import android.view.View; import android.view.ViewGroup; import android.widget.TextView; - +import butterknife.Bind; +import butterknife.ButterKnife; import com.morihacky.android.rxjava.MainActivity; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; - -import butterknife.Bind; -import butterknife.ButterKnife; -import rx.subscriptions.CompositeSubscription; +import io.reactivex.disposables.CompositeDisposable; public class RxBusDemo_Bottom1Fragment extends BaseFragment { @Bind(R.id.demo_rxbus_tap_txt) TextView _tapEventTxtShow; + private CompositeDisposable _disposables; private RxBus _rxBus; - private CompositeSubscription _subscriptions; @Override public View onCreateView(LayoutInflater inflater, @@ -41,26 +39,29 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { @Override public void onStart() { super.onStart(); - _subscriptions = new CompositeSubscription(); + _disposables = new CompositeDisposable(); - _subscriptions// - .add(_rxBus.asObservable()// - .subscribe(event -> { - if (event instanceof RxBusDemoFragment.TapEvent) { - _showTapText(); - } - })); + _disposables.add(_rxBus + .asFlowable() + .subscribe(event -> { + if (event instanceof RxBusDemoFragment.TapEvent) { + _showTapText(); + } + })); } @Override public void onStop() { super.onStop(); - _subscriptions.clear(); + _disposables.clear(); } private void _showTapText() { _tapEventTxtShow.setVisibility(View.VISIBLE); _tapEventTxtShow.setAlpha(1f); - ViewCompat.animate(_tapEventTxtShow).alphaBy(-1f).setDuration(400); + ViewCompat + .animate(_tapEventTxtShow) + .alphaBy(-1f) + .setDuration(400); } } diff --git a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom2Fragment.java b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom2Fragment.java index 5fd50e3b..a2a99587 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom2Fragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom2Fragment.java @@ -7,27 +7,25 @@ import android.view.View; import android.view.ViewGroup; import android.widget.TextView; - +import butterknife.Bind; +import butterknife.ButterKnife; import com.morihacky.android.rxjava.MainActivity; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; - +import io.reactivex.Flowable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; import java.util.List; import java.util.concurrent.TimeUnit; -import butterknife.Bind; -import butterknife.ButterKnife; -import rx.Observable; -import rx.android.schedulers.AndroidSchedulers; -import rx.subscriptions.CompositeSubscription; - public class RxBusDemo_Bottom2Fragment extends BaseFragment { @Bind(R.id.demo_rxbus_tap_txt) TextView _tapEventTxtShow; @Bind(R.id.demo_rxbus_tap_count) TextView _tapEventCountShow; + private RxBus _rxBus; - private CompositeSubscription _subscriptions; + private CompositeDisposable _disposables; @Override public View onCreateView(LayoutInflater inflater, @@ -47,32 +45,32 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { @Override public void onStart() { super.onStart(); - _subscriptions = new CompositeSubscription(); - - Observable tapEventEmitter = _rxBus.asObservable().share(); - - _subscriptions// - .add(tapEventEmitter.subscribe(event -> { - if (event instanceof RxBusDemoFragment.TapEvent) { - _showTapText(); - } - })); - - Observable debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS); - Observable> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter); - - _subscriptions// - .add(debouncedBufferEmitter// - .observeOn(AndroidSchedulers.mainThread())// - .subscribe(taps -> { - _showTapCount(taps.size()); - })); + _disposables = new CompositeDisposable(); + + Flowable tapEventEmitter = _rxBus + .asFlowable() + .share(); + + _disposables.add(tapEventEmitter.subscribe(event -> { + if (event instanceof RxBusDemoFragment.TapEvent) { + _showTapText(); + } + })); + + Flowable debouncedEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS); + Flowable> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEmitter); + + _disposables.add(debouncedBufferEmitter + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(taps -> { + _showTapCount(taps.size()); + })); } @Override public void onStop() { super.onStop(); - _subscriptions.clear(); + _disposables.clear(); } // ----------------------------------------------------------------------------------- diff --git a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom3Fragment.java b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom3Fragment.java index 80ee38fb..d4125f0a 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom3Fragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBusDemo_Bottom3Fragment.java @@ -7,26 +7,23 @@ import android.view.View; import android.view.ViewGroup; import android.widget.TextView; - +import butterknife.Bind; +import butterknife.ButterKnife; import com.morihacky.android.rxjava.MainActivity; import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; - +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.flowables.ConnectableFlowable; import java.util.concurrent.TimeUnit; -import butterknife.Bind; -import butterknife.ButterKnife; -import rx.android.schedulers.AndroidSchedulers; -import rx.observables.ConnectableObservable; -import rx.subscriptions.CompositeSubscription; - public class RxBusDemo_Bottom3Fragment extends BaseFragment { @Bind(R.id.demo_rxbus_tap_txt) TextView _tapEventTxtShow; @Bind(R.id.demo_rxbus_tap_count) TextView _tapEventCountShow; private RxBus _rxBus; - private CompositeSubscription _subscriptions; + private CompositeDisposable _disposables; @Override public View onCreateView(LayoutInflater inflater, @@ -46,32 +43,32 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { @Override public void onStart() { super.onStart(); - _subscriptions = new CompositeSubscription(); + _disposables = new CompositeDisposable(); - ConnectableObservable tapEventEmitter = _rxBus.asObservable().publish(); + ConnectableFlowable tapEventEmitter = _rxBus.asFlowable().publish(); - _subscriptions// - .add(tapEventEmitter.subscribe(event -> { + _disposables// + .add(tapEventEmitter.subscribe(event -> { if (event instanceof RxBusDemoFragment.TapEvent) { _showTapText(); } })); - _subscriptions + _disposables .add(tapEventEmitter.publish(stream -> stream.buffer(stream.debounce(1, TimeUnit.SECONDS))) - .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> { + .observeOn(AndroidSchedulers.mainThread()).subscribe(taps -> { _showTapCount(taps.size()); })); - _subscriptions.add(tapEventEmitter.connect()); + _disposables.add(tapEventEmitter.connect()); } @Override public void onStop() { super.onStop(); - _subscriptions.clear(); + _disposables.clear(); } // ----------------------------------------------------------------------------------- diff --git a/app/src/main/java/com/morihacky/android/rxjava/volley/MyVolley.java b/app/src/main/java/com/morihacky/android/rxjava/volley/MyVolley.java index d67b9433..835a0287 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/volley/MyVolley.java +++ b/app/src/main/java/com/morihacky/android/rxjava/volley/MyVolley.java @@ -6,8 +6,6 @@ /** * Helper class that is used to provide references to initialized RequestQueue(s) and ImageLoader(s) - * - * @author Ognyan Bankov */ public class MyVolley { private static RequestQueue mRequestQueue; @@ -20,7 +18,7 @@ public static void init(Context context) { mRequestQueue = Volley.newRequestQueue(context); } - public static RequestQueue getRequestQueue() { + static RequestQueue getRequestQueue() { if (mRequestQueue != null) { return mRequestQueue; } else { diff --git a/app/src/main/java/com/morihacky/android/rxjava/volley/VolleyDemoFragment.java b/app/src/main/java/com/morihacky/android/rxjava/volley/VolleyDemoFragment.java index 57dda54d..c357e536 100644 --- a/app/src/main/java/com/morihacky/android/rxjava/volley/VolleyDemoFragment.java +++ b/app/src/main/java/com/morihacky/android/rxjava/volley/VolleyDemoFragment.java @@ -9,7 +9,9 @@ import android.view.View; import android.view.ViewGroup; import android.widget.ListView; - +import butterknife.Bind; +import butterknife.ButterKnife; +import butterknife.OnClick; import com.android.volley.Request; import com.android.volley.VolleyError; import com.android.volley.toolbox.JsonObjectRequest; @@ -17,22 +19,16 @@ import com.morihacky.android.rxjava.R; import com.morihacky.android.rxjava.fragments.BaseFragment; import com.morihacky.android.rxjava.wiring.LogAdapter; - -import org.json.JSONObject; - +import io.reactivex.Flowable; +import io.reactivex.android.schedulers.AndroidSchedulers; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.DisposableSubscriber; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; - -import butterknife.Bind; -import butterknife.ButterKnife; -import butterknife.OnClick; -import rx.Observable; -import rx.Observer; -import rx.android.schedulers.AndroidSchedulers; -import rx.schedulers.Schedulers; -import rx.subscriptions.CompositeSubscription; +import org.json.JSONObject; import timber.log.Timber; public class VolleyDemoFragment @@ -45,7 +41,7 @@ public class VolleyDemoFragment private List _logs; private LogAdapter _adapter; - private CompositeSubscription _compositeSubscription = new CompositeSubscription(); + private CompositeDisposable _disposables = new CompositeDisposable(); @Override public View onCreateView(LayoutInflater inflater, @@ -65,7 +61,7 @@ public void onActivityCreated(@Nullable Bundle savedInstanceState) { @Override public void onPause() { super.onPause(); - _compositeSubscription.clear(); + _disposables.clear(); } @Override @@ -75,18 +71,18 @@ public void onDestroyView() { } /** - * Creates and returns an observable generated from the Future returned from + * Creates and returns an observable generated from the Future returned from * {@code getRouteData()}. The observable can then be subscribed to as shown in * {@code startVolleyRequest()} * @return Observable */ - public Observable newGetRouteData() { - return Observable.defer(() -> { + public Flowable newGetRouteData() { + return Flowable.defer(() -> { try { - return Observable.just(getRouteData()); + return Flowable.just(getRouteData()); } catch (InterruptedException | ExecutionException e) { Log.e("routes", e.getMessage()); - return Observable.error(e); + return Flowable.error(e); } }); } @@ -97,38 +93,44 @@ void startRequest() { } private void startVolleyRequest() { - _compositeSubscription.add(newGetRouteData().subscribeOn(Schedulers.io()) + DisposableSubscriber d = new DisposableSubscriber() { + @Override + public void onNext(JSONObject jsonObject) { + Log.e(TAG, "onNext " + jsonObject.toString()); + _log("onNext " + jsonObject.toString()); + + } + + @Override + public void onError(Throwable e) { + VolleyError cause = (VolleyError) e.getCause(); + String s = new String(cause.networkResponse.data, Charset.forName("UTF-8")); + Log.e(TAG, s); + Log.e(TAG, cause.toString()); + _log("onError " + s); + + } + + @Override + public void onComplete() { + Log.e(TAG, "onCompleted"); + Timber.d("----- onCompleted"); + _log("onCompleted "); + } + }; + + newGetRouteData() + .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) - .subscribe(new Observer() { - @Override - public void onCompleted() { - Log.e(TAG, "onCompleted"); - Timber.d("----- onCompleted"); - _log("onCompleted "); - } - - @Override - public void onError(Throwable e) { - VolleyError cause = (VolleyError) e.getCause(); - String s = new String(cause.networkResponse.data, Charset.forName("UTF-8")); - Log.e(TAG, s); - Log.e(TAG, cause.toString()); - _log("onError " + s); - - } - - @Override - public void onNext(JSONObject jsonObject) { - Log.e(TAG, "onNext " + jsonObject.toString()); - _log("onNext " + jsonObject.toString()); - - } - })); + .subscribe(d); + + _disposables.add(d); } + /** * Converts the Asynchronous Request into a Synchronous Future that can be used to * block via {@code Future.get()}. Observables require blocking/synchronous functions - * @return JSONObject + * @return JSONObject * @throws ExecutionException * @throws InterruptedException */ diff --git a/app/src/main/res/layout/fragment_main.xml b/app/src/main/res/layout/fragment_main.xml index 96aa931a..322c2923 100644 --- a/app/src/main/res/layout/fragment_main.xml +++ b/app/src/main/res/layout/fragment_main.xml @@ -86,6 +86,13 @@ android:text="@string/btn_demo_timing" /> +