> Yes, obviously blocking a thread-pool thread isn't a great idea. This is why they would be able to specify a timeout period.
> Sure, when they try to enqueue and an exception is raised, they can implement whatever protocol they want.
I see now, the TimeSpan is the throttle.
> [snip] The length of the queue seems like a pretty good indicator in the health / performance of a system like this.
Actually, isn't it the unknown duration before a TimeoutException
occurs that is the real indicator of the health of the system? The TimeSpan
and the semaphore count (or maximum queue count) are constants, which means they can't scale reactively.
That's why the Consume extension is independent of the storage mechanism and scheduling, which may or may not use a queue. Theoretically, if the application determines reactively that it can add more consumers, it just calls
Subscribe again; otherwise, to remove a consumer it can just dispose of a subscription. To add producers it can simply
Merge them into the source observable.
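The scaling model described above can be sketched in plain Python (this is an analogy, not Rx code): a shared queue stands in for the source observable, adding a consumer is like calling Subscribe again, sending it a sentinel is like disposing of its subscription, and two producer loops feeding the same queue play the role of Merge. All names here are illustrative.

```python
import queue
import threading

# Hypothetical sketch (not Rx code): scale consumers by adding/removing
# workers over a shared source, analogous to Subscribe/Dispose, and
# "Merge" producers by having them feed the same source.
source = queue.Queue()
results = []
results_lock = threading.Lock()
STOP = object()  # sentinel: receiving it "disposes" a consumer

def consumer():
    while True:
        item = source.get()
        if item is STOP:
            break
        with results_lock:
            results.append(item * 2)  # stand-in for OnNext processing

# Reactively decide we can afford two consumers: "Subscribe" twice.
workers = [threading.Thread(target=consumer) for _ in range(2)]
for w in workers:
    w.start()

# Two producers "merged" into the one source.
for n in range(5):
    source.put(n)
for n in range(5, 10):
    source.put(n)

# One sentinel per consumer to shut each one down.
for _ in workers:
    source.put(STOP)
for w in workers:
    w.join()
```

The point is only that the consumer count is a runtime decision layered on top of the source, independent of how the source stores or schedules its notifications.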
What we're discussing here seems to actually be a concrete implementation over the core algorithm that
The idea behind Semaphore is similar in that it's still independent of the storage mechanism and scheduling, which may or may not use a queue. To meet the needs of this particular implementation of the pattern, though, it's gated
using a maximum count and a timeout threshold for throttling. It should work with any scheduler that introduces concurrency.
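A minimal sketch of that gating, in plain Python rather than Rx (the class and its names are illustrative, not the actual Rxx Semaphore): a counting semaphore caps the in-flight notifications, and a producer that can't acquire a slot within the timeout gets a TimeoutError instead of blocking forever.

```python
import threading

class ThrottleGate:
    """Hypothetical sketch of gating by max count + timeout threshold."""

    def __init__(self, max_count, timeout):
        self._sem = threading.BoundedSemaphore(max_count)
        self._timeout = timeout

    def enter(self):
        # Block for at most `timeout` seconds waiting for a free slot;
        # raise instead of blocking the producer indefinitely.
        if not self._sem.acquire(timeout=self._timeout):
            raise TimeoutError("throttled: no slot freed within timeout")

    def exit(self):
        self._sem.release()

gate = ThrottleGate(max_count=1, timeout=0.05)
gate.enter()              # first notification takes the only slot
try:
    gate.enter()          # second one times out: the throttle signal
    timed_out = False
except TimeoutError:
    timed_out = True
gate.exit()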
>> the observable must also use the Synchronize operator after calling
> Hmm... perhaps. It seems a little odd to provide an Rx operator that doesn't always obey the Rx grammar?
Well, technically it obeys the Rx grammar, just not the serializability guarantee, depending upon the scheduler. ;)
I agree that it seems odd. Note that we already have another (documented) operator in Rxx with concurrent behavior:
But I don't see how the proposed ObserveOn operator would be any different. When a notification arrives, it goes into a queue that is maintained by the operator itself, or the notification thread is blocked when the queue
is full. If the EventLoopScheduler is supplied, then you have duplication: two queues! So what are the semantics there? I.e., how does this implementation work for "any" scheduler?
Furthermore, if a ThreadPool scheduler is supplied, then isn't the intended behavior to have N thread-pool threads consuming concurrently by calling
OnNext concurrently? How is that any different from Semaphore's behavior? That would also break the serializability guarantee. Or is the intention that it should actually respect serializability and only
act as a single-threaded consumer instead? But then what's the point of the bounded queue? It would seem that a single serial consumer is actually a bounded queue of size 1, regardless of the maximum threshold of the queue. If
a notification arrives while another notification is being processed, then adding it to the queue doesn't actually make the algorithm any more productive if there's only one consumer, right?
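That last point can be seen in a plain Python sketch (again an analogy, not Rx): with a single serial consumer, a bounded queue only buffers; the producer still ends up paced by the consumer, because put() blocks whenever the consumer hasn't drained the slot yet.

```python
import queue
import threading
import time

# Illustrative only: bounded queue (maxsize=1) feeding ONE serial consumer.
# The producer blocks on put() while the consumer is busy, so total time is
# dominated by the consumer's processing, not the producer's emission rate.
buffer = queue.Queue(maxsize=1)
consumed = []

def consumer():
    while True:
        item = buffer.get()
        if item is None:      # sentinel: end of stream
            break
        time.sleep(0.01)      # simulate slow OnNext processing
        consumed.append(item)

t = threading.Thread(target=consumer)
t.start()

start = time.monotonic()
for n in range(3):
    buffer.put(n)             # blocks whenever the single slot is occupied
buffer.put(None)
t.join()
elapsed = time.monotonic() - start
```

Enlarging maxsize here would let the producer run ahead, but the items still complete at the single consumer's rate, which is the sense in which extra queue capacity adds no productivity without extra consumers.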