Reactive Extensions (Rx) – Do something after all data has been received


As an Rx user, you will run into scenarios where you have subscribed to multiple observables and want to be notified once all of them have completed.

A common (albeit messy) approach to this problem, which I have seen used on my current project far too many times, is to set a boolean flag and have the OnCompleted of every observable invoke your ‘final’ method, which in turn checks this flag.

I consider this very messy, especially since Rx can actually tell you when all the observables have completed.

This can be done as follows:

// Merge returns an IObservable<Unit>, so there is no need to create a Subject first.
IObservable<Unit> observablesObserver = Observable.Merge( new [] { ObservableMethodA(), ObservableMethodB(), ObservableMethodC() } );

observablesObserver.Subscribe(
    unit => _log.Info("One of the observables returned data"),
    ex => _log.Error("Oops, we blew up",ex),
    () => _log.Info("All observables have now completed")
); 

The implementation of each ObservableMethodX() is essentially a simple wrapper that returns an IObservable<Unit> (Unit being Rx lingo for void).

private IObservable<Unit> ObservableMethodA(){
    // The subject only relays "something happened" signals, never the data itself.
    var observable = new Subject<Unit>();

    _someService.SomeObservableMethodA().Subscribe(
        result => {
            SomePropertyA = result;
            observable.OnNext(Unit.Default);
        },
        ex => {
            _log.Error("Oops... I blew up!", ex);
            observable.OnError(ex);
        },
        () => observable.OnCompleted()
    );

    return observable;
}
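As an aside, the same wrapper can be sketched without a Subject by using the Do and Select operators. This is only a minimal sketch under a few assumptions: the Loader class and FetchA() are hypothetical stand-ins for the real class and for _someService.SomeObservableMethodA(), and logging goes to the console instead of _log. One difference to keep in mind is that this version is lazy, so nothing runs until somebody subscribes, whereas the Subject version above starts the service call as soon as the method is invoked.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public class Loader
{
    public string SomePropertyA { get; private set; }

    // Hypothetical stand-in for _someService.SomeObservableMethodA().
    private static IObservable<string> FetchA() => Observable.Return("data for A");

    public IObservable<Unit> ObservableMethodA()
    {
        return FetchA()
            // Side effects: store the result, log any error (the error is still forwarded).
            .Do(result => SomePropertyA = result,
                ex => Console.Error.WriteLine("Oops... I blew up! " + ex.Message))
            // Callers only need a "something arrived" signal, so project to Unit.
            .Select(_ => Unit.Default);
    }

    public static void Main()
    {
        var loader = new Loader();
        loader.ObservableMethodA().Subscribe(
            _ => Console.WriteLine("Loaded: " + loader.SomePropertyA),
            () => Console.WriteLine("Completed"));
    }
}
```

Errors and completion pass through Do and Select untouched, so the caller's OnError and OnCompleted handlers behave the same way as with the Subject-based wrapper.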

Now, when using Observable.Merge, all subscriptions happen at the same time, and there is no guaranteed order in which the results come back. All we know is that OnCompleted is only called once every merged observable has completed (in this case we are ‘merging’ void, so there is not much to merge really).
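To see the completion behaviour in isolation, here is a small self-contained sketch. The three sources are stand-ins built with Observable.Return (an assumption for illustration, not the real service calls), and the counters simply record how often each callback fires.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class MergeCompletionDemo
{
    public static void Main()
    {
        // Stand-in sources: each emits a single Unit and then completes.
        IObservable<Unit> a = Observable.Return(Unit.Default);
        IObservable<Unit> b = Observable.Return(Unit.Default);
        IObservable<Unit> c = Observable.Return(Unit.Default);

        int notifications = 0;
        int completions = 0;

        Observable.Merge(a, b, c).Subscribe(
            _ => notifications++,                               // once per value from any source
            ex => Console.WriteLine("Error: " + ex.Message),
            () => completions++);                               // only after every source has completed

        Console.WriteLine("OnNext calls: " + notifications);
        Console.WriteLine("OnCompleted calls: " + completions);
    }
}
```

OnNext fires once per value from any source, while OnCompleted fires a single time for the merged sequence. If any source calls OnError instead, the merged sequence ends with that error and OnCompleted is never called.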

So, what if you have a scenario where ObservableMethodC() depends on data from ObservableMethodB()?

One way of doing this is to invoke ObservableMethodC() in the OnCompleted phase of ObservableMethodB()’s subscription, yet this gets messy since we would also need to pass in our subject so that we can signal when we have received ‘all’ the data.

A neater way of doing this is to use Observable.Concat() instead, which is exactly like Observable.Merge() except for one main difference: subscription to C will only happen once B has completed. So we can mix and match according to need, for example:

observablesObserver = Observable.Merge(
    new [] { ObservableMethodA(), Observable.Concat(
        new [] { ObservableMethodB(), ObservableMethodC() } ) } );
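One caveat: Concat only delays the subscription to the observable it is handed. If a wrapper starts its work as soon as the method is called (as the Subject-based wrappers shown earlier do, because they subscribe to the service internally), then building the array already kicks off C. Wrapping the call in Observable.Defer postpones it until Concat actually subscribes. The sketch below assumes a hypothetical Step() helper in place of the real ObservableMethodX() methods and writes to the console so the ordering is visible.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class DeferredConcatDemo
{
    // Hypothetical stand-in for ObservableMethodA/B/C.
    // Defer runs the factory only when someone subscribes, not when Step() is called.
    private static IObservable<Unit> Step(string name)
    {
        return Observable.Defer(() =>
        {
            Console.WriteLine(name + " started");
            return Observable.Return(Unit.Default);
        });
    }

    public static void Main()
    {
        // A runs independently; C is not subscribed to (and so not started) until B completes.
        IObservable<Unit> all = Observable.Merge(
            Step("A"),
            Observable.Concat(Step("B"), Step("C")));

        all.Subscribe(
            _ => Console.WriteLine("One of the observables returned data"),
            ex => Console.WriteLine("Error: " + ex.Message),
            () => Console.WriteLine("All observables have now completed"));
    }
}
```

With the wrappers from this post, the equivalent would be to pass Observable.Defer(() => ObservableMethodC()) to Concat instead of calling ObservableMethodC() directly.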

You could of course Concat ALL the observables, but I wouldn’t suggest this: Merge will usually finish sooner than Concat, simply because it subscribes to all the observables at once instead of waiting for each one to complete before starting the next.


2 thoughts on “Reactive Extensions (Rx) – Do something after all data has been received”

  1. Isn’t Subject an abstract class? VS tells me this, so I can’t run var observable = new Subject();
