java - Properly handling empty Observable in RxJava -
i have situation creating observable containing results database. applying series of filters them. have subscriber logging results. may case no elements make way though filters. business logic states not error. however, when happens onerror called , contains following exception: java.util.nosuchelementexception: sequence contains no elements
is accepted practice detect type of exception , ignore it? or there better way handle this?
the version 1.0.0.
here simple test case exposes i'm seeing. appears related having events filtered before reaching map , reduce.
@test public void test() { integer values[] = new integer[]{1, 2, 3, 4, 5}; observable.from(values).filter(new func1<integer, boolean>() { @override public boolean call(integer integer) { if (integer < 0) return true; else return false; } }).map(new func1<integer, string>() { @override public string call(integer integer) { return string.valueof(integer); } }).reduce(new func2<string, string, string>() { @override public string call(string s, string s2) { return s + "," + s2; } }) .subscribe(new action1<string>() { @override public void call(string s) { system.out.println(s); } }); }
because using safe subscriber, throws onerrornotimplementedexception wraps following exception:
java.util.nosuchelementexception: sequence contains no elements @ rx.internal.operators.operatorsingle$1.oncompleted(operatorsingle.java:82) @ rx.internal.operators.notificationlite.accept(notificationlite.java:140) @ rx.internal.operators.takelastqueueproducer.emit(takelastqueueproducer.java:73) @ rx.internal.operators.takelastqueueproducer.startemitting(takelastqueueproducer.java:45) @ rx.internal.operators.operatortakelast$1.oncompleted(operatortakelast.java:59) @ rx.internal.operators.operatorscan$2.oncompleted(operatorscan.java:121) @ rx.internal.operators.operatormap$1.oncompleted(operatormap.java:43) @ rx.internal.operators.operatorfilter$1.oncompleted(operatorfilter.java:42) @ rx.internal.operators.onsubscribefromiterable$iterableproducer.request(onsubscribefromiterable.java:79) @ rx.internal.operators.operatorscan$2$1.request(operatorscan.java:147) @ rx.subscriber.setproducer(subscriber.java:139) @ rx.internal.operators.operatorscan$2.setproducer(operatorscan.java:139) @ rx.subscriber.setproducer(subscriber.java:133) @ rx.subscriber.setproducer(subscriber.java:133) @ rx.internal.operators.onsubscribefromiterable.call(onsubscribefromiterable.java:47) @ rx.internal.operators.onsubscribefromiterable.call(onsubscribefromiterable.java:33) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable.subscribe(observable.java:7284)
based on answer @davem below, created new test case:
@test public void testfromblockingandsingle() { integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; list<string> results = observable.from(values).filter(new func1<integer, boolean>() { @override public boolean call(integer integer) { if (integer < 0) return true; else return false; } }).map(new func1<integer, string>() { @override public string call(integer integer) { return string.valueof(integer); } }).reduce(new func2<string, string, string>() { @override public string call(string s, string s2) { return s + "," + s2; } }).tolist().toblocking().single(); system.out.println("test: " + results + " size: " + results.size()); }
and test results in following behavior:
when input is:
integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
then results (as expected) are:
test: [-2,-1] size: 1
and when input is:
integer values[] = new integer[]{0, 1, 2, 3, 4, 5};
then result following stack trace:
java.util.nosuchelementexception: sequence contains no elements @ rx.internal.operators.operatorsingle$1.oncompleted(operatorsingle.java:82) @ rx.internal.operators.notificationlite.accept(notificationlite.java:140) @ rx.internal.operators.takelastqueueproducer.emit(takelastqueueproducer.java:73) @ rx.internal.operators.takelastqueueproducer.startemitting(takelastqueueproducer.java:45) @ rx.internal.operators.operatortakelast$1.oncompleted(operatortakelast.java:59) @ rx.internal.operators.operatorscan$2.oncompleted(operatorscan.java:121) @ rx.internal.operators.operatormap$1.oncompleted(operatormap.java:43) @ rx.internal.operators.operatorfilter$1.oncompleted(operatorfilter.java:42) @ rx.internal.operators.onsubscribefromiterable$iterableproducer.request(onsubscribefromiterable.java:79) @ rx.internal.operators.operatorscan$2$1.request(operatorscan.java:147) @ rx.subscriber.setproducer(subscriber.java:139) @ rx.internal.operators.operatorscan$2.setproducer(operatorscan.java:139) @ rx.subscriber.setproducer(subscriber.java:133) @ rx.subscriber.setproducer(subscriber.java:133) @ rx.internal.operators.onsubscribefromiterable.call(onsubscribefromiterable.java:47) @ rx.internal.operators.onsubscribefromiterable.call(onsubscribefromiterable.java:33) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable$1.call(observable.java:144) @ rx.observable$1.call(observable.java:136) @ rx.observable.subscribe(observable.java:7284) @ rx.observables.blockingobservable.blockforsingle(blockingobservable.java:441) @ rx.observables.blockingobservable.single(blockingobservable.java:340) @ emptytest2.test(emptytest2.java:19) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) @ sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43) @ org.junit.runners.model.frameworkmethod$1.runreflectivecall(frameworkmethod.java:47) @ org.junit.internal.runners.model.reflectivecallable.run(reflectivecallable.java:12) @ org.junit.runners.model.frameworkmethod.invokeexplosively(frameworkmethod.java:44) @ org.junit.internal.runners.statements.invokemethod.evaluate(invokemethod.java:17) @ org.junit.runners.parentrunner.runleaf(parentrunner.java:271) @ org.junit.runners.blockjunit4classrunner.runchild(blockjunit4classrunner.java:70) @ org.junit.runners.blockjunit4classrunner.runchild(blockjunit4classrunner.java:50) @ org.junit.runners.parentrunner$3.run(parentrunner.java:238) @ org.junit.runners.parentrunner$1.schedule(parentrunner.java:63) @ org.junit.runners.parentrunner.runchildren(parentrunner.java:236) @ org.junit.runners.parentrunner.access$000(parentrunner.java:53) @ org.junit.runners.parentrunner$2.evaluate(parentrunner.java:229) @ org.junit.runners.parentrunner.run(parentrunner.java:309) @ org.junit.runner.junitcore.run(junitcore.java:160) @ com.intellij.junit4.junit4ideatestrunner.startrunnerwithargs(junit4ideatestrunner.java:74) @ com.intellij.rt.execution.junit.junitstarter.preparestreamsandstart(junitstarter.java:211) @ com.intellij.rt.execution.junit.junitstarter.main(junitstarter.java:67) @ sun.reflect.nativemethodaccessorimpl.invoke0(native method) @ sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) @ com.intellij.rt.execution.application.appmain.main(appmain.java:134)
so appears problem use of reduce function. see following test case handles both situations:
@test public void testnoreduce() { integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; list<string> results = observable.from(values).filter(new func1<integer, boolean>() { @override public boolean call(integer integer) { if (integer < 0) return true; else return false; } }).map(new func1<integer, string>() { @override public string call(integer integer) { return string.valueof(integer); } }).tolist().toblocking().first(); iterator<string> itr = results.iterator(); stringbuilder b = new stringbuilder(); while (itr.hasnext()) { b.append(itr.next()); if (itr.hasnext()) b.append(","); } system.out.println("test noreduce: " + b); }
with following input:
integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
i following results expected:
test noreduce: -2,-1
and following input:
integer values[] = new integer[]{0, 1, 2, 3, 4, 5};
i following output expected:
test noreduce:
so, unless misunderstanding something, way handle 0 element observable results filter when followed map , reduce implement reduce logic outside of observable chain. agree statement?
final solution
here final solution after implementing both tomáš dvořák , david motten suggested. think solution reasonable.
@test public void testwithtolist() { integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; observable.from(values).filter(new func1<integer, boolean>() { @override public boolean call(integer integer) { if (integer < 0) return true; else return false; } }).tolist().map(new func1<list<integer>, string>() { @override public string call(list<integer> integers) { iterator<integer> intitr = integers.iterator(); stringbuilder b = new stringbuilder(); while (intitr.hasnext()) { b.append(intitr.next()); if (intitr.hasnext()) { b.append(","); } } return b.tostring(); } }).subscribe(new action1<string>() { @override public void call(string s) { system.out.println("with tolist: " + s); } }); }
here how test behaves when given following inputs.
when given stream have values pass through filters:
integer values[] = new integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
the result is:
with tolist: -2,-1
when given stream not have values pass through filters:
integer values[] = new integer[]{0, 1, 2, 3, 4, 5};
the result is:
with tolist: <empty string>
now after update error quite obvious. reduce
in rxjava fail illegalargumentexception
if observable reducing empty, per specification (http://reactivex.io/documentation/operators/reduce.html).
in functional programming, there 2 generic operators aggregate collection single value, fold
, reduce
. in accepted terminology, fold
takes initial accumulator value, , function takes running accumulator , value collection , produces accumulator value. example in pseudocode:
[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)
will start 0, , add 1, 2, 3, 4 running accumulator, yielding 10, sum of values.
reduce similar, doesn't take initial accumulator value explicitly, uses first value initial accumulator, , accumulates remaining values. makes sense if e.g. looking minimum or maximum value.
[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))
looking @ fold , reduce different way, use fold
whenever aggregated value make sense on empty collection (like, in sum
, 0 makes sense), , reduce
otherwise (minimum
makes no sense on empty collection, , reduce
fail operate on such collection, in case throwing exception).
you doing similar aggregation, interspersing collection of strings comma produce single string. little bit more difficult situation. makes sense on empty collection (you expect empty string), on other hand, if start empty accumulator, have 1 more comma in result expect. correct solution check, whether collection empty first, , either return fall string empty collection, or reduce
on non-empty collection. observe, don't want empty string in empty collection case, "collection empty" might more appropriate, further assuring solution clean one.
btw, i'm using word collection here instead of observable freely, educational purposes. also, in rxjava, both fold
, reduce
called same, reduce
, there 2 versions of method, 1 taking 1 parameter, other 2 parameters.
as final question: don't have leave observable chain. use tolist(), david motten suggests.
.filter(...) .tolist() .map(listofvalues => listofvalues.intersperse(", "))
where intersperse
implemented in terms of reduce
, if not library function (it quite common).
collection.intersperse(separator) = if (collection.isempty()) "" else collection.reduce(accumulator, element => accumulator + separator + element)
Comments
Post a Comment