
PLINQ -> IObservable adaption

Topics: Feature Requests and Contributions, Reactive (Rx)
Coordinator
Dec 2, 2011 at 10:31 AM


See the end of this post:

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/439a8ff6-7f39-4fda-b759-0c013f5e2285

To me it seems logical that you should be able to go from PLINQ -> IObservable (and maybe from IObservable -> PLINQ).

Thoughts?

Coordinator
Dec 2, 2011 at 10:34 AM
Edited Dec 2, 2011 at 10:35 AM

Or can you already? (Am I missing something?)

Note: You can actually call ToObservable<T>(this IEnumerable<T> source) on a PLINQ query; however, it doesn't work correctly because it uses foreach, meaning the underlying query has to complete before it yields any results.

Coordinator
Dec 2, 2011 at 12:24 PM
Edited Dec 2, 2011 at 12:32 PM

You just have to tell PLINQ not to buffer its output by setting a merge option. See my response here:

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/44cb599c-5e28-4ae1-af53-f93fe4af167d

Doc:
http://msdn.microsoft.com/en-us/library/dd997424.aspx

Perhaps the recommended general query pattern for PLINQ->Rx should be as follows, but please correct me if I’m wrong:

var query = enumerable
	.AsParallel()
	.AsOrdered()                                         // optional
	.WithMergeOptions(ParallelMergeOptions.NotBuffered)
	// ... 1 to N PLINQ query operators ...
	.ToObservable(scheduler);                            // the scheduler argument is optional
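
As a concrete instance of the pattern above, here is a minimal sketch. It assumes a current System.Reactive package (where TaskPoolScheduler.Default is available; the 2011-era Rx build may expose the scheduler under a different name). The class PlinqToRxSketch, the helper CollectSquares, and the Thread.Sleep that stands in for slow per-item work are all made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class PlinqToRxSketch
{
    // Hypothetical helper: squares 1..count in parallel and collects the
    // values in the order the observable delivers them.
    public static List<int> CollectSquares(int count)
    {
        var results = new List<int>();

        var query = Enumerable.Range(1, count)
            .AsParallel()
            .AsOrdered()                                          // keep source order
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)   // stream results as they are ready
            .Select(n =>
            {
                Thread.Sleep(50);                                 // simulate slow per-item work
                return n * n;
            });

        Exception failure = null;
        using (var done = new ManualResetEventSlim())
        using (query.ToObservable(TaskPoolScheduler.Default).Subscribe(
            value => { Console.WriteLine(value); results.Add(value); },
            error => { failure = error; done.Set(); },
            () => done.Set()))
        {
            done.Wait();   // block until OnCompleted or OnError
        }

        if (failure != null)
            throw new InvalidOperationException("PLINQ query failed", failure);
        return results;
    }

    static void Main()
    {
        CollectSquares(10);
    }
}
```

Passing a scheduler moves the enumeration off the subscribing thread, which is why the sketch waits on an event before returning. Dropping AsOrdered() should still work but gives up the ordering guarantee.
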
Coordinator
Dec 2, 2011 at 12:30 PM
Edited Dec 2, 2011 at 12:33 PM

Note that according to the blog post linked below, disposing the enumerator cancels the background work being done by PLINQ. This means that an explicit cancellation token isn't required either. Simply disposing of the Rx subscription should cause ToObservable to dispose of the enumerator, thus implicitly cancelling the PLINQ work.

http://blogs.msdn.com/b/pfxteam/archive/2009/04/29/9576291.aspx
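
One way to check this behaviour yourself is sketched below. It assumes a current System.Reactive package and that ToObservable disposes the enumerator when the subscription is disposed, as described above. The class PlinqCancellationSketch, the helper CountProcessedAfterEarlyDispose, the item count, and the sleep durations are all made up for illustration. Take(3) is used here only as a convenient way to dispose the upstream subscription early.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class PlinqCancellationSketch
{
    // Hypothetical helper: starts a 1000-item PLINQ query, lets Take(3)
    // dispose the subscription after three values, and returns how many
    // items the PLINQ workers started processing.
    public static int CountProcessedAfterEarlyDispose()
    {
        int processed = 0;

        var query = Enumerable.Range(1, 1000)
            .AsParallel()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(n =>
            {
                Interlocked.Increment(ref processed);   // count items that were started
                Thread.Sleep(20);                       // simulate slow per-item work
                return n;
            });

        using (var done = new ManualResetEventSlim())
        using (query.ToObservable(TaskPoolScheduler.Default)
                    .Take(3)                            // disposes the upstream subscription after 3 values
                    .Subscribe(_ => { }, _ => done.Set(), () => done.Set()))
        {
            done.Wait();
        }

        // Give any still-running workers time to show up in the count.
        Thread.Sleep(500);
        return Volatile.Read(ref processed);
    }

    static void Main()
    {
        Console.WriteLine("Items processed: " + CountProcessedAfterEarlyDispose());
    }
}
```

If disposal propagates to PLINQ as described, the reported count should stay well below 1000. The exact number depends on core count and timing.
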

Coordinator
Dec 3, 2011 at 8:21 AM

Yeah OK; I was missing something! Works.