F# European Tour

The Geeks guide to F# travel

 

My tickets are purchased, bags packed, and laser pointer ops tested!   I am excited to begin my F# adventure which will take me to 10 presentations in 9 countries in 32 days! I will keep you posted as I travel along and invite you to follow me here or on twitter @TRikace.

 

Date Location Link
3/28 Bologna, Italy www.lamdbacon.org
3/30 Zurich, Switzerland http://www.meetup.com/zurich-fsharp-users/
3/31 Madrid, Spain http://www.meetup.com/madrid-fsharp/
4/2 Prague, Czech Republic http://www.meetup.com/Lambda-Meetup-Group/
4/6 Berlin, Germany http://www.meetup.com/thefunclub/
4/8 Amsterdam, Netherlands http://www.meetup.com/fp-ams
4/10 Paris, France meetup.com/Functional-Programming-in-F/
4/13 London, England  http://www.meetup.com/FSharpLondon/events/219808946/
4/14 Dublin, Ireland http://www.meetup.com/FunctionalKats/
4/17 London, England  https://skillsmatter.com/conferences/6724-fsharp-exchange-2015#program

Update – I am back!

I am back from my “European F# Adventure”, it was both wonderful and exhausting.  I met a lot of wonderful F# enthusiasts and had a ton of fun in the process.

The main reason for this trip was to spread the word on the benefits of F# and to establish our presence in the technology community. During and after the presentations I received many great questions and positive feedback, so much so, I am confident that F# has captured interested in several new developers.

I would like to thank the organizers of the Meetups and Conference that helped make my adventure possible and while making me feel welcome. There is no better way to visit these beautiful European Cities than under the guidance of its proud “locals” .

Thanks to Luigi Berettini (Bologna – LambdaCon), Marc Sigrist (Zurich F# UG), Alfoso Garcia (Madrid F# UG), Daniel Skarda (Praha FP UG), Kai Wu (Berlin FP UG), Michel Rijnders (Amsterdam FP UG), Tomasz Jaskula (Paris, F# UG), Philip Trelford (London F# UG), Andrea Magnorsky (Dublin FP UG), SkillsMatter London F# eXchange.

The F# Community is absolutely fantastic! During my journey I received supporting tweets and emails from fellow F#-pers that kept me going and enthusiastic for the next challenge!

 

 

The major take-away for me was that we are living in an exceptional time for technology especially for those embracing all that Functional programming and F# have to offer.  It seems that interest in our community and topics are popping up in many diverse forums.   Conferences and User roups focused on Functional Programming are multiplying with more Software Engineering and companies getting interested in the Functional Paradigm… there must be a reason!

 

Stay functional my friend!

F# and Named-Pipes

I am currently working on a project where I have an existing .NET application written in C# that is running on top of a Unity3D engine.

As part of the requirement, I have to develop a WPF host environment that in addition to hosting the Unity3D process is also able to send and receive commands.

The WPF application must also print the results in a report. The user will fill out a WPF form, which will feed the properties of a command. Once completed the user will send the command triggering an event.

In order to do this, I need a way to communicate between two different processes that could belong to different AppDomains.

2

 

 

I used the class NamedPipeClientStream and NamedPipeServerStream that support both synchronous and asynchronous read and write operations. Named pipes are a mechanism to provide interprocess communication in a client and server architecture style.

The reason I chose NamedPipe stream is because it supports full duplex communication, and is bi-directional per nature improving communication between a pipe server and one or more pipe clients.

Using NamedPipes is similar to using socket but with less code and layers of indirection to process.

PipeStream can communicate among processes through the Windows Pipes protocol. It is also great for intreprocess communication on a single computer, as a low-level abstraction that provides very high performance for sending and receiving data.

There are two type of windows pipe:

  • Named Pipe : which allows two-way communication between two processes on the same machine.
  • AnonymousPipe: which provides one-way communication between processes on the same machine.

For my scenario, I chose Named piped because it offers a two-way communication feature.

The NamedPipe has two subtype classes:

  • NamedPipeServerStream – which when instantiated will wait for a connection using the method “WaitForConnection”
  • NamedPipedClientStream – which when instantiated will attempt a connection to a NamedPipeServerStream

To ensure success it is important that in the creation of two way communication, both Named Piped streams agree on the same Name and protocol used. Equally important is to acknowledgethat both NamedPipe have to share the same size of the Data transmitted.

To help the transmission of lengthy messages, it is recommended to enable and leverage the “message transmission mode”.  If this modality is utilized, the PipeStream that is reading the message can check the “IsMessageComplete” property to evaluate if the Message is completed or if the Stream has to keep reading.

I highly recommend to use the “Message” transmission mode because it is impossible to determinate if the PipeStream has terminated sending the bytes stream or if it has completed reading a message simply checking if the Read bytes count is “0” zero.  According to the MSDN documentation, the PipeStream is acting like a Network stream which has no definite end.

I have chosen to implement the PipeStream using full Async capabilities and leveraging the F# Async computation expression.

The NamedPipServerStream out of the box uses the old Asynchronous Programming Model (APM), the NamedPipeServerStream class has BeginWaitForConnection and EndWaitForConnection methods defined, but it does not yet have a WaitForConnectionAsync method defined. To implement a custom Async method for waiting a connection it is very easy (not trivial) using the F# Async primitive types:

    type NamedPipeServerStream with
        member x.WaitForConnectionAsync() =
            Async.FromBeginEnd(x.BeginWaitForConnection, x.EndWaitForConnection)

The NamedPipClentStream doesn’t have an asynchronous version of the connect API. Similar to the process previously used with NamedPipeServerStream, F# Async primitive can be used to create an asynchronous version of the connect method. Because the NamedPipClentStream doesn’t have an Asynchronous Programming Model (APM) for the Connect method, a delegate was created to help build the Asynchronous version

    type NamedPipeClientStream with
        member x.ConnectionAsync() =
            let delConnect = new Action(x.Connect)
            Async.FromBeginEnd(delConnect.BeginInvoke, delConnect.EndInvoke)

I have to say that using F# for this project allowed me to easily write code to meet the requirements while being expressive and concise. I was able to have all my code in one single “monitor page”.

My code has been reviewed by other developers who were unfamiliar with F# and they were able to easily understand the code without issues.

Ultimately, the application is used in a client side version that involves a responsive user interface. For this reason, I was able to leverage the F# Async computation expression to build a fully asynchronous Interprocess communicator providing a great user experience

Let’s check the code step by step.

1) The server process is started and NamedPipeServerStream waits asynchronously for a connection.

        // Start the named pipe in message 
        let serverPipe = new NamedPipeServerStream( 
                              pipeName, // name of the pipe,
                              PipeDirection.InOut, // diretcion of the pipe 
                              1, // max number of server instances
                              PipeTransmissionMode.Message, // Transmissione Mode
                              PipeOptions.WriteThrough // the operation will not return the control untill the write is completed
                              ||| PipeOptions.Asynchronous)

2) The client process is started and the NamedPipeClientStream waits to be connected to the server process.

        let serverName = "." // local machine server name
        let clientPipe = new NamedPipeClientStream( serverName, //server name, local machine is .
                              pipeName, // name of the pipe,
                              PipeDirection.InOut, // diretcion of the pipe 
                              PipeOptions.WriteThrough // the operation will not return the control untill the write is completed
                              ||| PipeOptions.Asynchronous)  
 
 

I am setting two events are created to notify when the connection is established and when a message is received and completed. When the connection is successful, the NamedPipe is asynchronously waiting for incoming messages and the message received event will be triggered.

Because the program has to interpolate with code written in C#, the events are decorated with the [<CLIEventAttribute>] attribute .

        // event to notify the message is received
        let messageReceived = Event<MessageReceivedEvent>()
        // event to notify the connection is established
        let connected = Event<EventArgs>()
 
        [<CLIEventAttribute>]                               
        member x.OnMessageReceived = messageReceived.Publish
 
        [<CLIEventAttribute>]                               
        member x.OnConnected = connected.Publish
 

3) The following recursive function is partially applied with signature (byte[] -> Async<unit>).  The purpose of this function is to return a bytes array as a representation of the message received.

        // partial function (byte array -> Async<unit>)
        // keep reading incaming messages asynchronously
        // notify the message triggering the OnMessageReceived event
        let readingMessages  = 
                let bufferResizable = new ResizeArray<byte>()                                            
                let rec readingMessage (buffer: byte array) = async {
                    let! bytesRead = serverPipe.AsyncRead(buffer, 0, buffer.Length)
                    // add the bytes read to a "Resizable" collection 
                    bufferResizable.AddRange(buffer.[0..bytesRead])
                  
                    if serverPipe.IsMessageComplete then 
                        // if the message is completed fire OnMessageReceived event
                        // including the total bytes part of the message
                        let message = bufferResizable |> Seq.toArray
                        // clear the resizable collection to be ready for the next income messagw
                        bufferResizable.Clear()
                        messageReceivd.Trigger (MessageReceivedEvent(message))
                        do! readingMessage buffer
                    else
                        // the message is not completed, keep reading
                        do! readingMessage buffer }
                readingMessage                

4) The method to send a message is straight forward and self explanatory

 
        member x.Write (message:byte array) =
            if clientPipe.IsConnected && clientPipe.CanWrite then
                let write = async {
                    do! clientPipe.AsyncWrite(message,0, message.Length)
                    do! clientPipe.FlushAsync() |> Async.AwaitPlainTask
                    clientPipe.WaitForPipeDrain() }
                Async.Start(write, token.Token) 

5) The method that is listening for connection is using the Async version of the classic Asynchronous Programming Model “BeginWaitForConnection” as described previously.

        member x.StartListeing() = 
                if not <| serverPipe.IsConnected then
                    let startListening = async {
                        // wait for a connection
                        do! serverPipe.WaitForConnectionAsync()
                        // fire an event to communicate that a connection is received 
                        connected.Trigger EventArgs.Empty
                        // start receiving messages asynchronously 
                        do! (readingMessages (Array.zeroCreate<byte> 0x1000))
                    } 
                    // start listening for a connection asynchronously
                    Async.Start( startListening, token.Token )
 

For demonstration purposes I have created a struct Person that it is serialized in a bytes array to be able to be sent to the client process. The client process will receive the message and rehydrate the bytes array in the Person struct.

    [<Struct; StructLayout(LayoutKind.Sequential)>]
    type Person =
        struct 
            val name: string
            val age: int
            new (name:string, age:int) = { name = name; age = age}
        end 

Here below the C# code that is consuming the F# library.

    class Program
    {
        static PipeAsyncClient.IPClientAsync client = new PipeAsyncClient.IPClientAsync("myPipe");
        static void Main(string[] args)
        {
            byte[] data = null;
            var bf = new BinaryFormatter();
            using (var ms = new MemoryStream())
            {
                var person = new Common.Person("Riccardo", 39);
                bf.Serialize(ms, person);
                ms.Flush();
                data = ms.ToArray();
            }
 
            client.OnMessageReceived += client_MessageReceived;
            client.OnConnected += client_OnConnected;
            client.StartListeing();
 
            Console.WriteLine("Press Enter to send Message when connected");
            Console.ReadLine();
            client.Write(data);
 
            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();
            (client as IDisposable).Dispose();
        }
 
        static void client_OnConnected(object sender, EventArgs args)
        {
            Console.WriteLine("Connected");
        }
 
        static void client_MessageReceived(object sender, Common.MessageReceivedEvent args)
        {
            var bf = new BinaryFormatter();
            using (var ms = new MemoryStream(args.Message))
            {
                var person = (Common.Person)bf.Deserialize(ms);
                ms.Flush();
                Console.WriteLine("Message Received - Person Name {0} - Age {1}", person.name, person.age);
            }            
        }
    }