This project has moved and is read-only. For the latest updates, please go here.

PLINQ -> IObservable adaption

Topics: Feature Requests and Contributions, Reactive (Rx)
Dec 2, 2011 at 11:31 AM


See end of this post;

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/439a8ff6-7f39-4fda-b759-0c013f5e2285

To me it seems logical that you should be able to go from PLINQ -> IObservable (and maybe from IObservable -> PLINQ).

Thoughts?

Dec 2, 2011 at 11:34 AM
Edited Dec 2, 2011 at 11:35 AM

Or can you already (am I missing something?)

Note: You can actually call ToObservable<T>(this IEnumerable<T> source) on a PLINQ query, however it doesn't work correctly because it uses foreach; meaning the underlying query has to complete before it will yield any results.

Dec 2, 2011 at 1:24 PM
Edited Dec 2, 2011 at 1:32 PM

You just have to tell it to not buffer the output using a merge option. See my response here:

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/44cb599c-5e28-4ae1-af53-f93fe4af167d

Doc:
http://msdn.microsoft.com/en-us/library/dd997424.aspx

Perhaps the recommended general query pattern for PLINQ->Rx should be as follows, but please correct me if I’m wrong:

var query = enumerable
	.AsParallel()
	[.AsOrdered()]
	.WithMergeOptions(ParallelMergeOptions.NotBuffered)
	[1…N PLINQ query operators]
	.ToObservable([Scheduler])
Dec 2, 2011 at 1:30 PM
Edited Dec 2, 2011 at 1:33 PM

Note that according to this blog post, disposing the enumerator cancels the background work being done by PLINQ. This means that an explicit cancellation token isn’t required either. Simply disposing of the Rx subscription should cause ToObservable to dispose of the enumerator, thus implicitly cancelling the PLINQ work.

http://blogs.msdn.com/b/pfxteam/archive/2009/04/29/9576291.aspx

Dec 3, 2011 at 9:21 AM

Yeah OK; I was missing something! Works.