Concurrent Pipeline with .NET Channels

This post is part of the Fantastic F# Advent Calendar 2021 that has become a Christmas tradition over the years to bring the F# community together in celebration of the holiday season. A special thank you to our organizer, Sergey Tihon (@sergey_tihon) who is keeping this tradition vibrant and magical.

In this post we will leverage .NET Channels to implement reusable and composable building-block functions to define a concurrent workflow.

In this blogpost example, we are implementing a pipeline to process a set of images read from the local filesystem. The idea is to create a few reusable building blocks that use Channels as the underlying way to implement a high-performance producers/consumers pattern, and then to use these building blocks to compose the image-processing pipeline. In addition, we will apply a “pipeline” higher-order function to these building blocks, which share the same output type, in order to ease the composition of multiple stages and the option to set a degree of parallelism.

You can download the code here

What are .NET Channels 

.NET Channels is a static class that aims to provide high-performance, thread-safe data synchronization in a multithreaded program.

At the core, Channels supports sharing data between producers and consumers in a concurrent fashion. Furthermore, Channels supports advanced producer and consumer patterns, because one or many producers can write data into the channel, which is then read by one or many consumers. Logically, a channel is an efficient, thread-safe queue.

The Channel static class exposes factory methods to create the two main types of channels (bounded and unbounded).

(From Microsoft documentation)

CreateBounded<'a> creates a channel with a finite capacity. In this scenario, it’s possible to develop a producer/consumer pattern which accommodates this limit. We can use this type of channel to store at most the specified number of items. The channel may be used concurrently by any number of readers and writers. Attempts to write to the channel when it contains the maximum allowed number of items result in behavior according to the mode specified when the channel is created, which can include waiting for space to be available, dropping the oldest item, or dropping the newest item. For example, your producer can await (non-blocking) capacity within the channel before it completes its write operation. This is a form of backpressure, which can slow your producer down, or even stop it until the consumer has read some items and created capacity.

In F# we can create a bounded channel with a buffer of 10 items as follows:


let channel = Channel.CreateBounded<'a>(10)

CreateUnbounded<'a> creates a channel with unlimited capacity, meaning that producers can publish as many times as they want, hoping that the consumers are able to keep up. In this scenario, without a capacity limit, the channel keeps accepting new items. When the consumer is not keeping up, the number of queued items will continue to increase. Each item being held in the channel requires memory, which can’t be released until the object has been consumed. Therefore, in this scenario it’s possible to run out of available memory.

In F# we can create an unbounded channel with an unlimited buffer as follows:


let channel = Channel.CreateUnbounded<'a>()

Choosing the right Channel type is extremely important and highly depends on the context. Also, keep in mind that while it’s true that Unbounded Channels are indeed “unbounded”, the memory on the machine normally isn’t.

Channel and back-pressure

The term backpressure is borrowed from fluid dynamics and relates to dataflow in software systems. Backpressure occurs when a computer system can’t process the incoming data fast enough, so it starts to buffer the arriving data until the space available for buffering shrinks to the point of deteriorating the responsiveness of the system or, worse, raising an “Out of Memory” exception.

When creating bounded channels to prevent backpressure issues, we can provide BoundedChannelOptions to the factory method to apply different strategies, such as:


// The FullMode strategies available when a bounded channel is full:
//   BoundedChannelFullMode.Wait       - wait for space to be available in order to complete the write operation
//   BoundedChannelFullMode.DropOldest - remove and ignore the oldest item in the channel in order to make room
//   BoundedChannelFullMode.DropNewest - remove and ignore the newest item in the channel in order to make room
//   BoundedChannelFullMode.DropWrite  - drop the item being written
let channel =
    Channel.CreateBounded<'a>(BoundedChannelOptions(capacity, FullMode = BoundedChannelFullMode.Wait))

Reader and writer

A Channel<'a> instance exposes two properties: Writer (a ChannelWriter<'a>) and Reader (a ChannelReader<'a>).

These properties, as the names suggest, provide a set of methods (everything is done through them) to either write into the channel or to read from it:

  • TryRead/TryWrite: Attempt to read or write an item synchronously, returning whether the read or write was successful.
  • ReadAsync/WriteAsync: Read or write an item asynchronously. These will complete synchronously if data/space is already available.
  • TryComplete/Completion: Channels may be completed, such that no additional items may be written; such channels will “complete” when marked as completed and all existing data in the channel has been consumed. Channels may also be marked as faulted by passing an optional Exception to Complete; this exception will emerge when awaiting on the Completion Task, as well as when trying to ReadAsync from an empty completed collection.
  • WaitToReadAsync/WaitToWriteAsync: Return a Task<bool> that will complete when reading or writing can be attempted. If the task completes with a true result, at that moment the channel was available for reading or writing, though because these channels may be used concurrently, it’s possible the status changed the moment after the operation completed. If the task completes with a false result, the channel has been completed and will not be able to satisfy a read or write.
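To make these members concrete, here is a minimal consumer loop (a sketch, assuming a ChannelReader<int> named “reader”) that drains a channel using WaitToReadAsync and TryRead:


let drain (reader: ChannelReader<int>) = task {
    let mutable keepReading = true
    while keepReading do
        // wait until an item is available, or until the channel completes
        let! canRead = reader.WaitToReadAsync()
        if canRead then
            // synchronously drain whatever is currently buffered
            let mutable item = 0
            while reader.TryRead(&item) do
                printfn $"drained %d{item}"
        else
            keepReading <- false // the channel has completed and is empty
}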

Why Channels

.NET Channels offer performance advantages when used to implement a producer/consumer pattern, compared to other approaches such as TPL Dataflow and Concurrent Collections. In addition, the asynchronous semantics of Channels provide a simple and straightforward way to introduce a high-performance, thread-safe queue into any modern application. Channels are a perfect fit when you need one or more dedicated threads for handling the queue.

In general, the producer/consumer model can help improve the throughput of an application.

Another benefit of using Channels is that they decouple producers and consumers, making them independent of each other so that they can execute in parallel.

 

Producer/Consumer Pattern

Whenever a program has a multi-step workflow, consideration must be given to some sort of producer and consumer pattern, which commonly runs serially. Almost every piece of software we write involves a pipeline: once a step is completed, the output is passed to the next step in line, freeing up space for another execution. This pattern is based on the concept that every step in the series must be executed in total isolation: receiving data, processing it, and then passing it over to the next block. Consequently, every block should execute in its own thread, to guarantee the appropriate isolation and encapsulation.

The producer and consumer patterns can be implemented in different topologies:

  • one Producer and one Consumer
  • one Producer and multiple Consumers
  • multiple Producers and one Consumer
  • multiple Producers and multiple Consumers

Here is a simple implementation of a producer/consumer pattern using a channel. We will expand this into a more advanced case later in this post.


let channel = Channel.CreateUnbounded<int>()

let producer () = task {
     for item in [0..100] do
         do! channel.Writer.WriteAsync(item)
     channel.Writer.Complete()
 }

let consumer () = task {
     do!
       channel.Reader.ReadAllAsync()
       |> forEachAsync (fun item -> task {
            printfn $"Received item %d{item}"
       })
}

let runProducerConsumer () =
  let producerTask =  Task.Run(Func<Task>( producer ))
  let consumerTask =  Task.Run(Func<Task>( consumer ))
  Task.WhenAll([producerTask; consumerTask])

In the code, after having initialized a “channel”, the function “producer” generates the data items and writes them into the Channel Writer. Then we notify the channel that the data-item generation is completed with the “Writer.Complete()” call. Next, the function “consumer” ingests the data items from the “Channel Reader” to produce a console output. The “Channel Reader” exposes the “ReadAllAsync” method, which allows us to read from the channel asynchronously through the “IAsyncEnumerable” interface. In F#, to support this interface we implement a helper function “forEachAsync”:


let forEachAsync (f: 'a -> Task<unit>) (asyncEn: IAsyncEnumerable<'a>) =
    task {
        let mutable canMoveNext = true
        let enumerator = asyncEn.GetAsyncEnumerator()
        while canMoveNext do
            let! next = enumerator.MoveNextAsync()
            canMoveNext <- next
            if canMoveNext then
                do! f enumerator.Current
    } :> Task

In this implementation of the function “forEachAsync”, we assume an ad-hoc case where the output of the IAsyncEnumerable stream is either passed into a Channel or produces a side effect that returns “unit”.
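The pipeline stages later in this post call “forEachAsync” with a CancellationToken as an additional argument. A cancellable variant compatible with those call sites, sketched here by simply forwarding the token to the async enumerator, looks like this:


let forEachAsync (f: 'a -> Task<unit>) (cTok: CancellationToken) (asyncEn: IAsyncEnumerable<'a>) =
    task {
        let mutable canMoveNext = true
        // forward the CancellationToken to the underlying async enumerator
        let enumerator = asyncEn.GetAsyncEnumerator(cTok)
        while canMoveNext do
            let! next = enumerator.MoveNextAsync()
            canMoveNext <- next
            if canMoveNext then
                do! f enumerator.Current
    } :> Task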

The last function, “runProducerConsumer”, executes the previous functions concurrently, each spawned in a different Task.

.NET Channels helps us deal with these scenarios efficiently by guaranteeing that producers and consumers, implemented in any shape, can handle their own tasks without interfering with each other, which is conducive to the concurrent processing on both sides.

Pipelines and Producer/Consumer

In many scenarios, we can create a workflow by combining two or more producer/consumer steps, each performing a different job. This pattern is commonly called a pipeline. A pipeline is a concurrency model where work is performed and moved through several processing stages. Each step performs a part of the complete work, and when it’s completed, the output is passed to the next stage. In addition, each stage of the pipeline runs in a separate thread, and it does not share any state with the other stages.

Implementing a pipeline using channels

In general, a pipeline begins with a producer method, which starts a process that generates some data, passes it to the first stage of the pipeline, and then moves the output through the stages. The intermediate stages are executed concurrently. In this case, the channel operates as a transport mechanism between the stages. The design of a concurrent pipeline using Channels is composed of stages that take a “Reader Channel” as input, execute work on each data item asynchronously, and then transfer the result into a “Writer Channel”.

From the previous producer/consumer implementation using .NET Channels, we can extract a reusable function to serve as a base building block for defining a workflow. Here is the code implementation:


let pipeline (cTok: CancellationToken) (projection: 'a -> Task<'b>) (reader: ChannelReader<'a>) =
    let pipe = Channel.CreateBounded<'b>(10)
    let writer = pipe.Writer
    let _ = Task.Run(Func<Task>(fun () -> castTask <| task {
            do!
                reader.ReadAllAsync(cTok)
                 |> forEachAsync (fun item -> task {
                      let! outputItem = projection item
                      do! writer.WriteAsync(outputItem)
                    }) cTok
            writer.Complete()
        }
      )
    )
    pipe.Reader

The function “pipeline” takes as input arguments:

  • a cancellation-token, which can be used to stop the pipeline stages
  • a projection function that transforms the data item read from the ChannelReader<'a>
  • a ChannelReader<'a> to read the data items from

Internally, the “pipeline” function initializes a “bounded” channel (to keep the code simple we set the buffer to 10, but we could expand the function to make this value configurable). The output of the projection function is written asynchronously into this channel. Then, the channel is returned from the function to be passed to other stages of the workflow, sharing the data for further processing.

The “pipeline” function runs the “consume and produce” logic in a dedicated Task to enable support for concurrency.
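Note that the snippets use a “castTask” helper that is not defined in this post; a minimal definition, which simply upcasts a Task<unit> to the non-generic Task expected by Func<Task>, could be:


// Assumed helper (not shown in the original snippets):
// upcast a Task<unit> to the non-generic Task expected by Func<Task>.
let castTask (t: Task<unit>) : Task = t :> Task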

The same idea applies to the implementation of an action block, which produces the final side effect of the workflow. 


let pipelineAction (cTok: CancellationToken) (action: 'a -> Task) (reader: ChannelReader<'a>) =
    Task.Run(Func<Task>(fun () -> castTask <| task {
            do!
                reader.ReadAllAsync(cTok)
                 |> forEachAsync (fun item -> task {
                      do! action item
                    }) cTok
        }
      )
    )

TPL Dataflow vs Channels

The System.Threading.Tasks.Channels library provides a set of synchronization data structures for passing data between producers and consumers. Whereas the existing System.Threading.Tasks.Dataflow library is focused on pipelining and connecting together dataflow “blocks” which encapsulate both storage and processing, System.Threading.Tasks.Channels is focused purely on the storage aspect, with data structures used to provide the hand-offs between participants explicitly coded to use the storage. The library is designed to be used with async/await in C#.

Channel Broadcast

The idea of the Broadcast Channel stage is to provide a reusable function that takes as input a “Channel Reader”, which is used to read data from, and returns a set of “Channel Readers” that transport a copy of the same data item. Each of those “Channel Readers” is then connected to a different stage of the pipeline, allowing it to process the data concurrently. The Broadcast Channel is useful when dealing with work stages that can cause backpressure, because they consume data items more slowly than the previous step in the pipeline produces them. Leveraging the Broadcast stage, we can distribute the workload among concurrent stages, increasing the capacity of the slow stage to cope with a large volume of data to process. The Broadcast Channel stage takes an input channel and distributes its messages amongst several outputs. The Broadcast stage runs concurrently and in a non-blocking fashion.

Here is the code implementation:


let broadcast (cTok: CancellationToken) n (channel: ChannelReader<'a>) =
    let outputs = Array.init n (fun i -> Channel.CreateUnbounded<'a>())
    let _ = Task.Run(Func<Task>(fun () -> castTask <| task {
        do!
            channel.ReadAllAsync(cTok)
            |> forEachAsync (fun item -> task {
                for output in outputs do
                    do! output.Writer.WriteAsync(item, cTok)
                }) cTok
        for output in outputs do output.Writer.Complete()
    }))
    outputs |> Array.map(fun ch -> ch.Reader)

The “broadcast” function takes as input arguments:

  • a cancellation-token, which can be used to stop the pipeline stages
  • an arbitrary number that defines the count of new channels to initialize for the data sharing
  • a ChannelReader<'a> to read the data items from

When this block runs, a set of “n” new channels is initialized and used to forward the data items read from the channel reader. With this design, each new channel receives a copy of the same data item. Then, the function “broadcast” returns the newly created set of channels to distribute the work among other channels, and possibly to different branches of the workflow. This “broadcast” function is useful to increase the degree of parallelism in our pipeline implementation.

Channel Join

The same concept used to implement the Broadcast Channel applies in reverse, where we join a set of “Reader Channels” into a single “Reader Channel” stream. In this way, we can implement a Join stage to read data items concurrently from two or more stages by combining their outputs into a single “Reader Channel”.

Here is the code implementation, which is the reverse of the previous “broadcast” function:


let join (cTok: CancellationToken) (inputs: ChannelReader<'a> array) =
    let output = Channel.CreateUnbounded<'a>()
    let writeAsync (input: ChannelReader<'a>) =
        task {
            do!
                input.ReadAllAsync(cTok)
                |> forEachAsync (fun item -> task {
                     do! output.Writer.WriteAsync(item)
                   }) cTok
        } :> Task
    let _ = Task.Run(Func<Task>(fun () -> castTask <| task {
        do!
            inputs
            |> Array.map writeAsync
            |> Task.WhenAll
        output.Writer.Complete()
        }))
    output.Reader

In this implementation, a collection of “Reader Channels” is passed as input into the “join” function, which begins to read data items concurrently and passes them directly into a new local single channel that fuses the outputs of the input channels. The Join stage runs concurrently and in a non-blocking fashion.

Code implementation details

We have created a few reusable building blocks:

  • the “pipeline” block, which helps to create and easily compose the workflow stages 
  • the “broadcast” block, from the TPL Dataflow Broadcast block idea, which enables you to dispatch a copy of the same data item across other stages and/or increase the degree of parallelism of a given stage
  • the “join” block, from the TPL Dataflow Join block idea, which allows you to fuse together into a single channel the output of a set of given channels

With the help of these building blocks, we can define our workflow for the image processing. For this blogpost, we implement three variants of this pipeline:

  • Sequential Pipeline, which is the simplest implementation of the image processing workflow composing the stages using the “pipeline” function
  • ForkJoin Pipeline, which expands the Sequential Pipeline by exploiting the “broadcast” and “join” functions to improve the performance of the pipeline
  • Multi ForkJoin Pipeline, which expands the ForkJoin Pipeline design to support multiple workflow branches

The initial step: generating the data

The initial stage of our pipeline generates the data items, which are the beginning of the workflow. This stage reads the image files from a given folder, and sends the file paths to the next stage of the pipeline, functioning as the producer.


let dataGenerator sourceImages (cTok: CancellationToken) : ChannelReader<string> =
    let images = Directory.GetFiles(sourceImages, "*.jpg")
    let channel = Channel.CreateBounded<string>(10)
    let rnd = Random()

    let _ = Task.Run(Func<Task>(fun () -> castTask <| task {
        do!
            images
            |> Seq.map (fun image ->
                    task {
                        do! channel.Writer.WriteAsync(image, cTok)
                        do! Task.Delay(TimeSpan.FromSeconds(float(rnd.Next(3))))
                    }
                    :> Task)
            |> Task.WhenAll
        channel.Writer.Complete()
        }), cancellationToken = cTok
    )
    channel.Reader

The next step loads the images from the filesystem and creates the record type ImageInfo, which is used to keep the details of the image file; ultimately it passes the output to a channel that is returned from the function, ready to be handed to the next stage of the pipeline.
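For reference, here is a sketch of the ImageInfo record as inferred from the fields used in the pipeline code below; the concrete type of the image field depends on the imaging library used, so a byte array is shown as a placeholder:


// Sketch of the ImageInfo record, inferred from the fields used below.
// The type of the image field depends on the imaging library (placeholder here).
type ImageInfo =
    { name : string
      source : string
      destination : string
      image : byte[] }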

The next steps follow the same pattern, therefore I have left out the repeated code details.

The Sequential Pipeline

Here is the code implementation of the Sequential Pipeline:


let executeSequential (source: string) destination = task {

    if Directory.Exists destination |> not then
        Directory.CreateDirectory destination |> ignore

    let cTok = new CancellationTokenSource()

    let pipelineCtok f = pipeline cTok.Token f

    do!
        dataGenerator source cTok.Token
        |> pipelineCtok (fun path -> task {
            let! image = ImageProcessingHelpers.loadImageAsync path
            let outputItem =
                 {
                     name = Path.GetFileName(path)
                     source = path
                     destination = destination
                     image = image
                 }
            return outputItem
            })
        |> pipelineCtok (fun imageInfo -> ImageProcessingHelpers.scaleImage imageInfo)
        |> pipelineCtok (fun imageInfo -> ImageProcessingHelpers.convertTo3D imageInfo)
        |> pipelineAction cTok.Token (fun imageInfo -> ImageProcessingHelpers.saveImage imageInfo)
}

In the code we create the different steps of the workflow:

  1. loadImageAsync
  2. scaleImage
  3. convertTo3D
  4. saveImage

Then we compose them together using the “pipeline” function. The last step of the pipeline uses the “pipelineAction” function to save each processed image.

The ForkJoin Pipeline

Here is the code implementation of the ForkJoin Pipeline:


let executeForkJoin (source: string) destination = task {

    if Directory.Exists destination |> not then
        Directory.CreateDirectory destination |> ignore

    let cTok = new CancellationTokenSource()
    let pipelineCtok f = pipeline cTok.Token f

    let broadcast = broadcast cTok.Token 4 (dataGenerator source cTok.Token)

    let pipe reader =
        reader
        |> pipelineCtok (fun path -> task {
            let! image = ImageProcessingHelpers.loadImageAsync path
            let outputItem =
                 {
                     name = Path.GetFileName(path)
                     source = path
                     destination = destination
                     image = image
                 }
            return outputItem
            })
        |> pipelineCtok (fun imageInfo -> ImageProcessingHelpers.scaleImage imageInfo)
        |> pipelineCtok (fun imageInfo -> ImageProcessingHelpers.convertTo3D imageInfo)

    do!
        broadcast
        |> Array.map pipe
        |> join cTok.Token
        |> pipelineAction cTok.Token (fun imageInfo -> ImageProcessingHelpers.saveImage imageInfo)
}

In this code implementation, we expand the Sequential Pipeline to run concurrently in four different tasks using the broadcast function, which improves performance because we can process four images simultaneously. Ultimately, we combine the channel outputs into a single stream channel to save the images in one final step.

The Multi ForkJoin Pipeline

Here is the code implementation of the Multi ForkJoin Pipeline:


let executeMultiForkJoin (source: string) destination = task {

    if Directory.Exists destination |> not then
        Directory.CreateDirectory destination |> ignore

    let cTok = new CancellationTokenSource()
    let pipelineCtok f = pipeline cTok.Token f

    let pipe transform reader =
        reader
        |> pipelineCtok (fun path -> task {
            let! image = ImageProcessingHelpers.loadImageAsync path
            let outputItem =
                 {
                     name = Path.GetFileName(path)
                     source = path
                     destination = destination
                     image = image
                 }
            return outputItem
            })
        |> pipelineCtok (fun imageInfo -> ImageProcessingHelpers.scaleImage imageInfo)
        |> pipelineCtok (fun imageInfo -> transform imageInfo)

    let pipe3D = pipe (fun imageInfo -> ImageProcessingHelpers.convertTo3D imageInfo)
    let pipeRedFilter = pipe (fun imageInfo -> ImageProcessingHelpers.setFilter imageInfo ImageFilter.Red)
    let pipeBlueFilter = pipe (fun imageInfo -> ImageProcessingHelpers.setFilter imageInfo ImageFilter.Blue)
    let pipeGreenFilter = pipe (fun imageInfo -> ImageProcessingHelpers.setFilter imageInfo ImageFilter.Green)


    let collapse (source: ChannelReader<'a>) (maps: (ChannelReader<'a> -> ChannelReader<'b>) array) =
        let countBranches = maps |> Array.length
        let sources = broadcast cTok.Token countBranches source
        Array.zip sources maps
        |> Array.map (fun (reader, map) -> map reader)
        |> join cTok.Token

    let dataSource = dataGenerator source cTok.Token
    do!
       collapse dataSource [|pipe3D; pipeRedFilter; pipeBlueFilter; pipeGreenFilter|]
       |> pipelineAction cTok.Token (fun imageInfo -> ImageProcessingHelpers.saveImage imageInfo)
}

In this code implementation, we expand the ForkJoin Pipeline to run different branches of the image-processing workflow concurrently. Each branch applies a different image transformation; the branches are fused together using the “collapse” function, which pairs the source of the data items with the different workflow branches.


let collapse (source: ChannelReader<'a>) (maps: (ChannelReader<'a> -> ChannelReader<'b>) array) =
    let countBranches = maps |> Array.length
    let sources = broadcast cTok.Token countBranches source
    Array.zip sources maps
    |> Array.map (fun (reader, map) -> map reader)
    |> join cTok.Token

In the “collapse” function, the source of the data items is split using the “broadcast” function, and the branch outputs are fused back together with the “join” function after processing. The advantage of this design is that each branch of the workflow runs simultaneously with an independent, dedicated task that handles a copy of the data items produced by the initial stage.

I hope this brings you some merry coding and good cheer! Happy Holidays to you and your family; I am looking forward to a wonderful new year with the vibrant F# community!

Build a Recommendation engine with ML.NET and F#

In this blog we are going to build a shopping/product recommendation service using F# and ML.NET in .NET Core.

I have watched how Machine Learning (ML) tools and trends have evolved in recent years. I am impressed by the great implementation work Microsoft is doing, especially with the tool ML.NET. This tool helps developers integrate ML into any application easily, without the need for a PhD in ML.

Machine learning has long been a difficult and foreign topic for developers working in the .NET space. Historically, if you had to build a machine learning model, you needed to use programming languages like Python or R. Still today they are the most popular tools to create and train your ML models. Although these are great programming languages, if you wanted to enter the ML area, you might face the double challenge of learning ML concepts and new language tools. This is the benefit that ML.NET brings: you can adopt a tool chain that you are already familiar with, .NET and .NET Core, skipping the learning curve of Python and R. ML.NET allows developers to easily integrate ML in their code without leaving the .NET space and while using tools that we are familiar with.

What is ML.NET 

It is a cross-platform and open-source framework that aims to bring the capabilities of machine learning to the .NET ecosystem, to cover and solve a variety of scenarios such as sentiment analysis, price prediction, recommendation, image classification, and so forth.

ML.NET is much more than just a machine learning library; it is an extensible platform that is able to leverage and integrate other ML infrastructure libraries and runtimes, such as TensorFlow and ONNX, in a way that you don’t have to know how to use the TensorFlow API. ML.NET abstracts away the implementation details of using these libraries.

In addition, there is the .NET part, which means that ML.NET allows you to train, build, and ship custom machine learning models, reusing your existing experience and skill set in .NET, which is great because we don’t have to leave the .NET ecosystem.

Building an ML model involves a few steps

This diagram represents the code structure and the process of development in ML.NET

First, we need to define the problem and isolate a good set of quality data, which has to be cleaned and normalized. Then, we need to load the data, and for this step ML.NET offers different options, such as a text or CSV file, a database, and of course a stream to reduce memory consumption.

Following that, you need to extract the features by transforming the input data; this is because machine learning algorithms only understand featurized data (numbers).

Next, we have to train the model; for this step we need to add a learner/trainer specifying which column is the feature and which column is the label (or goal) to predict.

Once the estimator has been defined, we can train the model by providing the already loaded training data. This returns a model which you can use for predictions. Now we can evaluate the trained model to estimate its accuracy. If we are happy with the model, we can consume it to make predictions by passing new inputs; otherwise we re-train it to get better accuracy.

Let’s build a product recommendation

You can follow along the steps, and you can find/download the complete source code here.

The goal of a product recommendation engine is to be able to predict the most popular product combinations given a purchased product. To build this product recommendation engine we are going to use the data-set from Amazon that contains a large number of product purchases based on the “Customers Who Bought This Item Also Bought” feature, which contains over a million product combinations.

The file format is a simple TSV (Tab Separated Values) with two columns, one for the product-id purchased and a second one for the product-id purchased in combination (bought by customers who also bought the first product).

Here is the link to the file: SNAP Amazon Co-Purchasing Network. The source code contains this file compressed.

This is what the file data-set looks like:
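(The original post shows a screenshot of the file at this point; a few illustrative rows in the two-column product-id format, with hypothetical values, look like this:)


0	1
0	2
0	3
1	0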

Let’s start by creating a simple Console App. I personally use Rider as my IDE, but you can use your favorite tool; more important is to import the necessary NuGet packages to exploit the ML.NET library. For the recommendation engine we need these packages:

  • Microsoft.ML
  • Microsoft.ML.Recommender

I break the implementation of an ML.NET project into 9 steps:

  1. Define the data-model
  2. Create MLContext
  3. Load the data
  4. Split the Data
  5. Convert the Data /  Create Pipeline
  6. Training the algorithm for the model
  7. Use the test data against the model
  8. Check accuracy and improve (cross-validation)
  9. Use the model

The first step consists of defining the data-model to reflect in code the data we are dealing with. In this case we have a TSV file with the information about the product purchases. We can use two Record-Types to define respectively our features and the predicted attribute. These objects have to be mutable for compatibility reasons; in F# we can achieve this by decorating the Record-Types with the CLIMutable attribute.

[<CLIMutable>]
type Product = {
    [<LoadColumn(0)>] ProductID : float32
    [<LoadColumn(1)>] [<ColumnName("Label")>] CombinedProductID : float32
}

[<CLIMutable>]
type ProductPrediction = {
    [<ColumnName("Score")>] Score : float32
}

These Record-Types define how the schema of data will look (column names & column types).

The Product Record-Type represents a distinct product purchase combination. Each field of the Record-Type is decorated with the LoadColumn attribute, which notifies the data loader which column to import data from based on the index.

The second Record-Type, ProductPrediction, denotes the product combination prediction. In the second step we need to initialize the MLContext, which is the starting point for all ML.NET operations; it is used for all aspects of creating and consuming an ML model.

At its core, MLContext exposes a set of catalogues, like Data, Model, and Transformers, that basically give access to the APIs that let us load data, transform and prepare the data, apply learning algorithms, make predictions, and so forth.

let context = MLContext(seed = Nullable 1)

Next, in the third step we load the data into memory, generating two sub-sets: one for training the model and one for testing the trained model.

let data = context.Data.LoadFromTextFile<Product>(dataPath, hasHeader = true, separatorChar = '\t')

// Step 3 - Split the Data
let dataPartitions = context.Data.TrainTestSplit(data, testFraction = 0.2)
let trainSet = dataPartitions.TrainSet
let testSet = dataPartitions.TestSet

In this code, we use the method LoadFromTextFile to load the TSV data into memory.  The attribute LoadColumn instructs the method on how to store the loaded data into the Product Record-Type.

When we load data for training, we get back an object that implements the IDataView interface, which is a tabular representation of the data.  

The IDataView is designed to efficiently handle high-dimensional and large data sets and is the component that holds the data during data transformations and model training.  In addition, it is very efficient for loading data in memory, in fact ML.NET was designed to handle datasets of theoretically infinite size using a forward cursor, almost like the cursor in SQL where you might have a table with millions of records, but you touch only few at a given time using the cursor.  

When loading the data, it is common practice to divide the data for the training and testing. 

To partition the data, we split the data-set into a training and a testing sub-set using the TrainTestSplit method. In this case we use 80% of the data for training and 20% of the data for testing. Subsequently, there will be multiple iterations; for each iteration the data is shuffled, so that the training set and the test set keep changing. During each iteration we adjust the model to be more accurate. The goal of a machine learning model is to accurately make predictions on data it has not seen before. Therefore, making predictions using inputs that are the same as those it was trained on may provide misleading accuracy metrics.

After we have loaded the data, it is time to build the machine learning model. In general, the implementation of a recommendation engine that generates predictions is based on the Matrix Factorization algorithm. In our case, we have “only” two IDs as data fields, so we are limited to using One-Class Matrix Factorization.

Matrix factorization (recommender systems)

The idea behind matrix factorization is to represent users and items in a lower dimensional latent space. Since the initial work by Funk in 2006 a multitude of matrix factorization approaches have been proposed for recommender systems. Some of the most used and simpler ones are listed in the following sections.

Matrix factorization is a class of collaborative filtering algorithms used in recommender systems. Matrix factorization algorithms work by decomposing the user-item interaction matrix into the product of two lower dimensionality rectangular matrices. This family of methods became widely known during the Netflix prize challenge due to its effectiveness as reported by Simon Funk in his 2006 blog post, where he shared his findings with the research community. The prediction results can be improved by assigning different regularization weights to the latent factors based on items’ popularity and users’ activeness.
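To make the idea concrete, here is a tiny F# sketch, illustrative only and not the ML.NET internals: each product is mapped to a latent vector, and the predicted affinity of a product pair is the dot product of their latent vectors.


// Illustrative sketch of the matrix factorization idea (not the ML.NET internals):
// the predicted affinity of two products is the dot product of their latent vectors.
let predictScore (p: float[]) (q: float[]) =
    Array.fold2 (fun acc pi qi -> acc + pi * qi) 0.0 p q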

Thankfully, the ML.NET library supports these algorithms; it just requires a bit of simple setup.

let options = MatrixFactorizationTrainer.Options(   MatrixColumnIndexColumnName = "ProductIDEncoded",
                                                    MatrixRowIndexColumnName = "CombinedProductIDEncoded",
                                                    LabelColumnName = "Label",
                                                    LossFunction = MatrixFactorizationTrainer.LossFunctionType.SquareLossOneClass,
                                                    Alpha = 0.01,
                                                    Lambda = 0.025 )
let pipeline =
    Common.printCyan "Create pipeline..."
    EstimatorChain()
     // map ProductID and CombinedProductID to keys
     .Append(context.Transforms.Conversion.MapValueToKey(inputColumnName = "ProductID", outputColumnName = "ProductIDEncoded"))
     .Append(context.Transforms.Conversion.MapValueToKey(inputColumnName = "Label", outputColumnName = "CombinedProductIDEncoded"))
     // find recommendations using matrix factorization
     .Append(context.Recommendation().Trainers.MatrixFactorization(options))

ML.NET uses the concept of pipelines; data pipelines and training pipelines ensure that we have all the points connected, from generating the model to running the prediction. You can think of the ML.NET model pipeline as a chain of estimators. For example, we can load the data, then append the “transform the data” step of the pipeline, then append the “train” step, and so forth, until we can run and evaluate the model.

A pipeline in ML.NET is basically a list of operations, aggregated in order, that must be performed to prepare the data and apply a machine learning algorithm to a model before the training is done.

In the previous code, to set up the matrix factorization we provide the LossFunction; for this case we use the SquareLossOneClass function, because it fits the case of building a recommendation model that only uses two values.

The Label column is our goal; it is what we want to predict, which in this case is mapped to the “CombinedProductID” field annotated with the ColumnName("Label") attribute of the Product Record-Type.

In the options, the Alpha and Lambda parameters are used to speed up the training and boost the accuracy of the factorization algorithm.

Here is a more in-depth look at the ML.NET for the recommendation engine.  It has the following sections:

  • MapValueToKey: this step converts the IDs to numbers that the model can understand. The method MapValueToKey reads the ProductID column and builds a dictionary of unique ID values that is used to generate an output column called ProductIDEncoded containing an encoding for each ID. The same operation happens for the Label column (mapped to the CombinedProductID column)
  • MatrixFactorization is part of the “Trainers” module, and it executes the “Matrix Factorization” on the encoded outputs of the previous step MapValueToKey to compute the predicted product combinations for every product.

With the ML.NET pipeline completely set up, we are finally able to train the model on the training partition by calling the Fit method:

let model = trainSet |> pipeline.Fit

Now that we have a trained model, we can use the validation data to predict all product combinations, which will help us compute the accuracy metrics of the model.

let metrics = testSet |> model.Transform |> context.Regression.Evaluate

// show the metrics
Common.printGreen (sprintf "Model metrics:")
Common.printGreen (sprintf "  RMSE:%f" metrics.RootMeanSquaredError)
Common.printGreen (sprintf "  MSE: %f" metrics.MeanSquaredError)

This code runs the Transform method to make predictions for every product combination in the test set. Then, the Evaluate method compares these predictions to the actual product combinations to calculate the model metrics, such as the RMSE value, which evaluates the model and rates its accuracy. RMSE represents the length of a vector in n-dimensional space, made up of the error in each individual prediction.
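As a quick reference, RMSE can be computed like this (a minimal sketch):


// RMSE sketch: the square root of the mean of the squared prediction errors.
let rmse (predicted: float[]) (actual: float[]) =
    Array.map2 (fun p a -> (p - a) ** 2.0) predicted actual
    |> Array.average
    |> sqrt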

The last step, if we are happy with the accuracy of the model achieved, is to use the model to make a prediction.

To initialize a prediction engine we use the CreatePredictionEngine method. The generic type constructors for this object are the input class Product and the output class ProductPrediction that produces the prediction. When the prediction engine is initialized, we can run the Predict method to obtain the prediction.

As a starting test we can run few predictions on a specific product with ID 21 to verify if it’s frequently purchased together with product ID 77.

Here is the code to make the prediction:

let engine : PredictionEngine<Product, ProductPrediction> = context.Model.CreatePredictionEngine model

let runPrediction () =
    let productInfo = {
        ProductID = 21.f
        CombinedProductID = 77.f
    }

    let prediction = productInfo |> engine.Predict

    // show the prediction
    Common.printRed (sprintf "Score for product %f combined with %f is %f" productInfo.ProductID productInfo.CombinedProductID prediction.Score)

which outputs:

At this point we have our recommendation engine working. 

A useful case is to apply the recommendation engine to find the top 3 products to recommend based on a given purchased product. To achieve this, we run the prediction iterating through each unique product to predict how well it fits in combination with the purchased one. Then we sort by the highest prediction score and take the top 3.

Here is the code that demonstrates how to obtain the 3 recommended products with the highest scores for the purchased product with id 61.

let runPredictionBestMatches productId topN =
    let bestRecommendedProducts productId topN =
        seq {
            for index = 1 to 262110 do // iterate over every product id in the data-set
                let product =  {
                    ProductID = float32 productId
                    CombinedProductID = float32 index
                }
                let prediction = engine.Predict product
                {| Score = prediction.Score; ProductID = index |}
        }
        |> Seq.sortByDescending(fun p -> p.Score)
        |> Seq.take topN

    for (index, product) in bestRecommendedProducts productId topN |> Seq.indexed do
        Common.printRed (sprintf "%d) Best match for product: %d is product: %d\t with score: %f" index productId product.ProductID product.Score)


runPredictionBestMatches 61 3

This is the output of running the previous code:

The best 3 matches by score for product id 61 are 8, 99 and 481, with scores between 0.98 and 0.83, which are pretty good values.

As we “wrap” up this blog, I hope that with this new skill you are able to make the most compatible recommendations for gifts to “wrap” and put under your Christmas tree!

Concurrency in .NET: Modern patterns of concurrent and parallel programming

Announcing a 2-day deep-dive training course on writing concurrent and distributed systems in .NET.

This course is a hands-on workshop that explores the powerful and accessible tools of parallel computation. It teaches how to optimize and maximize the performance of an application, and how to most effectively use multi-core computation, which is used across a range of industries and applications.

Become the master of the multicore domain. Learn how to harness the power of parallel and multicore computation to dominate peer applications in finance software, video games, web applications and market analysis. To yield the most performance, programmers have to partition and divide computations while taking full advantage of multicore processors. Start your path from Padawan to Jedi.

After this workshop, you will be ready to return to work and have code bend to your will. This course introduces you to technologies and tools available to developers at every level who are interested in achieving exceptional performance in applications.

This course is an intensive workshop in writing readable, more modular, and maintainable code in both C# and F#. These languages function at peak performance with fewer lines of code, resulting in increased productivity and successful programs. Ultimately, armed with new-found skills, attendees will gain the knowledge needed to become experts at delivering successful, optimized, high-performance solutions. These solutions can adapt resource consumption, whether running locally, on-premises, or on cloud infrastructures.

In this course, participants will have hands-on practice designing projects. By the end of the course, participants will know how to build concurrent and scalable programs in .NET using the functional paradigm.

 

See you there!!

Course details

 

The Traveling Santa Problem… a Neural Network solution

Once again the fantastic “F# Advent Calendar” organized by Sergey Tihon arrived bringing a festive spirit to all the F# enthusiasts.

The goal… helping Santa

This time, Santa has asked his F# elves for help delivering toys to all the good boys and girls of the world. This year Santa is in a race to outpace all the Amazon drones and Jet deliveries, and is asking for our help to design the most efficient and effective flight path for his sleigh given a number of cities. This problem follows the format of the Traveling Salesman Problem. We will utilize an Artificial Neural Network to design the perfect solution, based on the Elastic Network learning algorithm.

The TSP is a combinatorial problem that involves Santa’s quest to visit a given number of cities and identify the shortest path to travel between them. In addition, Santa is allowed to start and end the route from any city, but he can visit each city only once.

 

At first, this may not look like a complex problem to solve; however, the number of possible combinations can increase dramatically (factorially) with the growth of the number of cities. For example, in the case of two cities the solution is only 1 path; with 5 cities there are 120 possible combinations; with 50, well… we have 30414093201713378043612608166064768844377641568960512000000000000 possible combinations. Understandably, a brute-force approach is not recommended, or Santa will be stuck in “recalculating” mode waiting on his MapQuest to come up with a route to deliver presents.
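The arithmetic is just a factorial; a quick F# check using arbitrary-precision integers:


// The number of possible tours grows factorially with the number of cities.
let factorial n = List.fold (*) 1I [ 1I .. n ]
factorial 5I  // 120
factorial 50I // 30414093201713378043612608166064768844377641568960512000000000000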

The Elastic Network we will use is a type of artificial neural network which utilizes unsupervised learning algorithms for clustering problems and treats the network as a ring of nodes. The learning process keeps changing the shape of the ring, where each shape represents a possible solution.

 

Starting with Machine Learning

Machine learning is a fascinating and trendy topic these days, but like many other technologies it has its own terminology, such as entropy error, resubstitution accuracy, and linear and logistic regression, that can sound intimidating at first. Don’t let these terms turn you away; the effort you put into understanding machine learning will be returned threefold.

In the last few months, I have started experimenting with and applying machine learning in different domains, including Natural Language Generation. In my work at STATMUSE, I am developing a Natural Language Processing back-end infrastructure in F# to provide Natural Language Interfaces for sports statistics. Check out this video to see a project interacting with Alexa (Amazon echo)

Thankfully, if you are a .NET developer, there are many great resources to become more familiar with the topic. To name a few:

 

What’s the challenge?

In my opinion, the main challenge is figuring out how to map problems to a machine learning algorithm, as well as how to choose the right machine learning algorithm to use. To help, the Microsoft Azure Machine Learning Team put together a very useful cheat-sheet here.

What really brought things home for me was seeing a video about Neural Networks in which James McCaffrey described the Neural Network as a multi-purpose machine learning algorithm! See the video that spurred my revelation here.

 

What is a neural network?

An Artificial Neural Network (ANN) is an algorithm designed for pattern recognition and classification, and it can be used in many applications. Simply said, an ANN is composed of an interconnected set of artificial neurons. Even more fascinating, the computations in this learning algorithm are modeled after and inspired by the biological brain, and can be applied to classification, recognition, prediction, simulation and many other tasks.

Here, neurons are organized in sub-sets, called layers, where all the neurons in one layer are connected to all the neurons in the following layers.

For example, the following figure shows a three-layered artificial neural network. The neurons from the first layer are propagated through the network and connected to the second layer, and from the second to the third.

From the figure, a fully connected neural network with 3 inputs (the blue nodes), 4 nodes in the hidden layer (the green ones), and 2 nodes as outputs (the orange ones) has (3 * 4) + 4 + (4 * 2) + 2 = 26 weights and biases.

The result of the ANN, referred to as the output, relies on the weights of the connections between neurons. The most important concept is that an ANN is capable of self-training to output distinct values determined by a particular given input.

The training of a neural network aims to find the values for the weights and biases that can be used to evaluate a set of given known inputs and outputs. The secret of a Neural Network is computing the most correct weight values. However, the Neural Network has to be trained to generate these values, which at the beginning are unknown.

 

Good News!

So, for the functional programming enthusiast, you can think of a neural network as a mathematical function that accepts numeric inputs and generates numeric outputs. However, the value of the outputs is determined by other factors, such as the number of layers, the activation functions, and the weights and bias values. Activation functions are used by the neurons to evaluate the inputs and calculate a relative output.

Neural Networks are fast!

Consider that ANNs are modeled after the biological brain, which in the case of humans has roughly 10^16 synapses executing multiple operations with minimal energy. Theoretically, ANNs could achieve the same kind of performance, which is very impressive.

Neural Networks can be faster… they can run in parallel

The only thing more impressive than utilizing ANNs would be utilizing ANNs on a multicore platform applying concurrency… That combination could effectively take Santa throughout the galaxy.

 

NOTE If you want to learn more on how to apply concurrency in your programming check out my book “Functional Concurrency in .NET”

It is time to code!

First, we are going to define each part that composes an ANN. Each component of the NN is defined using a Record-Type. Ultimately we will run a benchmark to compare the sequential implementation against the parallel implementation, with the variant of using the new F# 4.1 feature Struct Record-Type. More here.

In F# 4.1, a record type can be represented as a struct with the [<Struct>] attribute. This allows records to now share the same performance characteristics as structs, without any other required changes to the type definition.
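For example, a minimal illustration:


// F# 4.1: a record represented as a struct via the [<Struct>] attribute.
[<Struct>]
type Point = { X: float; Y: float }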

 

The User-Interface is all in F#

The User-Interface is WPF based, and its implementation uses the FsXaml Type Provider.

The graphics and animation are generated using the charts from the FSharp.Charting library, specifically the LiveChart.

 

Neuron

type Neuron =
    { inputsCount : int
      output : float
      threshold : float
      weights : float array }
    member this.item n = this.weights |> Array.item n

module Neuron =
    // rand and rangeLen are module-level helpers defined elsewhere in the project
    let create (inputs : int) =
        let inputs = max 1 inputs
        { inputsCount = inputs
          threshold = rand.NextDouble() * rangeLen
          output = 0.
          weights = Array.init inputs (fun _ -> rand.NextDouble() * rangeLen) }

    // the output is the distance between the neuron's weights and the input
    let compute (neuron : Neuron) (input : float array) =
        let weights = neuron.weights
        [ 0 .. neuron.inputsCount - 1 ]
        |> Seq.fold (fun s i -> s + abs (weights.[i] - input.[i])) 0.

The Neuron is the basic unit of a Neural Network. Each Layer in the ANN has a set of Neurons connected to the Neurons of the neighboring layers, if any. In our ANN model, a Neuron contains the count of inputs and the output value, which is computed as the distance between its weights and inputs. More importantly, the weights array is used to train the NN to compute the best result.

The Neuron weights are initialized with random values, and successively updated in each iteration. The Threshold is a single-value weight that can be used for matrix operations, but it is irrelevant in our scenario.

 

Layer

type Layer =
    { neuronsCount : int
      inputsCount : int
      neurons : Neuron array
      output : float array }
    member this.item n = this.neurons |> Array.item n

module Layer =
    let create neuronsCount inputsCount =
        let neuronsCount = max 1 neuronsCount
        { neuronsCount = neuronsCount
          inputsCount = inputsCount
          neurons = Array.init neuronsCount (fun _ -> Neuron.create inputsCount)
          output = Array.zeroCreate<float> neuronsCount }

    // re-evaluate the output of each neuron; the array operation is parallelized
    let compute (inputs : float array) (layer : Layer) =
        let neuronsCount = layer.neuronsCount
        let output = Array.Parallel.init neuronsCount (fun i -> Neuron.compute layer.neurons.[i] inputs)
        { layer with output = output }

A Layer is simply a collection of Neurons of the same type. To solve the Traveling Santa Problem, only a single layer is needed. The compute function re-evaluates the output of each Neuron, which is an array operation that can be parallelized.

Since an ANN requires a considerable number of array operations to compute results, it is very suitable for a parallel programming model, making the computation considerably faster in a multi-processor environment.

The F# Array module provides Parallel functionality, which is used to distribute processor-intensive work across all processors and threads on the system.

 

Network

type Network =
    { inputsCount : int
      layersCount : int
      layers : Layer array
      ouputLayer : Layer
      activation : Activation
      output : float array }
    member this.item n = this.layers |> Array.item n

module Network =
    let create inputsCount layersCount =
        let layers = Array.init layersCount (fun _ -> Layer.create layersCount inputsCount)
        { inputsCount = inputsCount
          layersCount = layersCount
          layers = layers
          ouputLayer = layers |> Array.last
          activation = Activation.sigmoidActivation
          output = [||] }

    // re-compute each layer in parallel and refresh the network output
    let compute (network : Network) (input : float array) =
        let layers = network.layers |> Array.Parallel.map (Layer.compute input)
        { network with
            layers = layers
            ouputLayer = layers |> Array.last
            output = (layers |> Array.last).output }

    // index of the neuron with the smallest output (the closest node)
    let foundBestOutput (network : Network) =
        network.ouputLayer.output
        |> Seq.mapi (fun i o -> i, o)
        |> Seq.minBy (fun (_, o) -> o)
        |> fst

The Network is a Record-Type that contains and wraps the Layers of the NN. The compute function re-computes each underlying layer; in this case too, the operation can be parallelized using the F# Array.Parallel module.

 

ElasticLearning

type ElasticLearning =
    { learningRate : float
      learningRadius : float
      squaredRadius : float
      distance : float array
      network : Network }

module NetworkLearning =
    let create (network : Network) =
        let neuronsCount = network.ouputLayer.neuronsCount
        let delta = Math.PI * 2.0 / (float neuronsCount)

        // pre-compute the distances of the nodes laid out on a ring
        let rec initDistance i alpha acc =
            match i with
            | n when n < neuronsCount ->
                let x = 0.5 * Math.Cos(alpha) - 0.5
                let y = 0.5 * Math.Sin(alpha)
                initDistance (i + 1) (alpha + delta) ((x * x + y * y) :: acc)
            | _ -> acc |> List.toArray

        // initial arbitrary values
        { learningRate = 0.1
          learningRadius = 0.5
          squaredRadius = 98.
          distance = initDistance 0 delta []
          network = network }

    let setLearningRate learningRate (learning : ElasticLearning) =
        { learning with learningRate = max 0. (min 1. learningRate) }

    let compute (learning : ElasticLearning) (input : float array) =
        let learningRate = learning.learningRate
        let network = Network.compute learning.network input
        let bestNetwork = Network.foundBestOutput network
        let layer = network.ouputLayer

        // update the weights of every neuron in parallel
        // (Parallel.For takes an exclusive upper bound, so we pass neuronsCount)
        System.Threading.Tasks.Parallel.For(0, layer.neuronsCount, fun j ->
            let neuron = layer.item j
            let delta = exp (-learning.distance.[abs (j - bestNetwork)] / learning.squaredRadius)
            for i = 0 to neuron.inputsCount - 1 do
                let n = (input.[i] - neuron.item i) * delta
                neuron.weights.[i] <- neuron.weights.[i] + (n + learningRate))
        |> ignore
        // return the updated learning state so that callers can keep threading it
        { learning with network = network }

The ElasticLearning is an unsupervised learning algorithm: the desired output is not known at the learning stage, which leads to a best-effort result rather than a perfect one. The initialization (the create function) sets some predefined arbitrary values. In the application, these values are configured in the WPF UI.

The “learning rate” value determines how much each weight can change during each update step, and it directly affects the training speed. A bigger value speeds up the training but also increases the risk of skipping over the optimal weight values.

The compute function is used to train the NN: it updates the weights using the inputs and the delta, which is measured according to the distance from the bestNetwork (the winning neuron). The array operation can run in parallel, updating each Neuron's weight values.
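Putting the pieces together, a single training step looks like this (a minimal sketch with hypothetical values; the real ones come from the WPF UI):


// create a 2-input network, clamp the learning rate, run one training step
let network = Network.create 2 50
let learning =
    NetworkLearning.create network
    |> NetworkLearning.setLearningRate 0.5
// one step of training against a single (x, y) input
let trained = NetworkLearning.compute learning [| 0.3; 0.7 |]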

 

TravelingSantaProblem

type TravelingSantaProblem(neurons : int, learningRate : float, cities : (float * float)[]) =

    let foundBestPath iterations = asyncSeq {
        let network = Network.create 2 neurons
        let trainer = NetworkLearning.create network
        let fixedLearningRate = learningRate / 20.
        let driftingLearningRate = fixedLearningRate * 19.
        let input = Array.zeroCreate<float> 2
        let iterations = float iterations
        let citiesCount = cities.Length
        let lenNeurons = neurons

        // reads a single weight of a neuron in the output layer
        let getNeuronWeight (trainer : ElasticLearning) n w =
            (trainer.network.outputLayer.item n).item w

        for i = 0 to (int iterations - 1) do
            // both the learning rate and the learning radius decay
            // linearly as the iterations progress
            let learningRateUpdated = driftingLearningRate * (iterations - float i) / iterations + fixedLearningRate
            let trainer = NetworkLearning.setLearningRate learningRateUpdated trainer
            let learningRadiusUpdated = trainer.learningRadius * (iterations - float i) / iterations
            let trainer = NetworkLearning.setLearningRadius learningRadiusUpdated trainer

            // trains the network against the coordinates of a randomly picked city
            let currentStep = rand.Next(citiesCount)
            input.[0] <- cities.[currentStep] |> fst
            input.[1] <- cities.[currentStep] |> snd
            let trainer = NetworkLearning.compute trainer input

            // projects the neuron weights back into a path, closing the tour
            // by reusing the first neuron as the last point
            let path = Array.Parallel.init lenNeurons (fun j ->
                if j = lenNeurons - 1
                then ((trainer.network.item 0).item 0).item 0, ((trainer.network.item 0).item 0).item 1
                else ((trainer.network.item 0).item j).item 0, ((trainer.network.item 0).item j).item 1)

            yield (i, path)
    }

    // assumed member exposing the internal asyncSeq, elided in the original listing
    member this.FoundBestPath iterations = foundBestPath iterations

The TravelingSantaProblem type has a single function, foundBestPath, which executes the Neural Network algorithm by performing the computation of the ElasticLearning and yielding an updated path at each iteration. The code is quite self-explanatory: it glues together the components of the Neural Network and computes them to obtain the best path. The function uses FSharp.Control.AsyncSeq (details here) to asynchronously yield each updated path, which is then used to update the LiveChart.
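As a minimal sketch of how such an asyncSeq can be consumed with AsyncSeq.iterAsync (the FoundBestPath member and the printing handler are assumptions, not part of the original listing):


open FSharp.Control

// hypothetical consumer: asynchronously iterates each (iteration, path)
// pair yielded by the solver
let consumePaths (tsp : TravelingSantaProblem) iterations =
    tsp.FoundBestPath iterations
    |> AsyncSeq.iterAsync (fun (i, path) ->
        async { printfn "iteration %d: path of %d points" i path.Length })
    |> Async.RunSynchronously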

 

Here below is part of the code of the main WPF ViewModel.

 

let pathStream = Event<(float * float)[]>()
let pathObs = pathStream.Publish |> Observable.map id

let pointsStream = Event<(float * float)[]>()
let pointsObs = pointsStream.Publish |> Observable.map id

 

// more code here

 

let livePathChart = LiveChart.Line(pathObs)
let livePointsChart = LiveChart.Point(pointsObs)
let chartCombine =
    Chart.Combine([ livePointsChart; livePathChart ])
         .WithYAxis(Enabled = false)
         .WithXAxis(Enabled = false)
let chart = new ChartControl(chartCombine)
let host = new WindowsFormsHost(Child = chart)

 

// more code here

 

let updateCtrl ui (points : (float * float)[]) = async {
    // switches to the UI thread before triggering the event that updates the chart
    do! Async.SwitchToContext ui
    pathStream.Trigger points }

 

// more code here

 

this.Factory.CommandAsync((fun ui ->
    async { let updateControl = updateCtrl ui
            let tsp = TravelingSantaProblem(neurons.Value, learningRate.Value, cities)
            // more code here: each path yielded by the solver is pushed
            // to the chart through updateControl
            do! updateControl path }), token = cts.Token, onCancel = onCancel)

The implementation of the Neural Network that helps Santa is done. But let's take a look at the performance.

 

Benchmarking

Now it is time to benchmark the sequential version of the NN and compare it with the parallel implementation shown previously. In addition, we compare the performance of the same NN implementation using the new struct record types introduced in F# 4.1. Here is the graph of the benchmark.

The test has been executed on a machine with 8 logical cores, and the Neural Network settings are:

  • Cities: 150
  • Neurons: 50
  • Iterations: 30,000
  • Learning Rate: 0.5

The parallel version of the NN with struct records is the fastest.

The Neural Network that uses reference record types produces (and updates) a large number of Neuron instances, which increases the number of short-lived objects in memory. In this case, the Garbage Collector is forced to run frequent collections, which impacts the performance negatively.
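For reference, this is all it takes to turn a record into a struct record (a sketch with a hypothetical Neuron-like type; the attribute requires F# 4.1 or later):


// the [<Struct>] attribute makes the record a value type, so each instance
// avoids a heap allocation (array fields still live on the heap)
[<Struct>]
type NeuronValue =
    { weights : float array
      threshold : float }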

 

You can find the complete code implementation here.

The implementation is WPF based, but in the App.fs file you can uncomment some code to run the program as a console project.

 

In conclusion, I am far from being capable of building a self-driving car or a Cyberdyne Systems series T-800 (a.k.a. the Terminator), but an ANN is not as complex as I once thought.

 

What’s next?

I will be speaking at LambdaDays 2016 (more details here) about “Fast Neural Networks… a no brainer!”. If you are in the area, please stop by and say hi!

I will talk about how to develop a fast Neural Network using a parallel K-Means algorithm for the training and leveraging GP-GPU.

 

In the end, this solution ensures a Merry Christmas for all, and for all a Happy New Year!

“The Ugly Duckling” – A Little Story About Functional Programming

Functional Programming has been around for possibly as long as the fable of the “ugly duckling”. The impetus for FP started in the 1930s thanks to Alonzo Church, a mathematician best known for formulating lambda calculus, a formal theory for expressing computation based on function abstraction and application using variable binding and substitution. The syntax and behavior of most programming languages today reflect this model. This early origin means that Functional Programming is a paradigm built on ideas older than the first programmable computer. Since then, functional languages have introduced several concepts later adopted by other languages. In 1958 the first functional language, Lisp, came into use.

In 1959, in response to memory issues found in Lisp, John McCarthy invented the Garbage Collector. Decades later, mainstream languages such as Java and C# adopted the Garbage Collector. In 1960, the concept of first-class functions was coined by Christopher Strachey, who also invented the term currying, one of the fundamental principles of FP. In 1973, the functional language ML introduced the concepts of type inference and parametric polymorphism. About thirty years later, these same concepts were introduced as “generics” in mainstream languages. So, as the story of the “ugly duckling” goes, a concept that initially was seen as strange and different eventually came into its prime. For FP that time is now, because of its ability to take full advantage of multicore platforms.
In 1954 Fortran, the first imperative language, was introduced to the market, and five years later Cobol, another imperative language, made its first appearance. Both these languages have had enormous success in the business software world, to the point that the imperative paradigm dominated the industry for thirty years.

In the 1970s, the software industry needed different and more practical solutions to solve increasingly complex problems. The rise of the Object-Oriented paradigm came from this growing demand for an improved programming model.
Since then, OO has steadily matured and evolved; today this paradigm is widely accepted by enterprises and has become the most popular paradigm in use.
The technology world is in continuous and unstoppable evolution, and the natural consequence of this expansion is ever more complex computing problems. Industry is realizing that FP is better suited than OO to these problems because of its declarative and expressive coding style. FP's mathematical approach promotes immutability and functions without side effects, allowing the programmer to solve the full spectrum of problems in less time and with fewer bugs. FP addresses computational problems in a manner similar to mathematics, helping to ensure the correctness of the program.

An increasing number of programming languages support the functional paradigm, including C#, C++, Java and Python. In 1994, Python introduced support for lambda expressions and list comprehensions for data manipulation. Likewise, C# has supported a functional style of programming from the beginning, but the complexity of doing so prevented most programmers from using it. In 2007, C# 3.0 introduced first-class functions and new constructs such as lambda expressions and type inference, allowing programmers to adopt functional programming concepts. Soon to follow was LINQ (Language Integrated Query), which permits a declarative programming style. More recently, in 2010, Microsoft introduced F# as a fully supported functional language in the .NET ecosystem. The latest generation of functional languages is impure, because they allow side effects; the result of this choice is a reduced learning curve for those who embrace the paradigm for the first time. To combat unwanted side effects, the most successful functional languages today are “hybrids”: they bridge the gap between the object-oriented and functional paradigms, allowing both programming styles.

If the current job demand is an indication of the future, interest in utilizing Functional Programming in applications will continue to increase, and so will the need for programmers who can bring this paradigm to business solutions.