Rx - conditional_all

Upendra · Jan 23, 2025 · 11 mins read

Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables:
All — determine whether all items emitted by an Observable meet some criteria
Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
Contains — determine whether an Observable emits a particular item or not
DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
SequenceEqual — determine whether two Observables emit the same sequence of items
SkipUntil — discard items emitted by an Observable until a second Observable emits an item
SkipWhile — discard items emitted by an Observable until a specified condition becomes false
TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
TakeWhile — discard items emitted by an Observable after a specified condition becomes false

All

Pass a predicate function to the All operator that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item. All returns an Observable that emits a single boolean value: true if and only if the source Observable terminates normally and every item emitted by the source Observable evaluated as true according to this predicate; false if any item emitted by the source Observable evaluates as false according to this predicate.

private fun all() {
    // 1 and 3 are odd, so the predicate fails and all() emits false.
    Observable.just(1, 2, 3)
            .all { number -> number % 2 == 0 }
            .subscribe { result -> println("All numbers even = $result") }

    // Every item is even, so all() emits true once the source completes.
    Observable.just(2, 4, 8)
            .all { number -> number % 2 == 0 }
            .subscribe { result -> println("All numbers even = $result") }
}

Output:

All numbers even = false
All numbers even = true
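
Two edge cases follow from the description above: an empty source, and a source whose first item already fails the predicate. The sketch below is only an illustration. It assumes RxJava 3 (the `io.reactivex.rxjava3` packages; this post does not name a version), and the helper names `allOnEmpty` and `itemsSeenBeforeFalse` are made up for this example.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: an empty source has no item that can fail the predicate.
fun allOnEmpty(): Boolean =
    Observable.empty<Int>()
        .all { number -> number % 2 == 0 }
        .blockingGet()

// Hypothetical helper: records which items were emitted before all() settled on false.
fun itemsSeenBeforeFalse(): List<Int> {
    val seen = mutableListOf<Int>()
    Observable.just(1, 2, 3, 4)
        .doOnNext { number -> seen.add(number) }
        .all { number -> number % 2 == 0 }
        .blockingGet()
    return seen
}

fun main() {
    println("All on empty = ${allOnEmpty()}")
    println("Items seen = ${itemsSeenBeforeFalse()}")
}
```

With these inputs the empty source should yield true, and only the first item (1) should be seen, because all() stops listening as soon as one item fails the predicate.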

Amb

Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification.

When you pass a number of source Observables to Amb, it passes through the emissions and notifications of exactly one of them: the first one that sends a notification to Amb, either by emitting an item or by sending an onError or onComplete notification. Amb ignores and discards the emissions and notifications of all the other source Observables.

private fun amb() {
    val firstObservable = Observable.timer(4, TimeUnit.SECONDS)
            .flatMap { Observable.just(1, 3, 5, 7, 9, 11, 13, 15) }

    val secondObservable = Observable.timer(3, TimeUnit.SECONDS)
            .flatMap { Observable.just(2, 4, 6, 8, 10, 12, 14) }

    Observable
            .amb(listOf(firstObservable, secondObservable))
            .subscribe { result -> println("Next item = $result") }
}

secondObservable emits first (after 3 seconds, versus 4 seconds for firstObservable), so amb mirrors it and ignores firstObservable.

Output:

Next item = 2
Next item = 4
Next item = 6
Next item = 8
Next item = 10
Next item = 12
Next item = 14
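
The timers in the example above run on a background scheduler, so a plain JVM main function could exit before anything is printed. One way to observe the same behaviour without waiting is virtual time. The following is a sketch under assumptions: RxJava 3 package names, its `TestScheduler` class, and a made-up function name `ambWinner`.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: runs the race on a virtual clock and returns what amb let through.
fun ambWinner(): List<Int> {
    val scheduler = TestScheduler()

    // Starts emitting at 4 virtual seconds.
    val slow = Observable.timer(4, TimeUnit.SECONDS, scheduler)
        .flatMap { Observable.just(1, 3, 5) }

    // Starts emitting at 3 virtual seconds, so it wins the race.
    val fast = Observable.timer(3, TimeUnit.SECONDS, scheduler)
        .flatMap { Observable.just(2, 4, 6) }

    val observer = Observable.amb(listOf(slow, fast)).test()

    // Move the virtual clock past both timers; no real waiting happens.
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println("Amb result = ${ambWinner()}")
}
```

Only the items of the faster source (2, 4, 6) should appear in the result, even though the clock is advanced past the point where the slower source would have emitted.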

Contains

Determine whether an Observable emits a particular item or not. Pass the Contains operator a particular item, and the Observable it returns will emit true if that item is emitted by the source Observable, or false if the source Observable terminates without emitting that item.

fun contains() {
    Observable.just(1, 2, 3, 10, 11, 12)
            .contains(100)
            .subscribe { result -> println("Sequence contains element = $result") }

    Observable.just(1, 2, 3, 10, 11, 12)
            .contains(12)
            .subscribe { result -> println("Sequence contains element = $result") }
}

Output:

Sequence contains element = false
Sequence contains element = true
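
Contains compares items with equals(), which matters once the items are objects rather than numbers. The sketch below assumes RxJava 3 package names; the `User` data class and the function names are hypothetical and exist only for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical type for illustration; a Kotlin data class gets a value-based equals().
data class User(val id: Int, val name: String)

// A freshly created User is found because it is equal by value to an emitted one.
fun containsEqualUser(): Boolean =
    Observable.just(User(1, "Ann"), User(2, "Bob"))
        .contains(User(2, "Bob"))
        .blockingGet()

// any() with a predicate is an alternative when only part of the item should match.
fun anyUserWithId(id: Int): Boolean =
    Observable.just(User(1, "Ann"), User(2, "Bob"))
        .any { user -> user.id == id }
        .blockingGet()

fun main() {
    println("Contains equal user = ${containsEqualUser()}")
    println("Any user with id 3 = ${anyUserWithId(3)}")
}
```

The first call should report true and the second false. With a regular class that does not override equals(), the first check would compare references instead and would not find the new instance.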

DefaultIfEmpty

Emit items from the source Observable, or a default item if the source Observable emits nothing. The DefaultIfEmpty operator simply mirrors the source Observable exactly if the source Observable emits any items. If the source Observable terminates normally (with an onComplete) without emitting any items, the Observable returned from DefaultIfEmpty will instead emit a default item of your choosing before it too completes.

fun defaultIfEmpty() {
    Observable.empty<Any>()
            .defaultIfEmpty(10)
            .subscribe { result: Any -> println("Default if empty = $result") }
}

Output:

Default if empty = 10
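
The example above covers only the empty case. To see both branches of the description side by side, here is a small sketch, assuming RxJava 3 package names and a hypothetical helper named `withDefault`.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: collects everything the source emits, falling back to 10 if it is empty.
fun withDefault(source: Observable<Int>): List<Int> =
    source.defaultIfEmpty(10)
        .toList()
        .blockingGet()

fun main() {
    println("Empty source = ${withDefault(Observable.empty())}")
    println("Non-empty source = ${withDefault(Observable.just(1, 2))}")
}
```

The empty source should produce a list holding only the default (10), while the non-empty source should be mirrored unchanged as 1, 2 with no default appended.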

SequenceEqual

Determine whether two Observables emit the same sequence of items. Pass SequenceEqual two Observables, and it will compare the items emitted by each Observable, and the Observable it returns will emit true only if both sequences are the same (the same items, in the same order, with the same termination state).

fun sequenceEquals() {
    Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(3, 2, 1))
            .subscribe { result -> println("Sequence equal = $result") }

    Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
            .subscribe { result -> println("Sequence equal = $result") }
}

Output:

Sequence equal = false
Sequence equal = true
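
Two further cases are worth a quick look: sequences of different lengths, and equality that is looser than equals(). The sketch assumes RxJava 3, whose sequenceEqual has an overload that accepts a `BiPredicate`; the function names are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiPredicate

// One sequence is a prefix of the other, so they are not equal.
fun differentLengths(): Boolean =
    Observable.sequenceEqual(Observable.just(1, 2), Observable.just(1, 2, 3))
        .blockingGet()

// A custom comparer decides what "the same item" means, here ignoring letter case.
fun equalIgnoringCase(): Boolean =
    Observable.sequenceEqual(
        Observable.just("a", "B"),
        Observable.just("A", "b"),
        BiPredicate<String, String> { left, right -> left.equals(right, ignoreCase = true) }
    ).blockingGet()

fun main() {
    println("Different lengths = ${differentLengths()}")
    println("Equal ignoring case = ${equalIgnoringCase()}")
}
```

The first call should report false and the second true.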

SkipUntil

Discard items emitted by an Observable until a second Observable emits an item. SkipUntil subscribes to the source Observable but ignores its emissions until a second Observable emits an item, at which point SkipUntil begins to mirror the source Observable.

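fun skipUntil() {
    // Emits 0..5, one item per second, blocking the subscribing thread.
    val firstObservable = Observable.create<Int> { emitter ->
        for (i in 0..5) {
            Thread.sleep(1000)
            emitter.onNext(i)
        }
        emitter.onComplete()
    }

    // Only the timer's single emission matters to skipUntil. It fires at 3.5 seconds,
    // between items 2 and 3; a 3-second timer would race with item 2.
    val secondObservable = Observable.timer(3500, TimeUnit.MILLISECONDS)

    firstObservable.skipUntil(secondObservable)
            .subscribe { result -> println("Next item = $result") }
}

firstObservable emits from the start, but skipUntil drops its items until secondObservable fires after 3.5 seconds.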

Output:

Next item = 3
Next item = 4
Next item = 5
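
Examples that rely on Thread.sleep and real timers are sensitive to timing. A deterministic variant can run on a virtual clock instead. This is a sketch under assumptions: RxJava 3 package names, its `TestScheduler`, a trigger placed at 3.5 seconds so that it falls between two source emissions, and a hypothetical function name `skipUntilVirtualTime`.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the items that skipUntil lets through on a virtual clock.
fun skipUntilVirtualTime(): List<Long> {
    val scheduler = TestScheduler()

    // Emits 0, 1, 2, 3, 4, 5 at virtual seconds 1 through 6.
    val source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(6)

    // Opens the gate at 3.5 virtual seconds.
    val trigger = Observable.timer(3500, TimeUnit.MILLISECONDS, scheduler)

    val observer = source.skipUntil(trigger).test()

    // Advance far enough for the source to finish; no real waiting happens.
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
    return observer.values().toList()
}

fun main() {
    println("SkipUntil result = ${skipUntilVirtualTime()}")
}
```

Items 0, 1 and 2 arrive before the trigger and are dropped, so the result should hold 3, 4 and 5.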

SkipWhile

Discard items emitted by an Observable until a specified condition becomes false. SkipWhile subscribes to the source Observable but ignores its emissions until a condition you specify becomes false, at which point SkipWhile begins to mirror the source Observable.

fun skipWhile() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .skipWhile { number -> number < 5 }
            .subscribe { result -> println("Next item = $result") }
}

Output:

Next item = 5
Next item = 6
Next item = 7
Next item = 8
Next item = 9
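
With a sorted input, skipWhile can look like a filter. The difference shows up when a later item satisfies the condition again: once skipWhile has started mirroring, it no longer checks the predicate. A minimal sketch, assuming RxJava 3 package names and hypothetical function names:

```kotlin
import io.reactivex.rxjava3.core.Observable

// skipWhile stops checking the predicate after the first item that fails it.
fun skipWhileResult(): List<Int> =
    Observable.just(1, 6, 2, 7)
        .skipWhile { number -> number < 5 }
        .toList()
        .blockingGet()

// filter checks every item independently.
fun filterResult(): List<Int> =
    Observable.just(1, 6, 2, 7)
        .filter { number -> number >= 5 }
        .toList()
        .blockingGet()

fun main() {
    println("skipWhile = ${skipWhileResult()}")
    println("filter = ${filterResult()}")
}
```

skipWhile should yield 6, 2, 7 (the 2 passes because mirroring has already begun), while filter should yield only 6, 7.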

TakeUntil

Discard any items emitted by an Observable after a second Observable emits an item or terminates. TakeUntil subscribes to the source Observable and begins mirroring it. It also monitors a second Observable that you provide. If this second Observable emits an item or sends a termination notification, the Observable returned by TakeUntil stops mirroring the source Observable and terminates.

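fun takeUntil() {
    // Emits 0..5, one item per second, blocking the subscribing thread.
    val firstObservable = Observable.create<Int> { emitter ->
        for (i in 0..5) {
            Thread.sleep(1000)
            // Stop early once takeUntil has disposed the source.
            if (emitter.isDisposed) return@create
            emitter.onNext(i)
        }
        emitter.onComplete()
    }

    // Only the timer's single emission matters to takeUntil. It fires at 2.5 seconds,
    // between items 1 and 2; a 2-second timer would race with item 1.
    val secondObservable = Observable.timer(2500, TimeUnit.MILLISECONDS)

    firstObservable.takeUntil(secondObservable)
            .subscribe { result -> println("Next item = $result") }
}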

Output:

Next item = 0
Next item = 1

TakeWhile

Mirror items emitted by an Observable until a specified condition becomes false. TakeWhile mirrors the source Observable until a condition you specify becomes false, at which point TakeWhile stops mirroring the source Observable and terminates its own Observable.

fun takeWhile() {
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .takeWhile { number -> number < 5 }
            .subscribe { result -> println("Next item = $result") }
}

Output:

Next item = 1
Next item = 2
Next item = 3
Next item = 4
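
As with skipWhile, a sorted input hides the difference between takeWhile and a filter. takeWhile terminates at the first item that fails the condition, even if later items would pass. A minimal sketch, assuming RxJava 3 package names and hypothetical function names:

```kotlin
import io.reactivex.rxjava3.core.Observable

// takeWhile completes at the first failing item (5) and never sees 3 or 4.
fun takeWhileResult(): List<Int> =
    Observable.just(1, 2, 5, 3, 4)
        .takeWhile { number -> number < 5 }
        .toList()
        .blockingGet()

// filter checks every item independently.
fun filterBelowFive(): List<Int> =
    Observable.just(1, 2, 5, 3, 4)
        .filter { number -> number < 5 }
        .toList()
        .blockingGet()

fun main() {
    println("takeWhile = ${takeWhileResult()}")
    println("filter = ${filterBelowFive()}")
}
```

takeWhile should yield 1, 2, while filter should yield 1, 2, 3, 4.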

Links

http://reactivex.io/documentation/operators.html
http://reactivex.io/documentation/operators/all.html
http://reactivex.io/documentation/operators/amb.html
http://reactivex.io/documentation/operators/contains.html
http://reactivex.io/documentation/operators/defaultifempty.html
http://reactivex.io/documentation/operators/sequenceequal.html
http://reactivex.io/documentation/operators/skipuntil.html
http://reactivex.io/documentation/operators/skipwhile.html
http://reactivex.io/documentation/operators/takeuntil.html
http://reactivex.io/documentation/operators/takewhile.html
https://proandroiddev.com/exploring-rxjava-in-android-conditional-and-boolean-operators-3bca84c773af

Credit goes to @swayangjit.