notes/pl/java/libfws/fp/frp/rxjava/rxjava.txt
Ihar Hancharenka 5dff80e88e first
2023-03-27 16:52:17 +03:00

https://github.com/Netflix/RxJava
http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
Stine (+backpressure):
http://www.slideshare.net/mstine/reactive-fault-tolerant-programming-with-hystrix-and-rxjava
https://www.youtube.com/watch?v=sRJmjO5D3KE
Subscriber (like Barack Obama)
- subscribe/unsubscribe
Observer (like Michelle Obama)
- onNext/onError/onCompleted
Operators:
http://reactivex.io/documentation/operators.html
https://github.com/Netflix/RxJava/wiki
zip, filter, map, mapMany, merge, concat
Eledge-RxJava\02. Observable Creation, Composition, and Filtering\02_08-Filtering.mp4
Christensen - Reactive Programming with Rx slides
https://github.com/Froussios/Intro-To-RxJava
Visual:
http://rxmarbles.com/
Threading:
http://www.grahamlea.com/2014/07/rxjava-threading-examples/
Subjects:
http://akarnokd.blogspot.com.by/2015/06/subjects-part-1.html
http://akarnokd.blogspot.com.by/2015/09/subjects-part-2.html
http://akarnokd.blogspot.com.by/2015/09/subjects-part-3-final.html
RxStreamLibsCompare:
http://akarnokd.blogspot.com.by/2015/10/comparison-of-reactive-streams.html
Error Tracking:
http://www.aanandshekharroy.com/articles/2018-02/error-handling-in-rxjava
https://medium.com/square-corner-blog/no-cause-for-concern-rxjava-and-retrofit-throwing-a-tantrum-96c9e4ba8a6c#.mwi9r1s0v
ifaces:
Observer
onNext(T)
onCompleted()
onError(Throwable t)
Subscription
unsubscribe() // disconnects the observer from the observable it is subscribed to
internals:
Observable
- COLD
like .range()
- HOT
does not replay obsolete/historical data to late subscribers
.empty()
.create(...) // see later
.from(T object [, Scheduler s]) // from a single simple value
.from(Iterable<T> list [, Scheduler s])
.from(T [] array [, Scheduler s])
.from(Future<T> future [, Scheduler s])
.range(int start, int count)
.parallel(...)
to run in parallel
(observable) -> { return observable.filter(...)...doOnNext((xx) -> { ... }); }
.concat(observable1, obs2, ...)
.merge(obs1, obs2)
merge emits items as soon as they arrive, whereas concat waits for each source to complete before moving to the next
.zip(wrappedObservable, accum::collapseFunction)
SomeClass accum;
SomeClass accum::collapseFunction(Object [] events) {
...
}
.zip(wrappedObservable1, wrappedObservable2, accum::collapseFunction)
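The difference between concat, merge and zip noted above can be modeled without RxJava. The sketch below is plain JDK code, not the RxJava API: the class CombineModel, the helper type Timed and all method names are hypothetical, and each source is assumed to be a list of items tagged with the time at which it would be emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-JDK model of the combining operators; NOT the RxJava API.
final class CombineModel {
    // Hypothetical helper: one item plus the time (ms) at which its source emits it.
    static final class Timed<T> {
        final long atMillis;
        final T value;
        Timed(long atMillis, T value) { this.atMillis = atMillis; this.value = value; }
    }

    // concat: all items of the first source, then all items of the second.
    static <T> List<T> concat(List<Timed<T>> a, List<Timed<T>> b) {
        List<T> out = new ArrayList<>();
        for (Timed<T> t : a) out.add(t.value);
        for (Timed<T> t : b) out.add(t.value);
        return out;
    }

    // merge: items of both sources interleaved in arrival order.
    static <T> List<T> merge(List<Timed<T>> a, List<Timed<T>> b) {
        List<Timed<T>> all = new ArrayList<>(a);
        all.addAll(b);
        // List.sort is stable, so items with equal times keep their source order.
        all.sort((x, y) -> Long.compare(x.atMillis, y.atMillis));
        List<T> out = new ArrayList<>();
        for (Timed<T> t : all) out.add(t.value);
        return out;
    }

    // zip: combines the i-th items of both sources; ends with the shorter source.
    static <A, B, R> List<R> zip(List<Timed<A>> a, List<Timed<B>> b, BiFunction<A, B, R> f) {
        List<R> out = new ArrayList<>();
        for (int i = 0; i < Math.min(a.size(), b.size()); i++) {
            out.add(f.apply(a.get(i).value, b.get(i).value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed<String>> a = Arrays.asList(new Timed<String>(0, "a1"), new Timed<String>(20, "a2"));
        List<Timed<String>> b = Arrays.asList(new Timed<String>(10, "b1"), new Timed<String>(30, "b2"));
        System.out.println(concat(a, b));
        System.out.println(merge(a, b));
        System.out.println(zip(a, b, (x, y) -> x + "+" + y));
    }
}
```

In real RxJava the merge order depends on thread timing; the timestamps here only stand in for that.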
.subscribe()
.subscribe(,)
.subscribe(,,)
.subscribeOn(Scheduler s)
Schedulers.newThread()
.observeOn(Scheduler s) // if not specified, the subscribeOn scheduler is used
Schedulers.io()
.onErrorResumeNext(throwable -> {... return ...})
.doOnNext(x -> ...)
.doOnCompleted(() -> {
synchronized(waitMon) {
waitMon.notify();
}
})
// ConnectableObservable
.publish() // -> returns ConnectableObservable<T>
connectable.subscribe(...) // subscribes, but nothing is emitted until connect() is called
connectable.connect() // start
.toSortedList((e1, e2) -> { return e1...().compareTo(e2...); })
// predicates
.filter((i) -> { return some bool expr; })
.distinct()
// positional filtering
.first([pred])
.firstOrDefault("list is empty")
.last([pred])
.elementAt(idx)
.elementAtOrDefault(idx, def)
.take(num)
.takeLast(num)
// time based filtering
.sample(num, TimeUnit.SECONDS)
.timeout(num, TimeUnit) // generate onError(java.util.concurrent.TimeoutException) in case of timeout
.amb(observable1, observable2) // mirrors whichever observable emits first (wins the race)
.skipUntil(observable) // skip all output from source observable until the param-observable begins to emit items
.takeUntil(observable) // emits ...
// transformations
.map(op)
.flatMap(el -> (r1, r2, ..., rn)) // ... (el) -> { return Observable... }
.scan(initState, (curState, item) -> { ... return newState ...}) // like a fold
.groupBy((i) -> { ... ? "ODD" : "EVEN" ... }) // ... groups, maintains the order of events
.subscribe((groupList) -> {
// groupList.getKey();
groupList.subscribe((x) -> { ... });
});
.buffer(num, TimeUnit.<whatever>)
.subscribe((list) -> { ... })
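The "scan is like a fold" remark above can be shown with a plain-JDK model (not the RxJava API; ScanModel and its method names are hypothetical). It assumes the seeded form of scan, which emits the initial state first and then every intermediate state, whereas a fold yields only the final one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-JDK model of scan vs fold; NOT the RxJava API.
final class ScanModel {
    // fold: applies the step to every item and returns only the final state.
    static <S, T> S fold(S initState, List<T> items, BiFunction<S, T, S> step) {
        S state = initState;
        for (T item : items) state = step.apply(state, item);
        return state;
    }

    // scan: same accumulation, but every state (starting with initState) is emitted.
    static <S, T> List<S> scan(S initState, List<T> items, BiFunction<S, T, S> step) {
        List<S> out = new ArrayList<>();
        S state = initState;
        out.add(state);
        for (T item : items) {
            state = step.apply(state, item);
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4);
        System.out.println(scan(0, items, (acc, x) -> acc + x)); // running sums
        System.out.println(fold(0, items, (acc, x) -> acc + x)); // final sum only
    }
}
```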
// conditional operations - control one evt-stream using another
.defaultIfEmpty(value)
.skipWhile(v -> ... bool-cond ...)
.takeWhile(v -> ... bool-cond ...)
.takeWhileWithIndex((v, idx) -> ... bool-cond ...)
// resource management
.using(resourceFactory, observableFactory)
Func0<ConnectionSubscription> resourceFactory = () -> { ... return new ConnectionSubscription(...) ... }
Func1<ConnectionSubscription, Observable<String>> observableFactory = (connectionSubscription) -> { ... bla-bla(connectionSubscription) ... }
ConnectionSubscription implements Subscription // unsubscribe method
.registerResourceForClose(s)
// Subjects
// A Subject is a hybrid between an Observable and a Subscriber, so it can both receive and emit events.
Subject = Subscriber (can subscribe to more than one observable) + Observable (can re-emit events and emit new ones via onNext/onError/onCompleted)
PublishSubject
// emits events to the subscribers after the point of subscription
// events, emitted before, aren't seen
// good for multicasting events to subscribers
PublishSubject<String> subj = PublishSubject.create();
subj.subscribe((item) -> { ... })
...
Observable ...
.subscribe( ... subj.onNext(...) ... subj.onCompleted() ... subj.onError(...) ... )
BehaviorSubject
// requires a start-state event (later replaced by the last published event)
// it is the first event a new subscriber sees
BehaviorSubject<String> subj = BehaviorSubject.create("Start State");
AsyncSubject
// only emits the last event seen before the onCompleted-call
// all other events are consumed without being re-emitted
AsyncSubject<String> subj = AsyncSubject.create();
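The three subject flavours above differ only in what a subscriber gets to see. A minimal plain-JDK sketch of that difference follows; it is not the RxJava API, and the class names Publish, Behavior and Async are hypothetical stand-ins that ignore errors, unsubscription and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK model of the subject semantics; NOT the RxJava classes.
final class SubjectModel {
    // PublishSubject-like: a subscriber sees only events published after it subscribed.
    static final class Publish<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        void subscribe(Consumer<T> s) { subscribers.add(s); }
        void onNext(T item) { for (Consumer<T> s : subscribers) s.accept(item); }
    }

    // BehaviorSubject-like: a new subscriber first gets the latest event (or the start state).
    static final class Behavior<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T latest;
        Behavior(T startState) { latest = startState; }
        void subscribe(Consumer<T> s) { s.accept(latest); subscribers.add(s); }
        void onNext(T item) { latest = item; for (Consumer<T> s : subscribers) s.accept(item); }
    }

    // AsyncSubject-like: nothing is delivered until onCompleted(), then only the last event.
    static final class Async<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T last;
        private boolean hasValue;
        private boolean completed;
        void subscribe(Consumer<T> s) {
            if (!completed) subscribers.add(s);
            else if (hasValue) s.accept(last); // late subscriber still gets the final event
        }
        void onNext(T item) { if (!completed) { last = item; hasValue = true; } }
        void onCompleted() {
            if (completed) return;
            completed = true;
            if (hasValue) for (Consumer<T> s : subscribers) s.accept(last);
        }
    }

    public static void main(String[] args) {
        Behavior<String> subj = new Behavior<>("Start State");
        subj.subscribe(e -> System.out.println("first: " + e));
        subj.onNext("a");
        subj.subscribe(e -> System.out.println("second: " + e));
        subj.onNext("b");
    }
}
```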
// Backpressure etc.
// Avoid sending multiple requests within a specified time frame
// emit an item only after a quiet period of n with no newer item
.debounce(n, TimeUnit.SECONDS)
// get the last item in a time-frame
.throttleLast(n, TimeUnit.MILLISECONDS)
// get the first item in a time-frame
.throttleFirst()
.buffer()
.window(n)
.sample()
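To compare the rate-limiting operators above, here is a plain-JDK model (not the RxJava API; RateLimitModel and its methods are hypothetical). Items are represented by their non-negative, ascending arrival times in ms. Boundary cases and what happens to a pending item on completion are simplifying assumptions and may differ from the real operators.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model of throttleFirst / throttleLast / debounce; NOT the RxJava API.
final class RateLimitModel {
    // throttleFirst: emit an item, then ignore everything for the next `window` ms.
    static List<Long> throttleFirst(long[] arrivals, long window) {
        List<Long> out = new ArrayList<>();
        long lastEmitted = 0;
        for (long t : arrivals) {
            if (out.isEmpty() || t - lastEmitted >= window) {
                out.add(t);
                lastEmitted = t;
            }
        }
        return out;
    }

    // throttleLast (sample): keep only the last item of each fixed window [k*window, (k+1)*window).
    static List<Long> throttleLast(long[] arrivals, long window) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < arrivals.length; i++) {
            boolean lastInWindow = i == arrivals.length - 1
                    || arrivals[i + 1] / window != arrivals[i] / window;
            if (lastInWindow) out.add(arrivals[i]);
        }
        return out;
    }

    // debounce: keep an item only if no newer item arrives within `quiet` ms after it.
    static List<Long> debounce(long[] arrivals, long quiet) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < arrivals.length; i++) {
            boolean followedByQuiet = i == arrivals.length - 1
                    || arrivals[i + 1] - arrivals[i] >= quiet;
            if (followedByQuiet) out.add(arrivals[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 250, 1200, 1300, 3000};
        System.out.println("throttleFirst: " + throttleFirst(arrivals, 1000));
        System.out.println("throttleLast:  " + throttleLast(arrivals, 1000));
        System.out.println("debounce:      " + debounce(arrivals, 1000));
    }
}
```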
// Error processing
.onErrorReturn(throwable -> new User())
.onErrorResumeNext(Observable.just(....))
http://docs.couchbase.com/developer/java-2.0/observables.html
// ???
.delay(TimeUnit.whatever)
// ???
.peek(?)
BlockingObservable
.from (...)
.from (Observable o)
Observable types
* non-blocking observables
- async execution supported (RxJava is single-threaded by default)
- unsubscribe at any point in the event stream
* blocking observables (BlockingObservable subclass)
- events are synchronous
- no ability to unsubscribe in the middle of event stream
* connectable observables
* resource management
* subjects
Observable.create(subscriber ->
subscriber.setProducer(request -> {
int idx = (int)request;
subscriber.onNext(... idx ...);
})
);
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            for (String item : collection) {
                // stop producing once the subscriber has gone away
                if (subscriber.isUnsubscribed()) {
                    break;
                }
                String data = item.getSomething(...);
                subscriber.onNext(... data ...);
            }
            subscriber.onCompleted();
        } catch (Throwable t) {
            // report failures through the stream instead of throwing
            subscriber.onError(t);
        }
    }
});
observable.subscribe(new Action1<String>() {
@Override
public void call(String data) {
...
}
});
observable.subscribe(new Subscriber<String>() {
...
@Override
public void onNext(String data) {
...
if (data ...) {
unsubscribe();
}
}
...
});
new Subscriber<StackInfo>() {
// !!!
@Override
public void onStart() {
super.onStart();
request(nnn); // request first nnn items
request(0); // ???
}
@Override
public void onNext(StackInfo stackInfo) {
...
// calculate idx
request(idx);
}
@Override
public void onCompleted() { ... }
@Override
public void onError(Throwable t) { ... }
}
observable.subscribe(
// onNext
(i) -> { ... },
// onError
(t) -> { ... },
// onCompleted
() -> { ... }
);
// Errors
.error(new SomeException(...))
// retry
.retry()
infinitely retry
.retry(num)
// !!! other overloaded retry stuff like retryWhen
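What retry(num) does can be sketched in plain JDK code (not the RxJava API; RetryModel is a hypothetical name). The assumption is that "retry num times" means up to num re-runs after the first failure, so at most num + 1 attempts in total, after which the last error is passed on.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-JDK model of retry(num); NOT the RxJava API.
final class RetryModel {
    // Runs the source; on failure re-runs it, at most `retries` extra times, then rethrows.
    static <T> T retry(Supplier<T> source, int retries) {
        int failures = 0;
        while (true) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                if (failures++ >= retries) throw e; // retry budget exhausted
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky source: fails twice, then succeeds.
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("boom");
            return "ok";
        }, 2);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```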
BehaviorSubject<T> extends Observable<T>
// we can call its onNext/onError/onCompleted
schedulers
- computation()
num of threads = num of CPU cores
- currentThread()
finishes the ongoing work, then invokes the handler's code on the current thread
- immediate()
does not wait for the ongoing work to finish; immediately invokes the handler's code on the current thread
- io()
uses a thread pool which grows as needed
- newThread()
a new thread for each unit of work
- executor(Executor)
- executor(ScheduledExecutorService)
wraps a standard Java interface
- trampoline()
...
- test()
scheduler for unit tests
observableFutureList = Observable.from(future);
Schedulers.computation().schedule(() -> {
future.run();
});
observableFutureList.subscribe((list) -> {
list.forEach((i) -> {
System.out.println(i);
});
});
async processing:
Observable<Long> interval1 = Observable
.interval(2000L, TimeUnit.MILLISECONDS, Schedulers.computation())
.take(5)
.doOnEach(Util.debug("Interval 1"))
.finallyDo(latch::countDown);
error handling
by default, calling onError() shuts the stream down.
TESTING:
https://www.infoq.com/articles/Testing-RxJava
SAMPLES:
XML parsing
Bytes.fromClassPath("file.xml")
.lift(XML.parse())
.filter(e->e.getType() == XmlEventTypes.START_ELEMENT)
.map(e->e.getAttributes().get("name"))
.subscribe(System.err::println);