зеркало из
				https://github.com/iharh/notes.git
				synced 2025-11-04 15:46:08 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			377 строки
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			377 строки
		
	
	
		
			10 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
https://github.com/Netflix/RxJava
 | 
						|
http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
 | 
						|
 | 
						|
Stine (+backpressure):
 | 
						|
http://www.slideshare.net/mstine/reactive-fault-tolerant-programming-with-hystrix-and-rxjava
 | 
						|
https://www.youtube.com/watch?v=sRJmjO5D3KE
 | 
						|
 | 
						|
 | 
						|
Subscriber (like Barack Obama)
 | 
						|
    - subscribe/unsubscribe
 | 
						|
Observer (like Michelle Obama)
 | 
						|
    - onNext/onError/onCompleted
 | 
						|
 | 
						|
 | 
						|
Operators:
 | 
						|
http://reactivex.io/documentation/operators.html
 | 
						|
 | 
						|
https://github.com/Netflix/RxJava/wiki
 | 
						|
  zip, filter, map, mapMany, merge, concat
 | 
						|
  Eledge-RxJava\02. Observable Creation, Composition, and Filtering\02_08-Filtering.mp4
 | 
						|
  Christensen - Reactive Programming with Rx slides
 | 
						|
 | 
						|
  https://github.com/Froussios/Intro-To-RxJava
 | 
						|
 | 
						|
Visual:
 | 
						|
http://rxmarbles.com/
 | 
						|
 | 
						|
Threading:
 | 
						|
http://www.grahamlea.com/2014/07/rxjava-threading-examples/
 | 
						|
 | 
						|
Subjects:
 | 
						|
http://akarnokd.blogspot.com.by/2015/06/subjects-part-1.html
 | 
						|
http://akarnokd.blogspot.com.by/2015/09/subjects-part-2.html
 | 
						|
http://akarnokd.blogspot.com.by/2015/09/subjects-part-3-final.html
 | 
						|
 | 
						|
RxStreamLibsCompare:
 | 
						|
http://akarnokd.blogspot.com.by/2015/10/comparison-of-reactive-streams.html
 | 
						|
 | 
						|
Error Tracking:
 | 
						|
http://www.aanandshekharroy.com/articles/2018-02/error-handling-in-rxjava
 | 
						|
https://medium.com/square-corner-blog/no-cause-for-concern-rxjava-and-retrofit-throwing-a-tantrum-96c9e4ba8a6c#.mwi9r1s0v
 | 
						|
 | 
						|
ifaces:
 | 
						|
 | 
						|
Observer
 | 
						|
    onNext(T)
 | 
						|
    onCompleted()
 | 
						|
    onError(Throwable t)
 | 
						|
 | 
						|
Subscription
 | 
						|
    unsubscribe() // to disconnect the observable from the observer it is subscribed
 | 
						|
 | 
						|
 | 
						|
internals:
 | 
						|
 | 
						|
Observable
 | 
						|
    - COLD
 | 
						|
        like .range()
 | 
						|
    - HOT
 | 
						|
        don't push obsolete/historical data
 | 
						|
 | 
						|
    .empty()
 | 
						|
 | 
						|
    .create(...) // see later
 | 
						|
 | 
						|
    .from(T object [, Scheduler s]) // from a single simple value
 | 
						|
    .from(Iterable<T> list [, Scheduler s])
 | 
						|
    .from(T [] array [, Scheduler s])
 | 
						|
    .from(Future<T> future [, Scheduler s])
 | 
						|
    .range(int start, int count)
 | 
						|
 | 
						|
    .parallel(...)
 | 
						|
        to run in parallel
 | 
						|
        (observable) -> { return observable.filter(...)...doOnNext((xx) -> { ... });
 | 
						|
 | 
						|
    .concat(observable1, obs2, ...)
 | 
						|
    .merge(obs1, obs2)
 | 
						|
        merges items immediately (interleaved), unlike concat, which waits for the previous observable to complete
 | 
						|
        .merge(obs1, obs2)
 | 
						|
            merges items immediately (interleaved), unlike concat, which waits for the previous observable to complete
 | 
						|
    .zip(wrappedObservable, accum::collapseFunction)
 | 
						|
        SomeClass accum;
 | 
						|
            SomeClass accum::collapseFunction(Object [] events) {
 | 
						|
                ...
 | 
						|
            }
 | 
						|
    .zip(wrappedObservable1, wrappedObservable2, accum::collapseFunction)
 | 
						|
        
 | 
						|
 | 
						|
    .subscribe()
 | 
						|
    .subscribe(,)
 | 
						|
    .subscribe(,,)
 | 
						|
    .subscribeOn(Scheduler s)
 | 
						|
        Schedulers.newThread()
 | 
						|
    .observeOn(Scheduler s) // if not speci-d, the subscribeOn's one will be used
 | 
						|
        Schedulers.io()
 | 
						|
    .onErrorResumeNext(throwable -> {... return ...})
 | 
						|
    .doOnNext(x -> ...)
 | 
						|
    .doOnCompleted(() -> {
 | 
						|
        synchronized(waitMon) {
 | 
						|
            waitMon.notify();
 | 
						|
        }
 | 
						|
    })
 | 
						|
 | 
						|
    // ConnectableObservable
 | 
						|
    .publish() // -> returns ConnectableObservable<T>
 | 
						|
 | 
						|
    connectable.subscribe(...) // subscribe, but emitting does not happen until connect() is called.
 | 
						|
    connectable.connect() // start
 | 
						|
 | 
						|
 | 
						|
    .toSortedList((e1, e2) -> { return e1...().compareTo(e2...); })
 | 
						|
 | 
						|
    // predicates
 | 
						|
    .filter((i) -> { return some bool expr; })
 | 
						|
    .distinct()
 | 
						|
 | 
						|
    // positional filtering
 | 
						|
    .first([pred])
 | 
						|
    .firstOrDefault("list is empty")
 | 
						|
    .last([pred])
 | 
						|
    .elementAt(idx)
 | 
						|
    .elementAtOrDefault(idx, def)
 | 
						|
    .take(num)
 | 
						|
    .takeLast(num)
 | 
						|
 | 
						|
    // time based filtering
 | 
						|
    .sample(num, TimeUnit.SECONDS)
 | 
						|
    .timeout(num, TimeUnit) // generate onError(java.util.concurrent.TimeoutException) in case of timeout
 | 
						|
    .amb(observable1, observable2) // select whichever observable wins the race (emits first)
 | 
						|
    .skipUntil(observable) // skip all output from source observable until the param-observable begins to emit items
 | 
						|
    .takeUntil(observable) // emits ...
 | 
						|
 | 
						|
    // transformations
 | 
						|
    .map(op)
 | 
						|
    .flatMap(el -> (r1, r2, ..., rn)) // ... (el) -> { return Observable... }
 | 
						|
    .scan(initState, (curState, item) -> { ... return newState ...}) // like a fold
 | 
						|
    .groupBy((i) -> { ... ? "ODD", "EVEN" ... }) // ... groups, maintains the order of events
 | 
						|
        .subscribe((groupList) -> {
 | 
						|
            // groupList.getKey();
 | 
						|
            groupList.subscribe((x) -> { ... });
 | 
						|
        });
 | 
						|
    .buffer(num, TimeUnit.<whatever>)
 | 
						|
        .subscribe((list) -> { ... })
 | 
						|
 | 
						|
    // conditional operations - control one evt-stream using another
 | 
						|
    .defaultIfEmpty(value)
 | 
						|
    .skipWhile(v -> ... bool-cond ...)
 | 
						|
    .takeWhile(v -> ... bool-cond ...)
 | 
						|
    .takeWhileWithIndex((v, idx) -> ... bool-cond ...)
 | 
						|
 | 
						|
    // resource management
 | 
						|
    .using(resourceFactory, observableFactory)
 | 
						|
        Func0<ConnectionSubscription> resourceFactory = () -> { ... return new ConnectionSubscription(...) ... }
 | 
						|
        Func1<ConnectionSubscription, Observable<String>> observableFactory = (connectionSubscription) -> { ... bla-bla(connectionSubscription) ... }
 | 
						|
 | 
						|
    ConnectionSubscription implements Subscription // unsubscribe method
 | 
						|
        .registerResourceForClose(s)
 | 
						|
 | 
						|
    // Subjects
 | 
						|
    // A Subject is a hybrid between an Observable and a Subscriber, so it can both receive and emit events.
 | 
						|
 | 
						|
    Subject = Subscriber (can be used to subscribe to more than one observable) + Observable (can re-emit events + drive new ones using onNext/onError/onCompleted)
 | 
						|
 | 
						|
    PublishSubject
 | 
						|
        // emits events to the subscribers after the point of subscription
 | 
						|
        // events, emitted before, aren't seen
 | 
						|
        // good for multicasting events to subscribers
 | 
						|
 | 
						|
        PublishSubject<String> subj = PublishSubject.create();
 | 
						|
        subj.subscribe((item) -> { ... })
 | 
						|
        ...
 | 
						|
        Observable ...
 | 
						|
            .subscribe( ... subj.onNext(...) ... subj.onCompleted() ... subj.onError(...) ... )
 | 
						|
 | 
						|
    BehaviorSubject
 | 
						|
        // requires to provide a start state (or last publish) event
 | 
						|
        // it will be the first event the new subscriber sees
 | 
						|
 | 
						|
        BehaviorSubject<String> subj = BehaviorSubject.create("Start State");
 | 
						|
 | 
						|
    AsyncSubject
 | 
						|
        // only emits the last event seen before the onCompleted-call
 | 
						|
        // all other events are consumed without being re-emitted
 | 
						|
 | 
						|
        AsyncSubject<String> subj = AsyncSubject.create();
 | 
						|
 | 
						|
    // Backpressure etc.
 | 
						|
 | 
						|
    // No more sending multiple requests within a specified time frame
 | 
						|
    // get the first item in a time-frame
 | 
						|
    .debounce(n, TimeUnit.SECONDS)
 | 
						|
    // -//-
 | 
						|
    // get the last item in a time-frame
 | 
						|
    .throttleLast(n, TimeUnit.MILLISECONDS)
 | 
						|
    //
 | 
						|
    .throttleFirst()
 | 
						|
    .buffer()
 | 
						|
    .window(n)
 | 
						|
    .sample()
 | 
						|
 | 
						|
    // Error processing
 | 
						|
    .onErrorReturn(throwable -> new User())
 | 
						|
    .onErrorResumeNext(Observable.just(....))
 | 
						|
 | 
						|
    http://docs.couchbase.com/developer/java-2.0/observables.html
 | 
						|
 | 
						|
    // ???
 | 
						|
    .delay(TimeUnit.whatever)
 | 
						|
 | 
						|
    // ???
 | 
						|
    .peek(?)
 | 
						|
 | 
						|
BlockingObservable
 | 
						|
    .from (...)
 | 
						|
    .from (Observable o)
 | 
						|
 | 
						|
Observable types
 | 
						|
* non-blocking observables
 | 
						|
    - async execution supported (RxJava is single-threaded by default)
 | 
						|
    - unsubscribe at any point in the event stream
 | 
						|
 | 
						|
* blocking observables (BlockingObservable subclass)
 | 
						|
    - events are synchronous
 | 
						|
    - no ability to unsubscribe in the middle of event stream
 | 
						|
 | 
						|
* connectable observables
 | 
						|
 | 
						|
* resource management
 | 
						|
* subjects
 | 
						|
 | 
						|
 | 
						|
Observable.create(subscriber ->
 | 
						|
    subscriber.setProducer(request -> {
 | 
						|
        int idx = (int)request;
 | 
						|
        subscriber.onNext(... idx ...);
 | 
						|
    })
 | 
						|
);
 | 
						|
 | 
						|
Observable.create(new Observable.OnSubscribe<String>() {
 | 
						|
    @Override
 | 
						|
    public void call(Subscriber<? super String> subscriber) {
 | 
						|
        for (String item : collection) {
 | 
						|
            if (subscriber.isUnsubscribed()) {
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            String data = item.getSomething(...);
 | 
						|
            subscriber.onNext(... data ...);
 | 
						|
        }
 | 
						|
        subscriber.onCompleted();
 | 
						|
        // ??? onError???
 | 
						|
    }
 | 
						|
});
 | 
						|
 | 
						|
 | 
						|
observable.subscribe(new Action1<String>() {
 | 
						|
    @Override
 | 
						|
    public void call(String data) {
 | 
						|
        ...
 | 
						|
    }
 | 
						|
});
 | 
						|
 | 
						|
observable.subscribe(new Subscriber<String>() {
 | 
						|
    ...
 | 
						|
    @Override
 | 
						|
    public void onNext(String data) {
 | 
						|
        ...
 | 
						|
        if (data ...) {
 | 
						|
            unsubscribe();
 | 
						|
        }
 | 
						|
    }
 | 
						|
    ...
 | 
						|
});
 | 
						|
 | 
						|
new Subscriber<StackInfo>() {
 | 
						|
    // !!!
 | 
						|
    @Override
 | 
						|
    public void onStart() {
 | 
						|
        super.onStart();
 | 
						|
        request(nnn); // request first nnn items
 | 
						|
        request(0) // ???
 | 
						|
    }
 | 
						|
 | 
						|
    @Override
 | 
						|
    public void onNext(StackInfo stackInfo) {
 | 
						|
        ...
 | 
						|
        // calculate idx
 | 
						|
        request(idx);
 | 
						|
    }
 | 
						|
 | 
						|
    @Override
 | 
						|
    public void onCompleted() { ... }
 | 
						|
    @Override
 | 
						|
    public void onError(Throwable t) { ... }
 | 
						|
}
 | 
						|
 | 
						|
observable.subscribe(
 | 
						|
    // onNext
 | 
						|
    (i) -> { ... },
 | 
						|
    // onError
 | 
						|
    (t) -> { ... },
 | 
						|
    // onCompleted
 | 
						|
    () -> { ... }
 | 
						|
);
 | 
						|
 | 
						|
// Errors
 | 
						|
.error(new SomeException(...))
 | 
						|
 | 
						|
// retry
 | 
						|
.retry()
 | 
						|
    infinitely retry
 | 
						|
.retry(num)
 | 
						|
// !!! other overloaded retry stuff like retryWhen
 | 
						|
 | 
						|
 | 
						|
BehaviorSubject<T> extends Observable<T>
 | 
						|
// we can call its onNext/onError/onCompleted
 | 
						|
 | 
						|
 | 
						|
schedulers
 | 
						|
 | 
						|
- computation()
 | 
						|
    num of threads = num of CPU cores
 | 
						|
- currentThread()
 | 
						|
    finishes the ongoing work and then invokes the handler's code on the current thread
 | 
						|
- immediate()
 | 
						|
    does not allow the on-going work to finish, but immediately invokes the handler's code for the cur-thread
 | 
						|
- io()
 | 
						|
    uses a thread-pool which grows as needed
 | 
						|
- newThread()
 | 
						|
    a new thread for each unit of work
 | 
						|
- executor(Executor)
 | 
						|
- executor(ScheduledExecutor)
 | 
						|
    wrap an std-java i-face
 | 
						|
- trampoline()
 | 
						|
    ...
 | 
						|
- test()
 | 
						|
    UT schedulers
 | 
						|
 | 
						|
 | 
						|
observableFutureList = Observable.from(future);
 | 
						|
 | 
						|
Schedulers.computation().schedule(() -> {
 | 
						|
    future.run();
 | 
						|
});
 | 
						|
 | 
						|
observableFutureList.subscribe((list) -> {
 | 
						|
    list.forEach((i) -> {
 | 
						|
        System.out.println(i);
 | 
						|
    });
 | 
						|
});
 | 
						|
 | 
						|
 | 
						|
async processing:
 | 
						|
 | 
						|
Observable<Long> interval1 = Observable
 | 
						|
    .interval(2000L, TimeUnit.MILLISECONDS, Schedulers.computation())
 | 
						|
    .take(5)
 | 
						|
    .doOnEach(Util.debug("Interval 1"))
 | 
						|
    .finallyDo(latch::countDown)
 | 
						|
 | 
						|
 | 
						|
error handling
 | 
						|
 | 
						|
by default, calling onError() results in the stream shutting down.
 | 
						|
 | 
						|
TESTING:
 | 
						|
https://www.infoq.com/articles/Testing-RxJava
 | 
						|
 | 
						|
SAMPLES:
 | 
						|
 | 
						|
XML parsing
 | 
						|
Bytes.fromClassPath("file.xml")
 | 
						|
    .lift(XML.parse())
 | 
						|
    .filter(e->e.getType() == XmlEventTypes.START_ELEMENT)
 | 
						|
    .map(e->e.getAttributes().get("name"))
 | 
						|
    .subscribe(System.err::println);
 |