
Bounded Queue - ObserveOn

Topics: Feature Requests and Contributions, General, Reactive (Rx)
Coordinator
Oct 20, 2011 at 4:56 AM

Should we override ObserveOn(IScheduler, MaxQueueLength)? Or could this be done with WindowIntrospective?

Coordinator
Oct 20, 2011 at 4:58 AM

& could/should we leverage TPL Dataflow?

Coordinator
Oct 20, 2011 at 6:45 AM

What is the expected behavior?  If the scheduler's queue is already filled, then it drops new notifications?

That sounds a bit like Throttle.  Perhaps a custom BoundedScheduler would be more appropriate though?

Also, should it drop OnCompleted and OnError as well?

A use case may help.

- Dave

Coordinator
Oct 20, 2011 at 6:50 AM

Sorry, I should have been more specific.

I'd expect the thread trying to add the notification to block until the queue has room. Another overload would allow the caller to specify a timeout period, after which an exception would be raised (to the caller of OnNext).

I'm not sure if it is a good idea to add this to Rx (because of the blocking behaviour), but this is how bounded queues work in dataflow or any ring-buffer type scenario.

Coordinator
Oct 20, 2011 at 11:42 AM

Interesting, it sounds like a reactive semaphore.  Maybe a new operator called Semaphore, or some variant, would be appropriate?

I think if a scheduler introduces concurrency for observations, then a blocking behavior for the source observable isn't generally a problem in Rx.

Coordinator
Oct 20, 2011 at 11:49 AM
Edited Oct 20, 2011 at 12:12 PM

Is the queue behavior actually required though?  It seems like it would make more sense to leave that up to the scheduler.

For example, a simple implementation of the Semaphore operator could simply call Schedule on the specified scheduler after passing through a real semaphore.  That way it's up to the scheduler whether it wants to actually queue actions or execute them concurrently.

Do you have a use case?  It may help me to better understand your requirements.

Coordinator
Oct 20, 2011 at 12:26 PM
Edited Oct 20, 2011 at 12:29 PM

The Semaphore documentation mentions that a single thread can consume multiple entries, so perhaps it will work even though OnNext is called serially. 

How does something like this look?  (Entirely untested)

public static IObservable<TSource> Semaphore<TSource>(this IObservable<TSource> source, int maximumCount, TimeSpan timeout, IScheduler scheduler)
{
	return Observable.Create<TSource>(
		observer => 
		{
			var gate = new Semaphore(maximumCount, maximumCount);
			var disposables = new CompositeDisposable();

			Action<Action> blockAndSchedule = action =>
			{
				if (!gate.WaitOne(timeout))
				{
					observer.OnError(new TimeoutException());
					return;
				}

				var schedule = new SingleAssignmentDisposable();

				disposables.Add(schedule);

				schedule.Disposable = scheduler.Schedule(() =>
					{
						action();

						gate.Release();

						disposables.Remove(schedule);
				});
			};

			try
			{
				var subscription = source.Subscribe(
					value => blockAndSchedule(() => observer.OnNext(value)), 
					ex => blockAndSchedule(() => observer.OnError(ex)), 
					() => blockAndSchedule(() => observer.OnCompleted()));

				disposables.Add(subscription);
				disposables.Add(gate);

				return disposables;
			}
			catch
			{
				gate.Dispose();

				throw;
			}
		});
}
Coordinator
Oct 20, 2011 at 12:31 PM
Edited Oct 20, 2011 at 12:31 PM

I'd imagine that the above could be called with an EventLoopScheduler to achieve the queueing behavior.

Coordinator
Oct 20, 2011 at 3:04 PM

The use case is a simple "Slow Consumer / Fast Producer" problem. All systems that use the Producer / Consumer pattern are vulnerable to this.

Let’s say events are coming into the system over the network, these are processed & placed in a queue, where another thread picks them up, serializes them & publishes them over the network.

Producer (socket input -> deserialization -> processing) -> Queue -> Consumer (serialization -> socket output)

It would seem reasonable to put some upper limit on the size of this queue. Unbounded growth leads to extra GC workload, which would lead to more growth in the queue. It is a slippery slope!
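For illustration, the bounded-queue behaviour described here can be sketched with the BCL alone (a minimal, non-Rx sketch using BlockingCollection; the names and sizes are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedQueueDemo
{
	static void Main()
	{
		// A bounded queue: Add blocks when 3 items are waiting, pacing a
		// fast producer to the speed of a slow consumer instead of
		// letting the queue grow without limit.
		var queue = new BlockingCollection<int>(boundedCapacity: 3);

		var producer = Task.Run(() =>
		{
			for (int i = 0; i < 10; i++)
			{
				queue.Add(i);            // blocks while the queue is full
			}
			queue.CompleteAdding();
		});

		int consumed = 0;
		foreach (var item in queue.GetConsumingEnumerable())
		{
			System.Threading.Thread.Sleep(10);   // simulate a slow consumer
			consumed++;
		}

		producer.Wait();
		Console.WriteLine(consumed);   // prints 10: nothing dropped, producer was paced
	}
}
```

With boundedCapacity set, Add blocks the producer rather than growing the queue, which is the back-pressure behaviour being asked for in this thread.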

Coordinator
Oct 20, 2011 at 3:06 PM

Re your implementation of Semaphore - I don't see where the Queue is ;)

This would need to work with any scheduler (like ObserveOn).

Coordinator
Oct 20, 2011 at 3:11 PM

It is quite common for enterprise services using Producer/Consumer patterns to use a queue with some sort of capacity (required with a ring buffer).

http://www.bluebytesoftware.com/blog/PermaLink,guid,3740daff-a459-4298-bc9b-65d3647f5c0d.aspx

 

Coordinator
Oct 20, 2011 at 8:48 PM

Thanks for the description, although I don't see how blocking a thread-pool thread created by a socket is the correct solution.  It doesn't seem like an efficient use of pooled threads.  Wouldn't it make more sense to implement throttling?  Or if messages cannot be dropped, then implement some custom network protocol that informs the server socket that it must slow down to some maximum threshold; otherwise, messages will be throttled anyway.

Regardless, in Semaphore the queue implementation is left to the scheduler; e.g., EventLoopScheduler.  (As I mentioned already ;)

This pattern couldn't possibly work with just any scheduler because the pattern requires blocking.  It'll only work with schedulers that introduce concurrency, right?  For example, if you were to use Immediate or CurrentThread scheduler, then blocking at all would block indefinitely.

Furthermore, Semaphore should also work with schedulers that don't use an explicit queue, like ThreadPool; however, to ensure that OnNext is not called concurrently (the Rx serializability guarantee), the observable must also use the Synchronize operator after calling Semaphore.

Have you seen my Consume extension yet?  It's meant to generalize the producer/consumer pattern, so perhaps this should be an overload of Consume instead.  Have to think about it more though.

http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

Coordinator
Oct 21, 2011 at 5:25 AM
Edited Oct 21, 2011 at 5:26 AM

Hi Dave,

>Thanks for the description, although I don't see how blocking a thread-pool thread created by a socket is the correct solution. It doesn't seem like an efficient use of pooled threads. Wouldn't it make more sense to implement throttling?

Yes, obviously blocking a thread-pool thread isn't a great idea. This is why the caller would be able to specify a timeout period (TimeSpan.Zero in this case).
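The zero-timeout case can be sketched with SemaphoreSlim alone (a minimal, non-Rx sketch; the names are illustrative):

```csharp
using System;
using System.Threading;

class ZeroTimeoutGate
{
	static void Main()
	{
		// A gate with 2 slots, none currently available: with a zero
		// timeout, Wait returns false immediately instead of blocking
		// the (possibly pooled) calling thread.
		var gate = new SemaphoreSlim(0, 2);

		bool acquired = gate.Wait(TimeSpan.Zero);
		Console.WriteLine(acquired);   // False: caller can raise an error instead

		gate.Release();
		acquired = gate.Wait(TimeSpan.Zero);
		Console.WriteLine(acquired);   // True: a slot was free
	}
}
```

A false return gives the caller the chance to raise a TimeoutException (or apply whatever protocol it wants) without ever blocking the pool thread.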

>Or if messages cannot be dropped, then implement some custom network protocol that informs the server socket that it must slow down to some maximum threshold; otherwise, messages will be throttled anyway.

Sure, when they try to enqueue and an exception is raised, they can implement whatever protocol they want. The length of the queue seems like a pretty good indicator of the health / performance of a system like this.

>the observable must also use the Synchronize operator after calling Semaphore.

Hmm... perhaps. It seems a little odd to provide an Rx operator that doesn't always obey the Rx grammar?

>Have you seen my Consume extension yet? It's meant to generalize the producer/consumer pattern, so perhaps this should be an overload of Consume instead. Have to think about it more though.

Yes, this could apply to both the Consume & ObserveOn concepts.

Coordinator
Oct 21, 2011 at 6:42 PM

> Yes, obviously blocking a thread-pool thread isn't a great idea.  This is why they would be able to specify a timeout period.
> Sure, when they try to enqueue and an exception is raised, they can implement whatever protocol they want.

I see now, the TimeSpan is the throttle.

>  [snip] The length of the queue seems like a pretty good indicator in the health / performance of a system like this.

Actually, isn't it the unknown duration before a TimeoutException occurs that is the indicator of the health of the system?  The TimeSpan and the semaphore count (or maximum queue count) are constants, which means that they aren't scalable reactively.

That's why the Consume extension is independent of the storage mechanism and scheduling, which may or may not use a queue.  Theoretically, if the application determines reactively that it can add more consumers, it just calls Subscribe again; otherwise, to remove a consumer it can just dispose of a subscription.  To add producers it can simply Merge them into the source observable.

What we're discussing here seems to actually be a concrete implementation over the core algorithm that Consume represents.

The idea behind Semaphore is similar in that it's still independent of the storage mechanism and scheduling, which may or may not use a queue.  Although to meet the needs of this particular implementation of the pattern, it's gated using a maximum count and a timeout threshold for throttling.  It should work with any scheduler that introduces concurrency.

>> the observable must also use the Synchronize operator after calling Semaphore.
> Hmm... perhaps. It seems a little odd to provide an Rx operator that doesn't always obey the Rx grammar?

Well technically it obeys the Rx grammar, just not the serializability guarantee, depending upon the scheduler.  ;)

I agree that it seems odd.  Note that we already have another (documented) operator in Rxx with concurrent behavior: Serve

But I don't see how the proposed ObserveOn operator would be any different.  When a notification arrives it goes into a queue that is maintained by the operator itself, or the notification thread is blocked when the queue is full.  If the EventLoopScheduler is supplied then now you have duplication - two queues!  So what are the semantics there?  I.e., how does this implementation work for "any" scheduler?

Furthermore, if a ThreadPool scheduler is supplied, then isn't the intended behavior to have N thread-pool threads consuming concurrently by calling OnNext concurrently?  How is that any different than Semaphore's behavior?  That would also break the serializability guarantee.  Or is the intention that it should actually respect serializability and only act as a single-threaded consumer instead?  But then what's the point of the bounded queue?  It would seem that a single serial consumer is actually a bounded queue of size 1, regardless of the maximum threshold of the queue.  If a notification arrives while another notification is being processed, then adding it to the queue doesn't actually make the algorithm any more productive if there's only one consumer, right?

Coordinator
Oct 21, 2011 at 11:55 PM
Edited Oct 21, 2011 at 11:57 PM

I'm seeing problems trying to implement this pattern concretely in a single combinator.  For example, some implementations may need to adjust the limits reactively, so requiring constant maximum and duration values feels a bit too blunt.

More importantly, though, you probably want to use Subscribe on each individual consumer instead of merging them into one sequence to ensure concurrency between the consumers.  I.e., Semaphore is dangerous, as would be the proposed ObserveOn overload, when used with a scheduler that breaks the Rx serializability guarantee.

Alternatively, we could offer a Subscribe-like method that encapsulates the following code; however, I'm not sure that it makes sense to add a concrete implementation such as this to Rxx.  There are probably lots of different ways that the Consume extension could be used "correctly", depending upon the particular requirements of an application.

// Bounded storage shared by all producers and consumers.
var queue = new BlockingCollection<int>(maxQueueSize);

var source = GetSource();

// Each producer tries to enqueue; TryAdd blocks until the queue has
// room, the timeout elapses, or the subscription is disposed.
var producer = Observable.Create<int>(
	observer =>
	{
		var cancel = new CancellationDisposable();

		var subscription = source
			.Do(value =>
				{
					if (!queue.TryAdd(value, timeout, cancel.Token))
					{
						throw new TimeoutException();
					}
				})
			.Subscribe(observer);

		return new CompositeDisposable(subscription, cancel);
	});

var producers = Observable
	.Range(0, producerCount)
	.Select(_ => producer)
	.Merge()
	.Publish();

// Consume polls the queue; an empty Maybe signals that there is
// currently nothing to take.
var consumer = producers.Consume(_ =>
	{
		int value;
		return queue.TryTake(out value)
			? System.Maybe.Return(value)
			: System.Maybe.Empty<int>();
	},
	Scheduler.ThreadPool);

// Each Subscribe call adds another concurrent consumer.
for (int i = 0; i < consumerCount; i++)
{
	consumer.Subscribe(DoSomethingAsync);
}

producers.Connect();