Conditional and Boolean Operators
Operators that evaluate one or more Observables or items emitted by Observables:
All
— determine whether all items emitted by an Observable
meet some criteria
Amb
— given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
Contains
— determine whether an Observable
emits a particular item or not
DefaultIfEmpty
— emit items from the source Observable
, or a default item if the source Observable
emits nothing
SequenceEqual
— determine whether two Observables emit the same sequence of items
SkipUntil
— discard items emitted by an Observable
until a second Observable
emits an item
SkipWhile
— discard items emitted by an Observable
until a specified condition becomes false
TakeUntil
— discard items emitted by an Observable
after a second Observable
emits an item or terminates
TakeWhile
— discard items emitted by an Observable
after a specified condition becomes false
All
Pass a predicate function to the All
operator that accepts an item emitted by the source Observable
and returns a boolean
value based on an evaluation of that item. All returns an Observable
that emits a single boolean
value: true
if and only if the source Observable
terminates normally and every item emitted by the source Observable
evaluated as true
according to this predicate; false
if any item emitted by the source Observable
evaluates as false according to this predicate.
private fun all() {
Observable.just(1, 2, 3)
.all { number -> number % 2 == 0 }
.subscribe { result -> println("Is all numbers even = $result") }
Observable.just(2, 4, 8)
.all { number -> number % 2 == 0 }
.subscribe { result -> println("Is all numbers even = $result") }
}
Output:
All numbers even = false
All numbers even = true
Amb
Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification.
When you pass a number of source Observables to Amb
, it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb
, either by emitting an item or sending an onError
or onCompleted
notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables.
private fun amb() {
val firstObservable = Observable.timer(4, TimeUnit.SECONDS)
.flatMap { Observable.just(1, 3, 5, 7, 9, 11, 13, 15) }
val secondObservable = Observable.timer(3, TimeUnit.SECONDS)
.flatMap { Observable.just(2, 4, 6, 8, 10, 12, 14) }
Observable
.amb(listOf(firstObservable, secondObservable))
.subscribe { result -> println("Next item = $result") }
}
secondObservable
gets executed and the firstObservable
is ignored
Output:
Next item = 2
Next item = 4
Next item = 6
Next item = 8
Next item = 10
Next item = 12
Next item = 14
Contains
Determine whether an Observable
emits a particular item or not. Pass the Contains
operator a particular item, and the Observable
it returns will emit true
if that item is emitted by the source Observable
, or false
if the source Observable
terminates without emitting that item.
fun contains() {
Observable.just(1, 2, 3, 10, 11, 12)
.contains(100)
.subscribe { result -> println("Sequence contains element = $result") }
Observable.just(1, 2, 3, 10, 11, 12)
.contains(12)
.subscribe { result -> println("Sequence contains element = $result") }
}
Output:
Sequence contains element = false
Sequence contains element = true
DefaultIfEmpty
Emit items from the source Observable
, or a default item if the source Observable
emits nothing. The DefaultIfEmpty
operator simply mirrors the source Observable
exactly if the source Observable
emits any items. If the source Observable
terminates normally (with an onComplete
) without emitting any items, the Observable
returned from DefaultIfEmpty
will instead emit a default item of your choosing before it too completes.
fun defaultIfEmpty() {
Observable.empty<Any>()
.defaultIfEmpty(10)
.subscribe { result: Any -> println("Default if empty = $result") }
}
Output:
Default if empty = 10
SequenceEqual
Determine whether two Observables emit the same sequence of items. Pass SequenceEqual
two Observables, and it will compare the items emitted by each Observable
, and the Observable
it returns will emit true
only if both sequences are the same (the same items, in the same order, with the same termination state).
fun sequenceEquals() {
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(3, 2, 1))
.subscribe { result -> println("Sequence equal = $result") }
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe { result -> println("Sequence equal = $result") }
}
Output:
Sequence equal = false
Sequence equal = true
SkipUntil
Discard items emitted by an Observable
until a second Observable
emits an item. The SkipUntil
subscribes to the source Observable
, but ignores its emissions until such time as a second Observable
emits an item, at which point SkipUntil
begins to mirror the source Observable
.
fun skipUntil() {
val firstObservable = Observable.create<Int>{ emitter ->
run {
for (i in 0..5) {
Thread.sleep(1000)
emitter.onNext(i)
}
emitter.onComplete()
}
}
val secondObservable = Observable.timer(3, TimeUnit.SECONDS)
.map { Observable.just(10, 11, 12, 13, 14, 15) }
firstObservable.skipUntil(secondObservable)
.subscribe { result -> println("Next item = $result") }
}
firstObservable
starts emitting items after 3 seconds
Output:
Next item = 3
Next item = 4
Next item = 5
SkipWhile
Discard items emitted by an Observable
until a specified condition becomes false
. The SkipWhile
subscribes to the source Observable
, but ignores its emissions until such time as some condition you specify becomes false
, at which point SkipWhile
begins to mirror the source Observable
.
fun skipWhile() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.skipWhile { number -> number < 5 }
.subscribe { result -> println("Next item = $result") }
}
Output:
Next item = 5
Next item = 6
Next item = 7
Next item = 8
Next item = 9
TakeUntil
Discard any items emitted by an Observable
after a second Observable
emits an item or terminates. The TakeUntil
subscribes and begins mirroring the source Observable
. It also monitors a second Observable
that you provide. If this second Observable
emits an item or sends a termination notification, the Observable
returned by TakeUntil
stops mirroring the source Observable
and terminates.
fun takeUntil() {
val firstObservable = Observable.create<Int>{ emitter ->
run {
for (i in 0..5) {
Thread.sleep(1000)
emitter.onNext(i)
}
emitter.onComplete()
}
}
val secondObservable = Observable.timer(2, TimeUnit.SECONDS)
.map { Observable.just(10, 11, 12, 13, 14, 15) }
firstObservable.takeUntil(secondObservable)
.subscribe { result -> println("Next item = $result") }
}
Output:
Next item = 0
Next item = 1
TakeWhile
Mirror items emitted by an Observabl
e until a specified condition becomes false
. The TakeWhile
mirrors the source Observable
until such time as some condition you specify becomes false
, at which point TakeWhile
stops mirroring the source Observable
and terminates its own Observable
.
fun takeWhile() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.takeWhile { number -> number < 5 }
.subscribe { result -> println("Next item = $result") }
}
Output:
Next item = 1
Next item = 2
Next item = 3
Next item = 4
Links
http://reactivex.io/documentation/operators.html
http://reactivex.io/documentation/operators/all.html
http://reactivex.io/documentation/operators/amb.html
http://reactivex.io/documentation/operators/contains.html
http://reactivex.io/documentation/operators/defaultifempty.html
http://reactivex.io/documentation/operators/sequenceequal.html
http://reactivex.io/documentation/operators/skipuntil.html
http://reactivex.io/documentation/operators/skipwhile.html
http://reactivex.io/documentation/operators/takeuntil.html
http://reactivex.io/documentation/operators/takewhile.html
https://proandroiddev.com/exploring-rxjava-in-android-conditional-and-boolean-operators-3bca84c773af