Java - Properly handling empty Observable in RxJava


I have a situation where I am creating an Observable containing results from a database. I am applying a series of filters to them. I then have a subscriber that logs the results. It may be the case that no elements make their way through the filters. My business logic states that this is not an error. However, when this happens, onError is called and contains the following exception: java.util.NoSuchElementException: Sequence contains no elements

Is the accepted practice to detect that type of exception and ignore it? Or is there a better way to handle this?

The version is 1.0.0.

Here is a simple test case that exposes what I'm seeing. It appears to be related to having all of the events filtered out before they reach the map and reduce.

@Test
public void test() {
    Integer values[] = new Integer[]{1, 2, 3, 4, 5};
    // The filter keeps only negative values, so nothing reaches map and reduce.
    Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    }).subscribe(new Action1<String>()
    {
        @Override
        public void call(String s)
        {
            System.out.println(s);
        }
    });
}

Because I am using a safe subscriber, it throws an OnErrorNotImplementedException, which wraps the following exception:

java.util.NoSuchElementException: Sequence contains no elements
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:139)
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7284)

Based on the answer from @davem below, I created a new test case:

@Test
public void testFromBlockingAndSingle() {
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    }).toList().toBlocking().single(); // reduce still runs before toList
    System.out.println("test: " + results + " size: " + results.size());
}

And this test results in the following behavior:

When the input is:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

Then the results (as expected) are:

test: [-2,-1] size: 1 

And when the input is:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

Then the result is the following stack trace:

java.util.NoSuchElementException: Sequence contains no elements
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:139)
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7284)
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
    at EmptyTest2.test(EmptyTest2.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

So it appears that the problem is definitely the use of the reduce function. See the following test case, which handles both situations:

@Test
public void testNoReduce() {
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).toList().toBlocking().first();
    // Join the collected values by hand, outside the Observable chain.
    Iterator<String> itr = results.iterator();
    StringBuilder b = new StringBuilder();
    while (itr.hasNext())
    {
        b.append(itr.next());
        if (itr.hasNext())
            b.append(",");
    }
    System.out.println("test noreduce: " + b);
}

With the following input:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

I get the following results, which are expected:

test noreduce: -2,-1 

And with the following input:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

I get the following output, which is expected:

test noreduce:  

So, unless I am misunderstanding something, the only way to handle a zero-element Observable that results from a filter followed by a map and reduce is to implement the reduce logic outside of the Observable chain. Do you agree with that statement?


Final solution

Here is my final solution after implementing what both Tomáš Dvořák and David Motten suggested. I think this solution is reasonable.

@Test
public void testWithToList() {
    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};
    Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).toList().map(new Func1<List<Integer>, String>()
    {
        // toList always emits one list (possibly empty), so this map always runs.
        @Override
        public String call(List<Integer> integers)
        {
            Iterator<Integer> intItr = integers.iterator();
            StringBuilder b = new StringBuilder();
            while (intItr.hasNext())
            {
                b.append(intItr.next());
                if (intItr.hasNext())
                {
                    b.append(",");
                }
            }
            return b.toString();
        }
    }).subscribe(new Action1<String>()
    {
        @Override
        public void call(String s)
        {
            System.out.println("with tolist: " + s);
        }
    });
}

Here is how this test behaves when given the following inputs.

When given a stream that will have some values pass through the filters:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

The result is:

with tolist: -2,-1 

When given a stream that will not have any values pass through the filters:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

The result is:

with tolist: <empty string> 

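Now, after your update, the error is quite obvious. reduce in RxJava fails if the Observable it is reducing is empty, as per the specification (http://reactivex.io/documentation/operators/reduce.html). The documentation describes this as an IllegalArgumentException; in the stack trace above, on 1.0.0, it surfaces as a java.util.NoSuchElementException.

In functional programming, there are usually two generic operators that aggregate a collection into a single value: fold and reduce. In the accepted terminology, fold takes an initial accumulator value and a function that takes the running accumulator and a value from the collection and produces a new accumulator value. An example in pseudocode: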

[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)

This will start with 0 and add 1, 2, 3 and 4 to the running accumulator in turn, finally yielding 10, the sum of the values.

reduce is very similar, except that it doesn't take an initial accumulator value explicitly. It uses the first value as the initial accumulator and then accumulates all the remaining values. This makes sense if you are, for example, looking for a minimum or maximum value.
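The same fold can be sketched in plain Java. This is only an illustration under assumptions: it uses Java 8 streams rather than RxJava (the question itself uses anonymous classes), and the class name FoldExample is made up. The point it shows is that a fold has a sensible answer for an empty input, namely the seed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FoldExample {
    // Fold: start from the explicit seed 0 and add each value to the accumulator.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, (accumulator, value) -> accumulator + value);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));          // prints 10
        System.out.println(sum(Collections.<Integer>emptyList())); // prints 0, the seed
    }
}
```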

[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))

Looking at fold and reduce a different way, you will use fold whenever the aggregated value makes sense even on an empty collection (for a sum, 0 makes sense), and reduce otherwise (a minimum makes no sense on an empty collection, so reduce will fail to operate on such a collection, in your case by throwing an exception).

You are doing a similar aggregation: interspersing a collection of strings with a comma to produce a single string. That is a slightly more difficult situation. It probably makes sense on an empty collection (you would expect an empty string), but on the other hand, if you start with an empty accumulator, you will have one more comma in the result than you expect. The correct solution is to check whether the collection is empty first, and either return a fallback string for the empty collection or do a reduce on the non-empty collection. You will probably find that you often don't want an empty string in the empty-collection case and that something like "collection is empty" is more appropriate, which further suggests that this solution is the clean one.

By the way, I'm using the word collection here instead of Observable freely, just for educational purposes. Also, in RxJava, both fold and reduce go by the same name, reduce; there are just two versions of that method, one taking only one parameter and the other taking two.

As for your final question: you don't have to leave the Observable chain. Just use toList(), as David Motten suggests.
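For comparison, here is a seedless reduce in plain Java. Again this is a sketch under assumptions: Java 8 streams instead of RxJava, and a made-up class name ReduceExample. Note one difference from the behaviour in the question: java.util.stream reports "no elements" by returning an empty Optional rather than by throwing, but the underlying issue is the same, since there is no value to return for an empty input.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceExample {
    // Reduce without a seed: the first element becomes the initial accumulator.
    static Optional<Integer> min(List<Integer> values) {
        return values.stream().reduce((accumulator, value) -> Math.min(accumulator, value));
    }

    public static void main(String[] args) {
        System.out.println(min(Arrays.asList(3, 1, 4, 2)));          // prints Optional[1]
        System.out.println(min(Collections.<Integer>emptyList())); // prints Optional.empty
    }
}
```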
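The extra comma is easy to see in a small sketch. This assumes Java 8 streams rather than RxJava, and JoinPitfall and naiveJoin are names invented for the illustration: seeding the fold with an empty string means every element, including the first, gets a comma in front of it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class JoinPitfall {
    // Naive fold seeded with "": the first element is also preceded by a comma.
    static String naiveJoin(List<String> values) {
        return values.stream().reduce("", (accumulator, value) -> accumulator + "," + value);
    }

    public static void main(String[] args) {
        System.out.println(naiveJoin(Arrays.asList("-2", "-1")));          // prints ,-2,-1
        System.out.println(naiveJoin(Collections.<String>emptyList()).isEmpty()); // prints true
    }
}
```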

.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))

where intersperse may be implemented in terms of reduce, if it is not already a library function (it is quite common).

collection.intersperse(separator) =
    if (collection.isEmpty())
        ""
    else
        collection.reduce((accumulator, element) => accumulator + separator + element)
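A direct Java translation of that pseudocode might look like the following. It is a sketch, not a library API: it assumes Java 8, works on a plain List<String> (the kind of value toList() would hand to map), and the class and method names are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class Intersperse {
    static String intersperse(List<String> collection, String separator) {
        // Empty input: return the fallback instead of reducing.
        if (collection.isEmpty()) {
            return "";
        }
        // Non-empty input: the first element seeds the accumulator,
        // so no stray separator appears; get() is safe here.
        return collection.stream()
                .reduce((accumulator, element) -> accumulator + separator + element)
                .get();
    }

    public static void main(String[] args) {
        System.out.println(intersperse(Arrays.asList("-2", "-1"), ","));             // prints -2,-1
        System.out.println(intersperse(Collections.<String>emptyList(), ",").isEmpty()); // prints true
    }
}
```

On Java 8 and later, String.join(separator, collection) gives the same result for a list of strings, so a hand-written version is mainly useful when a different fallback for the empty case is wanted.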
