Subject doesn't output events when it's reading an ObservableTcpListener

Topics: General
Apr 17, 2013 at 3:04 PM
Edited Apr 17, 2013 at 3:11 PM
Hello all,

I'm trying to pass the output of ObservableTcpListener to a Subject<byte[]>. That subject will then be used in a query.

The problem is that while the ObservableTcpListener receives everything that I send on each connection, the query outputs only the first two or three events and nothing after that.

This code shows the problem. It should be run in LINQPad as a "C# Program".
void Main()
{
    // Create the subject and the query
    var subject = new Subject<byte[]>();
    var query = subject.Select(reading => reading);
    
    query.Subscribe(Console.WriteLine);
    
    var tcpListenerPool = ObservableTcpListener.Start(IPAddress.Loopback, 20001);
    
    tcpListenerPool.Subscribe(
        (TcpClient tcpClient) =>
        {
            // Debug information just to check if the socket actually has anything in it
            Console.WriteLine("tcpClient.Available: " + tcpClient.Available);
            
            var receiveContentObservable = tcpClient.Client.ReceiveUntilCompleted(SocketFlags.None);
            
            // Send socket content to the subject
            receiveContentObservable.Subscribe(subject);
        });
        
    Console.WriteLine("waiting");
    Console.Read();
}

// Define other methods and classes here

Note that sometimes LINQPad doesn't correctly close the socket, and gives the error: "Only one usage of each socket address (protocol/network address/port) is normally permitted". If that happens go to the "Query" menu and choose "Cancel All Threads and Reset".

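The program that I'm using to send data to the ObservableTcpListener is this Python program:
import socket
import random

payloads = ["foo", "bar"]

def sendPayload(payload):
    # Open a new connection, send one payload, then close the connection
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("localhost", 20001))
    
    # Encode to bytes so that this also runs on Python 3
    s.sendall(payload.encode("ascii"))
    
    s.close()

# Open 10 connections, each sending either "foo" or "bar"
# (a plain loop, because map() is lazy on Python 3 and would send nothing)
for _ in range(10):
    sendPayload(random.choice(payloads))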
Is this a bug, or am I doing something wrong?

Edit: I'm using the "Rxx Class Library for .NET 4.0" available in the Downloads section.
Coordinator
Apr 17, 2013 at 4:25 PM
Hi,

It's probably because the subject is receiving OnCompleted and ignoring subsequent notifications.

In other words, this:
receiveContentObservable.Subscribe(subject);
is essentially the same as this:
receiveContentObservable.Subscribe(subject.OnNext, subject.OnError, subject.OnCompleted);
Try this instead:
receiveContentObservable.Subscribe(subject.OnNext, subject.OnError);
It's a fairly common mistake that I make too :)
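
To make the effect concrete, here is a toy model in Python. It is not Rx or Rxx code: `ToySubject`, `forward`, and `run` are made-up names, and the only behavior modeled is the assumption that a subject ignores every notification after it has seen a completion. Each "connection" pushes one payload and then completes, as a closed socket would.

```python
class ToySubject:
    """Hypothetical stand-in for a subject: ignores everything after completion."""

    def __init__(self):
        self.observers = []
        self.stopped = False

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def on_next(self, value):
        if not self.stopped:
            for observer in self.observers:
                observer(value)

    def on_completed(self):
        self.stopped = True


def forward(values, subject, forward_completion):
    """Model one connection: push its values, then optionally its completion."""
    for value in values:
        subject.on_next(value)
    if forward_completion:
        subject.on_completed()


def run(forward_completion):
    """Feed three one-payload connections into a single shared subject."""
    received = []
    subject = ToySubject()
    subject.subscribe(received.append)
    for payload in ["foo", "bar", "foo"]:
        forward([payload], subject, forward_completion)
    return received


print(run(forward_completion=True))   # models Subscribe(subject)
print(run(forward_completion=False))  # models Subscribe(subject.OnNext, subject.OnError)
```

With completion forwarded, only the first payload reaches the observer; without it, all three do. In the real program the connections presumably overlap in time, which would explain why two or three events get through before the first completion arrives.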

- Dave
Coordinator
Apr 17, 2013 at 4:29 PM
Edited Apr 17, 2013 at 4:30 PM
Hi,
Note that sometimes LINQPad doesn't correctly close the socket [snip]
You can avoid orphaned sockets by disposing of the subscription yourself.
using (tcpListenerPool.Subscribe(
    (TcpClient tcpClient) =>
    {
        // ...
    }))
{
    Console.WriteLine("waiting");
    Console.Read();
}
- Dave
Apr 17, 2013 at 5:06 PM
Edited Apr 17, 2013 at 5:09 PM
davedev wrote:

Hello davedev,
Try this instead:
receiveContentObservable.Subscribe(subject.OnNext, subject.OnError);
Yup, that worked. Thank you!
Note that sometimes LINQPad doesn't correctly close the socket [snip]
You can avoid orphaned sockets by disposing of the subscription yourself.
using (tcpListenerPool.Subscribe(
    (TcpClient tcpClient) =>
    {
        // ...
    }))
{
    Console.WriteLine("waiting");
    Console.Read();
}
Weird, I thought I had tried that and it had failed. Thanks again.