F# and Named-Pipes

I am currently working on a project where I have an existing .NET application written in C# that is running on top of a Unity3D engine.

As part of the requirements, I have to develop a WPF host environment that, in addition to hosting the Unity3D process, is also able to send and receive commands.

The WPF application must also print the results in a report. The user will fill out a WPF form, which will feed the properties of a command. Once the form is completed, the user sends the command, which triggers an event.

In order to do this, I need a way to communicate between two different processes that could belong to different AppDomains.

I used the NamedPipeClientStream and NamedPipeServerStream classes, which support both synchronous and asynchronous read and write operations. Named pipes are a mechanism for interprocess communication in a client/server style.

I chose named pipe streams because they support full-duplex communication: they are bidirectional by nature, which simplifies communication between a pipe server and one or more pipe clients.

Using named pipes is similar to using sockets, but with less code and fewer layers of indirection.

A PipeStream communicates between processes through the Windows pipes protocol. It is well suited to interprocess communication on a single computer, as a low-level abstraction that provides very high performance for sending and receiving data.

There are two types of Windows pipes:

  • Named pipe: allows two-way communication between processes on the same machine or across a network.
  • Anonymous pipe: provides one-way communication between processes on the same machine.

For my scenario, I chose named pipes because they offer two-way communication.

Named pipes are exposed through two classes:

  • NamedPipeServerStream – once instantiated, it waits for a connection through the “WaitForConnection” method
  • NamedPipeClientStream – once instantiated, it attempts to connect to a NamedPipeServerStream through the “Connect” method

For two-way communication to succeed, both named pipe streams must agree on the same pipe name and on the protocol used. It is equally important to acknowledge that both ends have to agree on the size of the data transmitted.

To help with the transmission of lengthy messages, it is recommended to enable the “message transmission mode”. When this mode is used, the PipeStream that is reading the message can check the “IsMessageComplete” property to determine whether the message is complete or whether the stream has to keep reading.

I highly recommend using the “Message” transmission mode because it is otherwise impossible to determine whether the PipeStream has finished sending the byte stream, or whether a complete message has been read, simply by checking whether the number of bytes read is zero. According to the MSDN documentation, a PipeStream acts like a network stream, which has no definite end.
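As a minimal sketch of that idea, the function below keeps reading small chunks until IsMessageComplete reports that the whole message has arrived. The names readWholeMessage and roundTrip, the pipe name passed in, and the deliberately tiny 4-byte buffer are assumptions made for illustration. Message mode is only supported by .NET on Windows, so the round trip is expected to work there only.

```fsharp
open System
open System.IO
open System.IO.Pipes
open System.Text

// Read chunks until the pipe reports that the current message is complete.
// The 4-byte buffer is deliberately tiny so that several reads are needed.
let readWholeMessage (pipe: PipeStream) : byte[] =
    let chunk : byte[] = Array.zeroCreate 4
    use collected = new MemoryStream()
    let mutable finished = false
    while not finished do
        let count = pipe.Read(chunk, 0, chunk.Length)
        collected.Write(chunk, 0, count)
        // count = 0 is treated as "pipe closed" so the loop cannot spin forever
        finished <- pipe.IsMessageComplete || count = 0
    collected.ToArray()

// Hypothetical round trip inside a single process (Windows only).
let roundTrip (pipeName: string) (text: string) : string =
    use server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Message)
    use client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut)
    let accepting = async { server.WaitForConnection() } |> Async.StartAsTask
    client.Connect()
    accepting.Wait()
    let payload = Encoding.UTF8.GetBytes(text)
    // write on another thread so that a blocked write cannot stall the reader
    let writing = async { client.Write(payload, 0, payload.Length) } |> Async.StartAsTask
    let received = readWholeMessage server
    writing.Wait()
    Encoding.UTF8.GetString(received)
```

The server side is created in message mode, so a single client write is delivered as one message even though the reader needs several reads to drain it.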

I have chosen to implement the PipeStream using full Async capabilities and leveraging the F# Async computation expression.

Out of the box, NamedPipeServerStream uses the old Asynchronous Programming Model (APM): the class defines the BeginWaitForConnection and EndWaitForConnection methods, but it does not yet have a WaitForConnectionAsync method. Implementing a custom asynchronous method that waits for a connection is straightforward, if not entirely trivial, with the F# Async primitives:

    open System.IO.Pipes

    type NamedPipeServerStream with
        member x.WaitForConnectionAsync() =
            Async.FromBeginEnd(x.BeginWaitForConnection, x.EndWaitForConnection)

The NamedPipeClientStream doesn’t have an asynchronous version of the connect API. As with NamedPipeServerStream, the F# Async primitives can be used to create an asynchronous version of the Connect method. Because NamedPipeClientStream doesn’t expose an Asynchronous Programming Model (APM) pair for Connect, a delegate is created to help build the asynchronous version:

    type NamedPipeClientStream with
        member x.ConnectionAsync() =
            let delConnect = new Action(x.Connect)
            Async.FromBeginEnd(delConnect.BeginInvoke, delConnect.EndInvoke)
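One caveat: Delegate.BeginInvoke is only available on the .NET Framework; on .NET Core and later it throws PlatformNotSupportedException. As a hedged alternative, the sketch below moves the blocking Connect call onto the thread pool inside an async workflow. The member name ConnectOnThreadPoolAsync, the handshake function and the pipe name passed to it are assumptions made for illustration and are not part of the original library.

```fsharp
open System.IO.Pipes

type NamedPipeClientStream with
    // Hypothetical helper: run the blocking Connect on a thread-pool thread
    member x.ConnectOnThreadPoolAsync() : Async<unit> = async {
        do! Async.SwitchToThreadPool()
        x.Connect() }

// Connect a server and a client living in the same process and report
// whether both ends consider themselves connected.
let handshake (pipeName: string) : bool =
    use server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous)
    use client = new NamedPipeClientStream(".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous)
    // same APM wrapping as the server-side extension
    let accepting =
        Async.FromBeginEnd(server.BeginWaitForConnection, server.EndWaitForConnection)
        |> Async.StartAsTask
    client.ConnectOnThreadPoolAsync() |> Async.RunSynchronously
    accepting.Wait()
    server.IsConnected && client.IsConnected
```

The trade-off is that a thread-pool thread stays blocked while Connect waits, which is usually acceptable for a single client connection.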

I have to say that using F# for this project allowed me to easily write code to meet the requirements while being expressive and concise. I was able to have all my code in one single “monitor page”.

My code has been reviewed by other developers who were unfamiliar with F# and they were able to easily understand the code without issues.

Ultimately, the application is used client-side and involves a responsive user interface. For this reason, I leveraged the F# Async computation expression to build a fully asynchronous interprocess communicator that provides a great user experience.

Let’s check the code step by step.

1) The server process is started and NamedPipeServerStream waits asynchronously for a connection.

        // Start the named pipe in message mode
        let serverPipe = new NamedPipeServerStream( 
                              pipeName, // name of the pipe
                              PipeDirection.InOut, // direction of the pipe 
                              1, // max number of server instances
                              PipeTransmissionMode.Message, // transmission mode
                              PipeOptions.WriteThrough // the operation will not return control until the write is completed
                              ||| PipeOptions.Asynchronous)

2) The client process is started and the NamedPipeClientStream waits to be connected to the server process.

        let serverName = "." // local machine server name
        let clientPipe = new NamedPipeClientStream( serverName, // server name, local machine is .
                              pipeName, // name of the pipe
                              PipeDirection.InOut, // direction of the pipe 
                              PipeOptions.WriteThrough // the operation will not return control until the write is completed
                              ||| PipeOptions.Asynchronous)  
 
 

Two events are created to notify subscribers when the connection is established and when a complete message has been received. Once the connection is successful, the named pipe waits asynchronously for incoming messages, and the message-received event is triggered for each one.

Because the program has to interoperate with code written in C#, the events are decorated with the [<CLIEventAttribute>] attribute.

        // event to notify the message is received
        let messageReceived = Event<MessageReceivedEvent>()
        // event to notify the connection is established
        let connected = Event<EventArgs>()
 
        [<CLIEventAttribute>]                               
        member x.OnMessageReceived = messageReceived.Publish
 
        [<CLIEventAttribute>]                               
        member x.OnConnected = connected.Publish
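The MessageReceivedEvent type is not shown in this post. The sketch below assumes a plausible shape for it (an EventArgs subclass exposing the Message bytes, which is how the C# consumer reads it) and wraps the event in a hypothetical Notifier class, to show how an F# Event is published as a standard .NET event and raised.

```fsharp
open System

// Assumed shape of the payload: the real definition is not part of the post
type MessageReceivedEvent(message: byte[]) =
    inherit EventArgs()
    member x.Message = message

// Hypothetical class used only to demonstrate publishing and triggering
type Notifier() =
    let messageReceived = Event<MessageReceivedEvent>()

    // CLIEvent compiles the property to a .NET event that C# can use with +=
    [<CLIEvent>]
    member x.OnMessageReceived = messageReceived.Publish

    member x.Raise(bytes: byte[]) =
        messageReceived.Trigger(MessageReceivedEvent(bytes))

let notifier = Notifier()
let seenLengths = ResizeArray<int>()
// from F# the event is consumed as an IEvent, so Add attaches a handler
notifier.OnMessageReceived.Add(fun args -> seenLengths.Add(args.Message.Length))
notifier.Raise [| 1uy; 2uy; 3uy |]
printfn "handlers saw %d message(s)" seenLengths.Count
```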
 

3) The following recursive function has the signature (byte[] -> Async<unit>). It keeps reading from the pipe, accumulates the bytes of each message, and publishes every complete message as a byte array through the OnMessageReceived event.

        // partial function (byte array -> Async<unit>)
        // keeps reading incoming messages asynchronously and
        // notifies subscribers by triggering the OnMessageReceived event
        let readingMessages  = 
                let bufferResizable = new ResizeArray<byte>()                                            
                let rec readingMessage (buffer: byte array) = async {
                    let! bytesRead = serverPipe.AsyncRead(buffer, 0, buffer.Length)
                    // add only the bytes actually read to the "resizable" collection
                    // (F# slices are inclusive, hence bytesRead - 1)
                    bufferResizable.AddRange(buffer.[0 .. bytesRead - 1])
                  
                    if serverPipe.IsMessageComplete then 
                        // the message is complete: fire the OnMessageReceived event
                        // with all the bytes that are part of the message
                        let message = bufferResizable |> Seq.toArray
                        // clear the resizable collection to be ready for the next incoming message
                        bufferResizable.Clear()
                        messageReceived.Trigger (MessageReceivedEvent(message))
                    // keep reading, either the rest of this message or the next one;
                    // return! makes the recursive call a tail call
                    return! readingMessage buffer }
                readingMessage                
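When copying a partially filled read buffer, it helps to remember that F# slices include both endpoints. The small sketch below, with made-up values, shows the difference between slicing up to bytesRead and up to bytesRead - 1.

```fsharp
// A 4-byte buffer of which only the first 2 bytes were filled by a read
let sampleBuffer = [| 10uy; 20uy; 30uy; 40uy |]
let sampleBytesRead = 2

// inclusive upper bound: this takes indexes 0, 1 and 2 (one byte too many)
let tooMany = sampleBuffer.[0 .. sampleBytesRead]
// subtracting one yields exactly the bytes that were read
let exact = sampleBuffer.[0 .. sampleBytesRead - 1]

printfn "%d vs %d" tooMany.Length exact.Length
```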

4) The method that sends a message is straightforward and self-explanatory.

 
        member x.Write (message:byte array) =
            if clientPipe.IsConnected && clientPipe.CanWrite then
                let write = async {
                    do! clientPipe.AsyncWrite(message,0, message.Length)
                    do! clientPipe.FlushAsync() |> Async.AwaitPlainTask
                    clientPipe.WaitForPipeDrain() }
                Async.Start(write, token.Token) 
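Async.AwaitPlainTask is not part of FSharp.Core, so it is presumably a helper defined elsewhere in the project. The sketch below is one possible definition, offered as an assumption rather than the original code; with F# 4.0 or later, Async.AwaitTask also accepts a non-generic Task directly.

```fsharp
open System
open System.Threading.Tasks

type Microsoft.FSharp.Control.Async with
    // Hypothetical helper: adapt a non-generic Task to Async<unit>
    static member AwaitPlainTask (task: Task) : Async<unit> =
        // surface a failure instead of silently swallowing it
        let rethrowIfFaulted (t: Task) : unit =
            if t.IsFaulted then raise t.Exception
        task.ContinueWith(Func<Task, unit>(rethrowIfFaulted)) |> Async.AwaitTask

// usage: wait for a plain Task from inside an async workflow
let waitBriefly = async {
    do! Async.AwaitPlainTask(Task.Delay(20))
    return "done" }
```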

5) The method that listens for a connection uses the Async version of the classic Asynchronous Programming Model “BeginWaitForConnection”, as described previously.

        member x.StartListeing() = 
                if not <| serverPipe.IsConnected then
                    let startListening = async {
                        // wait for a connection
                        do! serverPipe.WaitForConnectionAsync()
                        // fire an event to communicate that a connection is received 
                        connected.Trigger EventArgs.Empty
                        // start receiving messages asynchronously 
                        do! (readingMessages (Array.zeroCreate<byte> 0x1000))
                    } 
                    // start listening for a connection asynchronously
                    Async.Start( startListening, token.Token )
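The token value passed to Async.Start is not defined in the snippets above; it is presumably a CancellationTokenSource owned by the class and cancelled on disposal. The sketch below illustrates that pattern with a hypothetical Worker class whose background loop stops once Dispose is called.

```fsharp
open System
open System.Threading

// Hypothetical class: owns a CancellationTokenSource and a background loop
type Worker() =
    let token = new CancellationTokenSource()
    let mutable ticks = 0

    member x.Start() =
        let loop = async {
            while true do
                do! Async.Sleep 10
                ticks <- ticks + 1 }
        // the workflow is cancelled as soon as token is cancelled
        Async.Start(loop, token.Token)

    member x.Ticks = ticks

    interface IDisposable with
        member x.Dispose() =
            token.Cancel()
            token.Dispose()
```

A consumer would call Start once and dispose the instance when it is done, which mirrors the Dispose call made by the C# program at the end of this post.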
 

For demonstration purposes, I created a Person struct that is serialized into a byte array so that it can be sent to the client process. The client process receives the message and rehydrates the byte array into a Person struct.

    [<Struct; StructLayout(LayoutKind.Sequential)>]
    type Person =
        struct 
            val name: string
            val age: int
            new (name:string, age:int) = { name = name; age = age}
        end 

Below is the C# code that consumes the F# library.

    using System;
    using System.IO;
    using System.Runtime.Serialization.Formatters.Binary;

    class Program
    {
        static PipeAsyncClient.IPClientAsync client = new PipeAsyncClient.IPClientAsync("myPipe");
        static void Main(string[] args)
        {
            byte[] data = null;
            var bf = new BinaryFormatter();
            using (var ms = new MemoryStream())
            {
                var person = new Common.Person("Riccardo", 39);
                bf.Serialize(ms, person);
                ms.Flush();
                data = ms.ToArray();
            }
 
            client.OnMessageReceived += client_MessageReceived;
            client.OnConnected += client_OnConnected;
            client.StartListeing();
 
            Console.WriteLine("Press Enter to send Message when connected");
            Console.ReadLine();
            client.Write(data);
 
            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();
            (client as IDisposable).Dispose();
        }
 
        static void client_OnConnected(object sender, EventArgs args)
        {
            Console.WriteLine("Connected");
        }
 
        static void client_MessageReceived(object sender, Common.MessageReceivedEvent args)
        {
            var bf = new BinaryFormatter();
            using (var ms = new MemoryStream(args.Message))
            {
                var person = (Common.Person)bf.Deserialize(ms);
                ms.Flush();
                Console.WriteLine("Message Received - Person Name {0} - Age {1}", person.name, person.age);
            }            
        }
    }