This project has moved. For the latest updates, please go here.

Usage of Either<TLeft, TRight>

Topics: General, Reactive (Rx)
Oct 30, 2012 at 4:12 PM
Edited Oct 30, 2012 at 4:13 PM

I'm using Rxx for the Retry extension - but I don't get the purpose or intended usage of the resulting Either type:

 

myStream
  .Retry(5, (ex, i) => TimeSpan.FromSeconds(.5 * Math.Pow(2, i - 1)))
  .Replay()
  .RefCount();

 

After switching the regular Rx Retry operator out for the Rxx version above, the query doesn't work as intended because the Rxx Retry is returning IObservable<Either<int, Exception>>  instead of IObservable<int>.

What's the intention of Either in this case, and how should I be using it?  Is it just a vague relative of Materialize?

Thanks.

Coordinator
Oct 30, 2012 at 5:16 PM

Hi,

Either<TLeft, TRight> is a first-class citizen in Rxx.  Sorry that I haven't gotten around to writing the conceptual documentation for it yet.  I'll make sure it's on my list of priorities.

Conceptually, Either<TLeft, TRight> represents an individual notification that either has data on the left or data on the right.  It's kind of like the "OR" to Tuple<TFirst, TSecond>'s "AND".

Rxx defines operators specifically to make working with IObservable<Either<TLeft, TRight>> sequences easier.  There are operators for creating and converting these kinds of sequences.  Here's the relevant source code in Rxx 2.0 as of 10/30/2012:


The Retry operator uses Either<TLeft, TRight> to create two serialized notification channels.  The left channel is the normal output and the right channel contains the exceptions that cause a retry to occur, up to the specified count.  Exceptions caught be the Retry operator are also data, since they do not terminate the sequence.  The native Retry operator in Rx simply throws away this data, but Rxx provides it to observers through an alternate channel.  Basically, it gives observers the opportunity to handle retried exceptions separately from the normal output of the sequence; e.g., for logging purposes.

If you only want the normal output, like the native Retry operator in Rx, then simply use the TakeLeft operator.  For example:

myStream
  .Retry(5, (ex, i) => TimeSpan.FromSeconds(.5 * Math.Pow(2, i - 1)))
  .TakeLeft()
  .Replay()
  .RefCount();

However, if you have defined a logging routine that accepts a sequence of exceptions, then you could take advantage of Rxx's implementation of Retry to perform out-of-band logging as follows:

myStream
  .Retry(5, (ex, i) => TimeSpan.FromSeconds(.5 * Math.Pow(2, i - 1)))
  .Publish(p => p.TakeLeft().Merge(p.TakeRight().Log<int>("myStream.Retry")))
  .Replay()
  .RefCount();

...

internal static IObservable<T> Log<T>(this IObservable<Exception> exceptions, string description = null)
{
  return exceptions
    .Do(
      ex => MyLogger.TraceError(ex, "(OnNext) " + description), 
      ex => MyLogger.TraceError(ex, "(OnError) " + description))
    .Select(_ => default(T))
    .IgnoreElements();
}

- Dave

Oct 30, 2012 at 5:31 PM

Very helpful Dave - you should cut and paste this straight into your docs!

Coordinator
Oct 30, 2012 at 5:45 PM
Edited Oct 30, 2012 at 5:46 PM

Hi,

I've created the wiki page and added a link to this discussion.  I'll add details when I get more time.

Thanks for asking your question.

- Dave

Nov 26, 2012 at 3:51 PM

Hi,

I've been looking at this and really like the pattern. Are there any more docs available?

Failing that, whats the best practise for seperating out the error messages from an observable into an either?

e.g. I have a method like this (that happens to use an Either<Connection, Exception> as an input but isn't directly part of the question)

        public static IObservable<IMapMessage> When(IObservable<Either<ConnectionException>> whenConnection, string topic)
        {
            // Throw away errors at individual subscription level as we don't want this stream inner errors 
            return whenConnection
                .TakeLeft()
                .Select(c => ObservableSubscription.WhenTopic(c, topic).Catch(Observable.Never<IMapMessage>()))
                .Switch()
                .Multicast(() => new ReplaySubject<IMapMessage>(1))
                .RefCount();
        }

As you can see I'm suppressing the error on the inner ObservableSubscription with a catch, but what I really want to do is push that out on an error channel and return Either<IMapMessage, Exception>.

Thanks,

Dan

Nov 26, 2012 at 4:46 PM

ok had a crack at this and it seems ok. Is this the right way to do it though?

 

        public static IObservable<Either<IMapMessageException>> When(IObservable<Either<ConnectionException>> whenConnection, string topic)
        {
            return Observable2.CreateEither<IMapMessageException>(obs =>
            {
                 return whenConnection
                    .TakeLeft()
                    .Select(c => ObservableSubscription.WhenTopic(c, topic).Do(_ => { }, obs.OnNextRight).Catch(Observable.Never<IMapMessage>()))
                    .Switch()
                    .Multicast(() => new ReplaySubject<IMapMessage>(1))
                    .RefCount()
                    .Subscribe(obs.OnNextLeft, obs.OnNextRight, obs.OnCompleted);
            });
        }

 

Basically using a .Do to intercept errors and put them on the exception channel before suppressing them.

I now need to work out how to do this whilst combining in the error channel from the whenConnection with a distinctuntil changed so that it doesn't double notify the same error, but also shows the retry exceptions that happen if a conneciton is down for a while.

Coordinator
Nov 26, 2012 at 5:04 PM

Hi Dan,

I haven't updated the docs yet, sorry.

> [snip] but what I really want to do is push that out on an error channel and return Either<IMapMessage, Exception>.

You can use Observable2.CreateEither.  You must also add using System.Reactive; to your file to get the OnNextLeft and OnNextRight extensions.

return whenConnection
	.TakeLeft()
	.Select(c => Observable2.CreateEither<IMapMessage, Exception>(
		observer => ObservableSubscription.WhenTopic(c, topic).Subscribe(
			observer.OnNextLeft,
			observer.OnNextRight,
			observer.OnCompleted)))
	.Switch()
	.Replay(1)
	.RefCount();

- Dave

Nov 26, 2012 at 5:24 PM

Ok, have now arrived at this:

 

        public static IObservable<Either<IMapMessageException>> When(IObservable<Either<ConnectionException>> whenConnection, string topic)
        {
            return Observable2.CreateEither<IMapMessageException>(obs =>
            {
                var right = new Subject<Exception>();
                var mergedRightDisp = whenConnection.TakeRight().Merge(right).DistinctUntilChanged().Subscribe(obs.OnNextRight);
 
                var left = whenConnection
                   .SelectLeft(l => ObservableSubscription.WhenTopic(l, topic).Do(_ => { }, right.OnNext).Catch(Observable.Never<IMapMessage>()))
                   .Switch()
                   .Multicast(() => new ReplaySubject<IMapMessage>(1))
                   .RefCount()
                   .Subscribe(obs.OnNextLeft, obs.OnNextRight, obs.OnCompleted);
 
                return new CompositeDisposable(right, mergedRightDisp, left);
            });
        }

I think the 'do' code should probably be in the 'Catch' as potentially more efficient, otherwise I have that nop OnNext being invoked on every message.

Is this approx right? I know my variable naming convention sucks there but in hurry!
Coordinator
Nov 26, 2012 at 5:49 PM
Edited Nov 26, 2012 at 5:57 PM

Hi Dan,

Looks like you slipped in a reply before mine :)

Your solution may work, but it's a bit verbose.

Try the following:

(EDIT: I corrected this example in my next reply.)

return whenConnection
	.Select(
		connection => Observable2.CreateEither<IMapMessage, Exception>(
			observer => ObservableSubscription.WhenTopic(connection, topic).Subscribe(
				observer.OnNextLeft,
				observer.OnNextRight,
				observer.OnCompleted)),
		ex => Observable.Return(Either.Right<IMapMessage, Exception>(ex)))
	.Replay(
		either => either.TakeLeft().Merge(either.TakeRight().DistinctUntilChanged()),
		bufferSize: 1)
	.Switch();

- Dave

Coordinator
Nov 26, 2012 at 5:57 PM
Edited Nov 26, 2012 at 6:33 PM

Hi Dan,

Here's a correction to my previous example, since I had placed the Replay and DistinctUntilChanged operators in the wrong place.

(EDIT: I changed the Replay(selector) to .Publish(selector).Replay(1).RefCount())

return whenConnection
	.Select(
		connection => Observable2.CreateEither<IMapMessage, Exception>(
			observer => ObservableSubscription.WhenTopic(connection, topic).Subscribe(
				observer.OnNextLeft,
				observer.OnNextRight,
				observer.OnCompleted)),
		ex => Observable.Return(Either.Right<IMapMessage, Exception>(ex)))
	.Publish(either => either.TakeLeft().Merge(either.TakeRight()))
	.Switch()
	.Publish(either => either.TakeLeft().Select(Either.Left<IMapMessage, Exception>).Merge(
			either.TakeRight().DistinctUntilChanged().Select(Either.Right<IMapMessage, Exception>)))
	.Replay(1)
	.RefCount();

- Dave

Coordinator
Nov 26, 2012 at 6:03 PM
Edited Nov 26, 2012 at 6:34 PM

Hi Dan,

Even more succinct:

(EDIT: I changed the Replay(selector) to .Publish(selector).Replay(1).RefCount())

return whenConnection
	.Select(either =>
		Observable2.CreateEither<IMapMessage, Exception>(
			observer => either.Switch(
				connection => ObservableSubscription.WhenTopic(connection, topic).Subscribe(
					observer.OnNextLeft,
					observer.OnNextRight,
					observer.OnCompleted),
				ex => Observable.Return(ex).Subscribe(observer.OnNextRight))))
	.Switch()
	.Publish(either => either.Where(e => e.IsLeft).Merge(
			either.TakeRight().DistinctUntilChanged().Select(Either.Right<IMapMessage, Exception>)))
	.Replay(1)
	.RefCount();

- Dave


Coordinator
Nov 26, 2012 at 6:14 PM
This discussion has been copied to a work item. Click here to go to the work item and continue the discussion.
Coordinator
Nov 26, 2012 at 6:18 PM
Edited Nov 26, 2012 at 6:35 PM

Hi,

I just created a work item to override Either.Equals.  If it were overridden, then my example could be simplified even further by applying DistinctUntilChanged to the main sequence, assuming of course that you don't expect any duplicate IMapMessage references:

.Switch()
.DistinctUntilChanged()
.Replay(1)
.RefCount();

Though if you did, then it could still be simplified a bit:

(EDIT: I changed the Replay(selector) to .Publish(selector).Replay(1).RefCount())

.Switch()
.Publish(either => either.Where(e => e.IsLeft).Merge(either.Where(e => !e.IsLeft).DistinctUntilChanged()))
.Replay(1)
.RefCount();

- Dave


Coordinator
Nov 26, 2012 at 6:37 PM

Hi Dan,

I just went back and edited all my posts because I accidentally used the Replay(selector) overload, which doesn't meet your requirements.  Publish(selector) is now used instead and I've appended .Replay(1).RefCount() to the queries again.

- Dave

Nov 27, 2012 at 12:15 PM

Thanks Dave, just trying to grok all this now :)

Nov 27, 2012 at 1:32 PM

Thanks for your help here - I think I grokked it now and it is very helpful in terms of seeing how to use various design patterns in rx!

The deduping of the error channel (but not of the message channel) seems like its made your more elegant solution more verbose than what I originally had. I wonder if this suggests an operator for deduping right and left channels separately?

Its also opened up some interesting questions about whether I really mean to put replay semantics on the error channel. Given that Either is an Or structure I'm not sure thats really what I intended, as its really meant to be last value caching on market data. Need to have a think about that.

One thing I think we lost in the iterations is that I was intentially going via rxx multicast so that the last value was only cached whilst there was an active subscriber, otherwise it goes back to source to re-get an image. This prevents hours stale last values - we all discussed this here : http://social.msdn.microsoft.com/Forums/en-US/rx/thread/33e6bd27-cb0d-43db-8310-73246316ad0d

Having said that I've realised I should be doing my last value a tier above this, as I need to map into pocos. Those MapMessages are actually deltas so meaningless as cached last values!

Thanks again for all your help on this. Super helpful!

Dan

Coordinator
Nov 27, 2012 at 2:12 PM

Hi Dan,

> [snip] I wonder if this suggests an operator for deduping right and left channels separately?

Good point.  The truth is that you can probably find a reason to clone all of the Rx operators to support IObservable<Either<L, R>>.  In order to keep the library slim and manageable, I decided to add the operators that I assumed would be used most frequently: Take and Select.  But I'll consider adding DistinctUntilChangedLeft and DistinctUntilChangedRight operators as well.  Thanks for your feedback.

> One thing I think we lost in the iterations is that I was intentially going via rxx multicast

Haha, yes, I forgot about that overload :)

- Dave

Coordinator
Nov 27, 2012 at 2:14 PM
This discussion has been copied to a work item. Click here to go to the work item and continue the discussion.
Coordinator
Nov 27, 2012 at 2:27 PM

>> One thing I think we lost in the iterations is that I was intentially going via rxx multicast
> Haha, yes, I forgot about that overload :)

Perhaps this is an indication that specialized Replay, Publish and TakeLast operators would be useful.  I guess the only differences would have to be the operator names; e.g., ReplayNew, PublishNew and TakeLastNew.

- Dave

Coordinator
Nov 27, 2012 at 2:29 PM
This discussion has been copied to a work item. Click here to go to the work item and continue the discussion.
Nov 27, 2012 at 6:09 PM
Edited Nov 27, 2012 at 6:24 PM
davedev wrote:

>> One thing I think we lost in the iterations is that I was intentially going via rxx multicast
> Haha, yes, I forgot about that overload :)

Perhaps this is an indication that specialized Replay, Publish and TakeLast operators would be useful.  I guess the only differences would have to be the operator names; e.g., ReplayNew, PublishNew and TakeLastNew.

- Dave

Hard to say if this justifies a new method, or just a way of capturing the pattern in a wiki or something. I take your point about keeping control of the surface area of RXX.

The either stuff is really interesting, but definitely adds some further complexity! Is there a default pattern for enriching the left hand side, but keeping the extra channel passing through? I'm guessing I just do a either.TakeLeft().Select(xxx).Merge(either.TakeRight()) as the base pattern?

edit: actually ended up with this:

 

        public static IObservable<Either<T, Exception>> Map<T>(this IObservable<Either<MessageException>> whenEither, MessageMap<T> map = null)
            where T: ICloneablenew()
        {
            var inst = new T();
 
            return whenEither
                .SelectLeft(m =>
                {
                    if (map != null)
                        Deserialiser.Deserialise(m, inst, map);
                    else
                        Deserialiser.Deserialise(m, inst);
 
                    return Either.Left<T, Exception>((T) inst.Clone());
                })
                .Merge(whenEither.SelectRight(Either.Right<T, Exception>));
        }

Is that typical approach or is there a more efficient way?

 

edit2: just noticed that you tend to use publish in that circumstance. I guess this is a deeper rx thing, but if I don't is it the equiv of the observable being subscribed to twice even though there is only one call to subscribe being explicitly made (somewhere out in the consuming code). i.e. if it was a cold obs it would get started twice?

it should be:

 

   return whenEither
                .Publish(either =>
                    either.SelectLeft(m =>
                    {
                        if (map != null)
                            Deserialiser.Deserialise(m, inst, map);
                        else
                            Deserialiser.Deserialise(m, inst);
 
                        return Either.Left<T, Exception>((T) inst.Clone());
                    })
                    .Merge(either.SelectRight(Either.Right<T, Exception>)));
Coordinator
Nov 27, 2012 at 6:51 PM
Edited Nov 27, 2012 at 6:52 PM

Hi Dan,

> Is there a default pattern for enriching the left hand side, but keeping the extra channel passing through?

Yes, sort of.  There's an overload of Select that takes two selector functions.  For example: 

public static IObservable<Either<T, Exception>> Map<T>(this IObservable<Either<Message, Exception>> whenEither, MessageMap<T> map = null)
	where T: ICloneable, new()
{
	return whenEither.Select(
		m =>
		{
			var inst = new T();

			if (map != null)
				Deserialiser.Deserialise(m, inst, map);
			else
				Deserialiser.Deserialise(m, inst);
 
			return (T) inst.Clone();
		}, 
		ex => ex);
}

(NOTE: In the example above I moved inst into the lambda so that the compiler won't close over a local variable, which would cause it to be shared across multiple subscriptions.  I assume that wasn't the intended behavior.)

> just noticed that you tend to use publish in that circumstance. [snip]  i.e. if it was a cold obs it would get started twice?

Exactly.  When defining new operators, always assume that observable (and enumerable) parameters are cold.

> is it the equiv of the observable being subscribed to twice even though there is only one call to subscribe being explicitly made

Yes.  In your example you're passing two observables to Merge, which means that it will call Subscribe on them.  They are actually two different queries against the same observable source, which means that Merge will create two subscriptions to the same underlying source.  If it's a cold observable, then subscriptions side-effects are going to happen twice; e.g., two web requests for the same data.  Publish converts cold observables into hot observables to share subscription side-effects.  So publishing the source observable before passing the two queries to Merge ensures that Merge will create two subscriptions onto an internal proxy observable (actually a Subject<T>) to form the outer query, which is then connected to the real observable by the Publish operator under the covers.

- Dave

Nov 27, 2012 at 7:14 PM
Edited Nov 27, 2012 at 7:14 PM

Thanks for the explanation all makes excellent sense. Your patience is appreciated!

Re moving the local variable inside, I think it was as intended. Thats  the image created from applying a series of deltas to it as they stream in across the wire. This is then cloned and pushed to subscribers as a theoretical immutable leaving the golden image safe in the closure to be updated as new deltas come in.

Its not meant to be used across multiple calls to map, but it should be stateful for a given closure of a single map call, which is what I think it does in this case. Having said that maybe I've made a mistake with this as the code ain't working right at the moment :)

Coordinator
Nov 27, 2012 at 7:35 PM
Edited Nov 27, 2012 at 7:35 PM

Hi Dan,

Ah, I see.  But my point was that the local is being shared across multiple subscriptions, which it seems wasn't intended.  Consider compromising by wrapping the entire query, including the local variable, in a call to Observable.Defer.  That way, every call to Map and every call to Subscribe get fresh locals, though multiple notifications for a given subscription share the same local.  Defer can also serve as documentation that the closure isn't a mistake.

- Dave

Nov 28, 2012 at 8:09 AM

Dave

Ahah I see your point, I hadn't considered the rammifications of closures intersecting with subscriptions. Thanks a lot for that catch, it may explain some of the problems I'm seeing at the moment. Also thanks for that link to the rx programming best practise you wrote and the MS one as well. Hadn't seen them before and they are very useful.

Dan