https://github.com/Netflix/RxJava
http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
Stine (+backpressure): http://www.slideshare.net/mstine/reactive-fault-tolerant-programming-with-hystrix-and-rxjava
https://www.youtube.com/watch?v=sRJmjO5D3KE

Subscriber (like Barack Obama)  - subscribe/unsubscribe
Observer (like Michelle Obama)  - onNext/onError/onCompleted

Operators: http://reactivex.io/documentation/operators.html
https://github.com/Netflix/RxJava/wiki
zip, filter, map, mapMany, merge, concat
Eledge-RxJava\02. Observable Creation, Composition, and Filtering\02_08-Filtering.mp4
Christensen - Reactive Programming with Rx slides
https://github.com/Froussios/Intro-To-RxJava

Visual: http://rxmarbles.com/
Threading: http://www.grahamlea.com/2014/07/rxjava-threading-examples/
Subjects:
  http://akarnokd.blogspot.com.by/2015/06/subjects-part-1.html
  http://akarnokd.blogspot.com.by/2015/09/subjects-part-2.html
  http://akarnokd.blogspot.com.by/2015/09/subjects-part-3-final.html
RxStreamLibsCompare: http://akarnokd.blogspot.com.by/2015/10/comparison-of-reactive-streams.html
Error Tracking:
  http://www.aanandshekharroy.com/articles/2018-02/error-handling-in-rxjava
  https://medium.com/square-corner-blog/no-cause-for-concern-rxjava-and-retrofit-throwing-a-tantrum-96c9e4ba8a6c#.mwi9r1s0v

ifaces:
  Observer
    onNext(T)
    onCompleted()
    onError(Throwable t)
  Subscription
    unsubscribe()    // disconnects the observer from the observable it is subscribed to

internals:
Observable
  - COLD  like .range()
  - HOT   does not push obsolete/historical data to late subscribers

  .empty()
  .create(...)                           // see later
  .from(T object [, Scheduler s])        // from a single simple value
  .from(Iterable list [, Scheduler s])
  .from(T[] array [, Scheduler s])
  .from(Future future [, Scheduler s])
  .range(int start, int count)
  .parallel(...)                         // to run in parallel
      (observable) -> { return observable.filter(...)...doOnNext((xx) -> { ... }); }
  .concat(observable1, obs2, ...)
  .merge(obs1, obs2)                     // interleaves items as they arrive; concat waits for each source to complete before starting the next
  .zip(wrappedObservable, accum::collapseFunction)
      SomeClass accum;
      SomeClass accum::collapseFunction(Object[] events) { ... }
  .zip(wrappedObservable1, wrappedObservable2, accum::collapseFunction)

  .subscribe()
  .subscribe(,)
  .subscribe(,,)
  .subscribeOn(Scheduler s)
      Schedulers.newThread()
  .observeOn(Scheduler s)                // if not specified, the subscribeOn scheduler is used
      Schedulers.io()
  .onErrorResumeNext(throwable -> {... return ...})
  .doOnNext(x -> ...)
  .doOnCompleted(() -> { synchronized(waitMon) { waitMon.notify(); } })

  // ConnectableObservable
  .publish()                             // returns a ConnectableObservable
      connectable.subscribe(...)         // subscribes, but nothing is emitted until connect() is called
      connectable.connect()              // start emitting

  .toSortedList((e1, e2) -> { return e1...().compareTo(e2...); })

  // predicates
  .filter((i) -> { return some bool expr; })
  .distinct()

  // positional filtering
  .first([pred])
  .firstOrDefault("list is empty")
  .last([pred])
  .elementAt(idx)
  .elementAtOrDefault(idx, def)
  .take(num)
  .takeLast(num)

  // time based filtering
  .sample(num, TimeUnit.SECONDS)
  .timeout(num, TimeUnit)                // signals onError(java.util.concurrent.TimeoutException) on timeout
  .amb(observable1, observable2)         // mirrors whichever observable wins the race (emits first)
  .skipUntil(observable)                 // skips all output from the source observable until the param-observable begins to emit items
  .takeUntil(observable)                 // emits ...

  // transformations
  .map(op)
  .flatMap(el -> (r1, r2, ..., rn))      // ... (el) -> { return Observable... }
  .scan(initState, (curState, item) -> { ... return newState ...})   // like a fold that emits every intermediate state
  .groupBy((i) -> { ... ? "ODD" : "EVEN" ... })   // ... groups, maintains the order of events
      .subscribe((groupList) -> {
          // groupList.getKey();
          groupList.subscribe((x) -> { ... });
      });
  .buffer(num, TimeUnit)
      .subscribe((list) -> {
          ...
      })

  // conditional operations - control one event stream using another
  .defaultIfEmpty(value)
  .skipWhile(v -> ... bool-cond ...)
  .takeWhile(v -> ... bool-cond ...)
  .takeWhileWithIndex((v, idx) -> ... bool-cond ...)

  // resource management
  .using(resourceFactory, observableFactory)
      Func0<ConnectionSubscription> resourceFactory = () -> { ... return new ConnectionSubscription(...) ... }
      Func1<ConnectionSubscription, Observable<...>> observableFactory = (connectionSubscription) -> { ... bla-bla(connectionSubscription) ... }
      ConnectionSubscription implements Subscription    // unsubscribe method
      .registerResourceForClose(s)

// Subjects
// A Subject is a hybrid between an Observable and a Subscriber, so it can both receive and emit events.
Subject = Subscriber (can be subscribed to more than one observable)
        + Observable (can re-emit events + drive new ones using onNext/onError/onCompleted)

PublishSubject
  // emits events to subscribers only from the point of subscription on
  // events emitted before that aren't seen
  // good for multicasting events to subscribers
  PublishSubject subj = PublishSubject.create();
  subj.subscribe((item) -> { ... })
  ...
  Observable ... .subscribe( ... subj.onNext(...) ... subj.onCompleted() ... subj.onError(...) ... )

BehaviorSubject
  // requires a start-state event
  // the start state (or the last published event) is the first event a new subscriber sees
  BehaviorSubject subj = BehaviorSubject.create("Start State");

AsyncSubject
  // only emits the last event seen before the onCompleted call
  // all other events are consumed without being re-emitted
  AsyncSubject subj = AsyncSubject.create();

// Backpressure etc.
  // No more sending multiple requests within a specified time frame
  // emit an item only after the time frame passes with no newer item
  .debounce(n, TimeUnit.SECONDS)
  // -//-
  // get the last item in a time frame
  .throttleLast(n, TimeUnit.MILLISECONDS)
  // get the first item in a time frame
  .throttleFirst()
  .buffer()
  .window(n)
  .sample()

  // Error processing
  .onErrorReturn(throwable -> new User())
  .onErrorResumeNext(Observable.just(....))
  http://docs.couchbase.com/developer/java-2.0/observables.html

  // ??? .delay(TimeUnit.whatever)
  // ??? .peek(?)

BlockingObservable
  .from (...)
  .from (Observable o)

Observable types
  * non-blocking observables
    - async execution supported (RxJava is single-threaded by default)
    - can unsubscribe at any point in the event stream
  * blocking observables (BlockingObservable subclass)
    - events are synchronous
    - no ability to unsubscribe in the middle of the event stream
  * connectable observables
  * resource management
  * subjects

Observable.create(subscriber ->
    subscriber.setProducer(request -> {
        int idx = (int) request;
        subscriber.onNext(... idx ...);
    })
);

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (String item : collection) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
            String data = item.getSomething(...);
            subscriber.onNext(... data ...);
        }
        subscriber.onCompleted();
        // ??? onError??? (likely: catch exceptions around the loop and pass them to subscriber.onError(e))
    }
});

observable.subscribe(new Action1<String>() {
    @Override
    public void call(String data) { ... }
});

observable.subscribe(new Subscriber<String>() {
    ...
    @Override
    public void onNext(String data) {
        ...
        if (data ...) {
            unsubscribe();
        }
    }
    ...
});

new Subscriber<StackInfo>() {
    // !!!
    @Override
    public void onStart() {
        super.onStart();
        request(nnn);    // request the first nnn items
        // request(0) ???
    }

    @Override
    public void onNext(StackInfo stackInfo) {
        ...
        // calculate idx
        request(idx);
    }

    @Override
    public void onCompleted() { ... }

    @Override
    public void onError(Throwable t) { ... }
}

observable.subscribe(
    // onNext
    (i) -> { ... },
    // onError
    (t) -> { ... },
    // onCompleted
    () -> {
        ...
    }
);

// Errors
.error(new SomeException(...))

// retry
.retry()       // retry indefinitely
.retry(num)
// !!! other overloaded retry stuff like retryWhen

BehaviorSubject extends Observable (via Subject)
  // we can call its onNext/onError/onCompleted

schedulers
  - computation()    num of threads = num of CPU cores
  - currentThread()  finishes the on-going work and then invokes the handler's code on the current thread
  - immediate()      does not allow the on-going work to finish, but immediately invokes the handler's code on the current thread
  - io()             uses a thread pool which grows as needed
  - newThread()      a new thread for each unit of work
  - executor(Executor)
  - executor(ScheduledExecutorService)    wraps a standard Java interface
  - trampoline()     ...
  - test()           scheduler for unit tests

observableFutureList = Observable.from(future);
Schedulers.computation().schedule(() -> { future.run(); });
observableFutureList.subscribe((list) -> { list.forEach((i) -> { System.out.println(i); }); });

async processing:
Observable<Long> interval1 = Observable
    .interval(2000L, TimeUnit.MILLISECONDS, Schedulers.computation())
    .take(5)
    .doOnEach(Util.debug("Interval 1"))
    .finallyDo(latch::countDown);

error handling
  by default, calling onError() terminates the stream.

TESTING:
https://www.infoq.com/articles/Testing-RxJava

SAMPLES:
XML parsing
Bytes.fromClassPath("file.xml")
    .lift(XML.parse())
    .filter(e -> e.getType() == XmlEventTypes.START_ELEMENT)
    .map(e -> e.getAttributes().get("name"))
    .subscribe(System.err::println);
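
request(n) sketch (NOT RxJava): the pull-style `request(...)` calls from the Subscriber notes above can be tried without any dependency using the JDK's own `java.util.concurrent.Flow` API (Java 9+), which has the same Reactive Streams shape. This is a stand-in under stated assumptions, not RxJava itself: `RequestDemo` and `OneAtATime` are made-up names, and `Subscription.request(n)` / `cancel()` correspond only roughly to RxJava's `request(n)` / `unsubscribe()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for RxJava.
final class RequestDemo {

    // Requests one item at a time and cancels after `limit` items.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        OneAtATime(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                 // roughly request(nnn) in onStart()
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel();    // roughly unsubscribe()
                done.countDown();
            } else {
                subscription.request(1);  // ask for the next item only when ready
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..itemCount and returns what the subscriber actually took.
    static List<Integer> run(int itemCount, int limit) {
        OneAtATime subscriber = new OneAtATime(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(10, 3)); // subscriber cancels after 3 items
        System.out.println(run(3, 10)); // publisher completes after 3 items
    }
}
```

Both calls in `main` should print `[1, 2, 3]`: in the first the subscriber stops the stream itself, in the second the publisher runs out of items and completes.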