How to parse a high rate stream of data with low memory allocation

When implementing a program, developers should set performance goals up front. Performance is an aspect of software design that cannot be left for a final optimization pass; it needs to be incorporated throughout the design, beyond code refactoring, as an explicit goal from the start. Redesigning an existing application from the ground up is complex and expensive, so time spent on the initial design is worth the investment. This tutorial leverages a few new Microsoft libraries to implement highly performant data stream processing.

When writing performance-critical and scalable applications, developers have to keep in mind different aspects of the program, especially when these applications are sensitive to memory consumption. Fortunately, both the .NET Framework and .NET Core provide a set of tools to address these scenarios. In particular, with the release of .NET Core 2.1 Microsoft introduced the Span<'a> and Memory<'a> types into the ecosystem. These types provide easy and scalable APIs that don't allocate buffers and avoid unnecessary data copies. They aim to reduce, and in many cases completely avoid, garbage collections, improving performance (during a garbage collection all threads are stopped).

With Span, there is no allocation!

Quick tour of the Span<'a> type

Span<'a> is a simple value type that lets you work with any arbitrary contiguous block of memory, ensuring type safety with no-copy semantics, which reduces the allocation overhead almost completely.

When is Span<'a> useful?

To improve the performance of your program, you should reduce allocations. Span<'a> can help with this goal. For example, a .NET string is an immutable type, meaning every operation that "changes" the string actually creates a new string, allocating a new object on the heap. Memory allocations produce memory pressure, which ultimately triggers garbage collections. The fewer garbage collections in your application, the better the performance.

Why is this so important? Let's take the Substring method of the String type as an example. Anytime you substring a string, .NET creates a new string: the necessary memory is allocated, and then each character is copied into it.

Here is the pseudo code 

let substring (source : string) startIndex length =
    let destination = Array.zeroCreate<char> length
    source.CopyTo(startIndex, destination, 0, length)
    String(destination)

let mySubstring (text : string) = substring text 0 5

This example calls Substring only once, but imagine the performance issues that arise when parsing a CSV file, where it can be called thousands of times.

The remedy is slicing, without managed heap allocations! 

The core function of Span<'a> is Slice. This function does not copy or allocate any memory; it simply creates a new Span with a different pointer and length.

Here is the pseudo code to implement a Substring function with no allocation.

let substring (source : string) startIndex length : ReadOnlySpan<char> =
    source.AsSpan().Slice(startIndex, length)

After the AsSpan extension method converts the string into a ReadOnlySpan<char>, you can use the Slice function to point to a portion of the string without copying it. No allocation!
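To see the same idea beyond substrings, here is a small sketch (the countFields helper is illustrative, not part of this post) of scanning delimited text by repeatedly re-slicing one span, with no intermediate string allocations:

```fsharp
open System

// Illustrative helper: count the fields of a comma-separated line by
// re-slicing a single ReadOnlySpan<char>, allocating nothing on the heap.
let countFields (line : string) =
    let mutable span = line.AsSpan()
    let mutable count = 1
    let mutable i = span.IndexOf ','
    while i >= 0 do
        count <- count + 1
        span <- span.Slice(i + 1)   // re-point the span past the delimiter, no copy
        i <- span.IndexOf ','
    count
```

Each iteration moves the span's pointer forward instead of materializing a substring, which is exactly the trick the Slice-based substring above relies on.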

The .NET GC recognizes the Span<'a> type and has native support for updating its reference when needed during the compact phase of garbage collection (when the underlying object, such as an array, is moved, the reference is updated while the offset remains untouched).

The table below compares 100 runs of a substring operation using the regular String Substring method and Span<char> Slice:

Method       String length    Average for 100 runs (ns)    Allocation
Substring    100              71.178                       1049 B
Slice        100              1.259                        0 B

It is noticeable that the substring implemented using the Slice method of Span<char> is faster, but more importantly it has zero allocation, which means no garbage collection.

The Memory<'a> type overcomes the Span<'a> stack-only limitations

Note that there are some limitations to the Span<'a> type due to the fact that it is a stack-only struct (value type). For example, you cannot place a Span<'a> on the heap, use it inside collections or within asynchronous operations, store it as a plain field, or box it.

The Memory<'a> type overcomes these limitations and can be used with the full power of .NET. It can be freely used in generics, stored on the heap, and used within asynchronous operations.

In addition, whenever you need to manipulate or process the underlying buffer referenced by Memory<'a>, you can access its Span property and get efficient indexing capabilities. This makes Memory<'a> a more general-purpose and high-level exchange type than Span<'a>.
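A minimal sketch (helper names are illustrative, not from this post) of why this matters: a Memory<'a> can be held across an await, while the drop to Span<'a> happens only inside a synchronous helper:

```fsharp
open System
open System.IO

// Synchronous helper: only here do we touch the stack-only Span<'a>.
let private sumBytes (buffer : Memory<byte>) (count : int) =
    let span = buffer.Span.Slice(0, count)
    let mutable sum = 0
    for i in 0 .. span.Length - 1 do
        sum <- sum + int span.[i]
    sum

// Memory<byte> survives across the asynchronous read, which a Span cannot.
let readAndSum (stream : Stream) (buffer : Memory<byte>) = async {
    let! read = stream.ReadAsync(buffer).AsTask() |> Async.AwaitTask
    return sumBytes buffer read }
```

The split between an async function that carries the Memory and a synchronous helper that indexes its Span is the idiomatic way to combine the two types.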

If you are a performance junkie like myself and enjoy squeezing as much performance as possible out of your code, I recommend looking into the newly defined Span<'a> and Memory<'a> types. However, in this blog I decided to cover a different set of tools, which are the driving force behind Span: System.IO.Pipelines.

System.IO.Pipelines is a set of tools that extensively uses the Span<'a> (and Memory<'a>) types and, more importantly, abstracts away most of the hard work required to build asynchronous and/or parallel processing code. They are less well known than TPL and Dataflow, but they are a force multiplier for your code designs.

The goal

There are many challenges to address when implementing a reactive and scalable application that has to process a high-rate stream of events (data) in near real-time. A few of these challenges are keeping the memory pressure of the application low, minimizing garbage collections, and taming and controlling back-pressure. In addition, the program becomes more complicated if the incoming stream of events has to be parsed at a given delimiter, which means processing the data for every delimiter that appears in the stream.

For example, if you want to process a continuous stream of data in chunks, the typical code you would write would look like this: 

let processStream (stream : Stream) (process : byte [] -> unit) = async {
    let bufferSize = 0x100
    let buffer = Array.zeroCreate bufferSize
    let! bytesRead = stream.AsyncRead(buffer, 0, bufferSize)
    // Process a single chunk of data from the buffer
    do process (Array.sub buffer 0 bytesRead)
 } 

This code has several problems: the full message (chunk of data) to process might not have been fully received in a single call to AsyncRead. Additionally, multiple messages might come back in a single AsyncRead call, and the code cannot handle them.

When writing code that reads from Streams, especially streams coming from the network, the code can't assume that it will get everything in a single call to AsyncRead. A read operation on a Stream can return nothing (which indicates the end of the data), it could fill the buffer, or it could return a single byte despite a bigger buffer being available. How can we fix this?
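To make the problem concrete, here is a sketch of the bookkeeping a correct reader has to do by hand (helper names are illustrative, and a newline byte stands in for the delimiter): keep appending every read to a growing buffer and emit only complete messages.

```fsharp
open System
open System.IO

// Drain every complete message (up to the delimiter) currently buffered.
let private drainMessages (buffered : ResizeArray<byte>) (delimiter : byte) (handle : byte [] -> unit) =
    let mutable i = buffered.IndexOf delimiter
    while i >= 0 do
        handle (buffered.GetRange(0, i).ToArray())
        buffered.RemoveRange(0, i + 1)
        i <- buffered.IndexOf delimiter

// Keep reading until end of stream, buffering partial messages across reads.
let processMessages (stream : Stream) (handle : byte [] -> unit) = async {
    let chunk = Array.zeroCreate 0x100
    let buffered = ResizeArray<byte>()
    let finished = ref false
    while not finished.Value do
        let! read = stream.AsyncRead(chunk, 0, chunk.Length)
        if read = 0 then finished.Value <- true   // end of stream
        else
            buffered.AddRange(ArraySegment(chunk, 0, read))
            drainMessages buffered (byte '\n') handle }
```

This works, but every message still costs copies and allocations; removing exactly this kind of manual buffer management is what the Pipelines approach later in the post is for.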

The mission of this blog    

For the example in this blog we will look into a video surveillance system, which reads continuously from different streams of data originating from IP video cameras. The goal is to implement a scalable, low-memory and low-CPU application, since we deploy and run the system on a mobile device, where low resource utilization is very important to preserve the battery life of the device.

Scenario: I recently moved to Charlotte, NC. In my back yard I receive a daily visit from a group of deer, so I decided to install a feeder and, of course, a couple of cameras to be notified and to watch when the deer pay us a visit. Thus, I decided to implement a video surveillance system that I could run from my phone.

Goal: First, implement a single-camera video surveillance system using a basic implementation. Then, analyze the performance problems to be solved. Next, enhance the application using the System.IO.Pipelines .NET library, which is designed specifically for high-performance IO operations. Ultimately, the goal is to enable four cameras in a multi-camera video surveillance system and analyze and compare the resource consumption. The system implemented in this tutorial allows viewing multiple cameras simultaneously on an iOS mobile device. The application should be fully asynchronous to ensure responsiveness to users.

IP Camera (MJPEG support)

IP cameras (and servers) can be accessed over an IP network, which allows monitoring not only from the cameras' actual location, but also from any other IP-enabled point of the world using specialized video surveillance applications. The digital output of IP-enabled video cameras/servers allows them to be easily integrated with different types of applications.

How Does It Work?

The camera used in this example is a D-Link DCS 8525LH Network Camera, which supports MJPEG (Motion JPEG) streams. MJPEG is a popular format that allows you to download not just a single JPEG image, but a stream of JPEGs. You can think of MJPEG as a video format where each frame of video is sent as a separate, compressed JPEG image. For more information, see the MJPEG article on Wikipedia.
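For orientation, the wire format of such a response looks roughly like the sketch below (the boundary name is illustrative; the real value is announced in the Content-Type header):

```
HTTP/1.1 200 OK
Content-Type: multipart/x-mixed-replace; boundary=myboundary

--myboundary
Content-Type: image/jpeg
Content-Length: 48213

<JPEG bytes: ff d8 ... ff d9>
--myboundary
Content-Type: image/jpeg
...
```

Each part between boundary markers is one complete JPEG frame, which is exactly what the parsing code below has to extract.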

First of all, we have to send an asynchronous request to an IP camera's MJPEG URL; a multipart response is then received back by the caller. The challenging part of the implementation is parsing the multipart stream into the separate images received. The viewer has to display those JPEG images as quickly as they are parsed to generate the video.

First implementation (without IO.Pipelines and Span<'a>)

The first implementation is based on code that does not use the latest .NET Core features such as Span<'a>, Memory<'a> and Channels. This implementation provides a baseline to compare the performance enhancements gained by the implementation that exploits these types.

Note: In this example, we focus on the important code without being distracted by the details of the UI implementation. The code example is based on Xamarin.Forms, which uses the Image control to display images on a page. The full source code is downloadable from the GitHub link.


The application has the following steps:

  • The client application does an HTTP request to a special camera’s URL, like http://192.168.20.49/axis-cgi/mjpg/video.cgi
  • The IP-camera replies to this request with a stream of JPEGs delimited with a special delimiter, which is specified in one of the HTTP headers.
  • It then streams the response data
  • Looks for a JPEG header marker
  • Then reads until it finds the boundary marker
  • Copies the data into a buffer
  • Decodes the buffer 
  • Renders the image 
  • Starts over

To be able to process and render each frame received separately when reading from a stream of data, we have to confront some common pitfalls. We have to buffer the incoming data until we have found a new frame boundary, and then we have to parse the entire buffer returned.

The following code is the first implementation, demonstrating how to display an MJPEG camera stream.

let frameHeader = [| 0xffuy; 0xd8uy |]

let chunk = 1048576 // 1 Mb

let findStreamDelimiter (search:byte []) (buffer:byte []) =
   let rec findHeaderArray (arr:byte []) (index:int) : int =
        if Array.isEmpty arr || arr.Length < search.Length then -1
        elif arr.[0] = search.[0] && arr.[1 .. search.Length - 1].SequenceEqual search.[1..] then index
        else findHeaderArray (Array.tail arr) (index + 1)
   findHeaderArray buffer 0

let renderStream (request : HttpWebRequest) (progress : IProgress<byte []>) = async {
     // on the response we grab and use the Content-Type header 
     // to determine the boundary marker that will be sent between JPEG frames
    use! response = request.AsyncGetResponse()

    // find our magic boundary value
    let frameBoundary = response.Headers.["Content-Type"].Split([| '=' |]).[1]

    let frameBoundaryValid = 
        if frameBoundary.StartsWith("--") then frameBoundary
        else "--" + frameBoundary

    let frameBoundaryBytes = Encoding.UTF8.GetBytes(frameBoundaryValid)        

    let imageBuffer = Array.zeroCreate(chunk)
    use cameraStream = response.GetResponseStream()

    // Streams the response data, looks for a JPEG header marker, 
    // then reads until it finds the frame boundary marker, copies the data into a buffer, 
    // decodes it, passes the result to the IPorgress notification to render the image, then starts over.
    let rec readFrames (readStream : Async<byte []>) = 
        async { 
            let! buffer = readStream
            // find the JPEG header
            let imageStart = findStreamDelimiter frameHeader buffer
            if imageStart <> -1 then 

                // copy the start of the JPEG image to the imageBuffer
                let size = buffer.Length - imageStart
                Buffer.BlockCopy(buffer,imageStart, imageBuffer, 0, size)

                let rec readFrame (buffer : byte array) accSize = 
                    async { 
                        // find the boundary end
                        let imageEnd = findStreamDelimiter frameBoundaryBytes buffer 
                        if imageEnd <> -1 then 

                            // copy the remainder of the JPEG to the imageBuffer
                            Buffer.BlockCopy(buffer, 0, imageBuffer, accSize, imageEnd)
                            let sizeIncrImageEnd = accSize + imageEnd
                            // create a single JPEG frame
                            let frame = Array.zeroCreate (sizeIncrImageEnd)
                            Buffer.BlockCopy(imageBuffer, 0, frame, 0, sizeIncrImageEnd)

                            // report new frame to render to the UI
                            progress.Report( frame )

                            // copy the leftover data to the start
                            Buffer.BlockCopy(buffer, imageEnd, buffer, 0, buffer.Length - imageEnd)

                            // fill the remainder of the buffer with new data and start over
                            let! tempBuffer = cameraStream.AsyncRead(imageEnd)
                            Buffer.BlockCopy(tempBuffer, 0, buffer, buffer.Length - imageEnd, tempBuffer.Length)
                            return buffer
                        else 
                            // copy all of the data to the imageBuffer
                            Buffer.BlockCopy(buffer, 0, imageBuffer, accSize, buffer.Length)
                            let! data = cameraStream.AsyncRead(chunk)
                            return! readFrame data (accSize + buffer.Length)
                    }
                let! data = cameraStream.AsyncRead(chunk)
                return! readFrames (readFrame data size)
        }
    do! readFrames (cameraStream.AsyncRead(chunk)) 
}



// takes the raw byte buffer which contains an undecoded JPEG image, and update the image
let processFrame (image : Image) = 
    { new IProgress<byte []> with 
        member __.Report( frameBuffer : byte[] ) = 
            image.Source <- ImageSource.FromStream(Func<Stream>(fun () -> new MemoryStream(frameBuffer) :> Stream))
        }

type CameraStreaming(image : Image, credential:System.Net.NetworkCredential, cameraUrl:Uri) =
 
    let cameraStreaming () = 
        let cts = new CancellationTokenSource()
        let request = HttpWebRequest.Create(cameraUrl) :?> HttpWebRequest
        request.Credentials <- credential
        let processFrameImage = processFrame image
        Async.Start(renderStream request processFrameImage, cts.Token)
        { new IDisposable with 
            member __.Dispose() = cts.Cancel() }

    member self.Image = image
    member self.Start() = cameraStreaming()
There are several parts to this code.

The frameHeader is a byte array holding the marker that opens every JPEG image (the JPEG start-of-image marker). This delimiter acts as a buffer boundary.

The function findStreamDelimiter helps to find the boundaries in the stream, matching the frameHeader value within the buffer received so that each image can be processed separately.

The renderStream function is the core: it grabs the camera stream cameraStream from the HTTP response, determines the representation of the image (frame) boundary, and then starts the readFrames loop to read and process the stream of data and render the images as a video.

The initial buffer imageBuffer is set to a size of 1 MB (chunk), which should be enough to buffer the images. The code in the body of the recursive function readFrames and the sub-function readFrame performs several byte array operations, which in sequence correspond to:

  1. find the JPEG header
  2. copy the start of the JPEG image to the imageBuffer
  3. find the boundary end (end of the image)
  4. copy the remainder of the JPEG to the imageBuffer
  5. create a single JPEG frame
  6. report the new frame to render to the UI
  7. copy the leftover data to the start
  8. fill the remainder of the buffer with new data and start over

The CameraStreaming type encapsulates the implementation and exposes the main Start method for the streaming, which returns an IDisposable object used to stop the streaming. The view is updated as each new image becomes available, using the IProgress pattern.

The IProgress pattern

The asynchronous programming model in .NET provides the IProgress<'a> pattern, which facilitates updates and/or notifications in the background. The pattern is highly productive and solves common developer challenges. The usage is simple: on the client/consuming side of your code, on the UI thread, you define an event handler that is called when IProgress<'a>.Report is invoked. The code below shows how this pattern is utilized in the context of an F# object expression.

// takes the raw byte buffer which contains an undecoded JPEG image to update the image
let processFrame (image : Image) = 
    { new IProgress<byte []> with 
        member __.Report( frameBuffer : byte[] ) = 
            image.Source <- ImageSource.FromStream(Func<Stream>(fun () -> new MemoryStream(frameBuffer) :> Stream))
        } 

The Progress<'a> type captures the current SynchronizationContext when it is instantiated; then, whenever the Report method is called, the captured context is used for the updates. This pattern avoids cross-thread issues when updating the UI.
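A minimal sketch of the difference outside a UI (illustrative names, not from this post): an object-expression IProgress<'a> invokes Report synchronously on the caller's thread, whereas Progress<'a> posts the callback to the SynchronizationContext captured at construction time.

```fsharp
open System

// Object-expression implementation: Report runs synchronously on the
// reporting thread, with no SynchronizationContext involved.
let syncProgress (onFrame : byte [] -> unit) =
    { new IProgress<byte []> with
        member __.Report(frame) = onFrame frame }

let mutable lastSize = 0
let reporter = syncProgress (fun frame -> lastSize <- frame.Length)
reporter.Report(Array.zeroCreate 10)   // handler runs immediately; lastSize becomes 10
```

In the camera code the object expression is the right choice for the hot path, while Progress<'a> would be preferable if the handler had to be marshalled back to the UI thread automatically.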

First code implementation thoughts 

This code works, but there are a few problems. First of all, it might work in local testing, but it is possible for an image to be bigger than the 1 MB buffer. For this reason, we would have to resize the input buffer until we have found the next header delimiter, which results in more buffer copies. In addition, the code keeps allocating buffers on the heap as bigger images are processed, which uses more memory, since the logic doesn't shrink the buffer after the images are processed. One possible improvement is introducing an ArrayPool<byte> to avoid repeated buffer allocations, at the cost of more code complexity.
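The ArrayPool<byte> idea can be sketched as follows (the helper is illustrative, not part of the post's code): rent a buffer from the shared pool, use it, and return it instead of allocating a fresh array per frame.

```fsharp
open System.Buffers

// Illustrative helper: run an action against a pooled buffer and always
// return the buffer to the pool, even if the action throws.
let withPooledBuffer (size : int) (action : byte [] -> 'a) =
    let buffer = ArrayPool<byte>.Shared.Rent size   // may return a larger array than requested
    try action buffer
    finally ArrayPool<byte>.Shared.Return buffer
```

Note that Rent can hand back an array larger than the requested size, so callers must track the meaningful length themselves, which is part of the extra complexity mentioned above.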

Running the application

The code below runs and renders a single camera stream using the first implementation.

open Xamarin.Forms
open Xamarin.Forms.Xaml

type MainPage() =
    inherit ContentPage()

    let cameraUrl = "http://192.168.20.48/axis-cgi/mjpg/video.cgi"

    let _ = base.LoadFromXaml(typeof<MainPage>)
    let cameraSource = new Image ( Aspect = Aspect.AspectFit )

    let credentials = new NetworkCredential("admin", "myPa55w0rd")
    let camera = CameraStreaming(cameraSource, credentials, Uri(cameraUrl))

    let mainStack = base.Content :?> StackLayout
    do
        mainStack.Padding <- new Thickness (0, 10, 0, 0)
        mainStack.VerticalOptions <- LayoutOptions.StartAndExpand
        mainStack.HorizontalOptions <- LayoutOptions.CenterAndExpand
        mainStack.Children.Add(camera.Image)
        camera.Start() |> ignore

The code creates an instance of CameraStreaming, adds an Image control to the StackLayout of the main page, then starts the video streaming.

The Image instance sets the Aspect property, which sizes the image within the bounds of the display.

Here below is the output: 

In terms of performance, the application runs with an average consumption of 15% CPU and about 120 MB of memory pressure. What would happen to the total resource consumption with more streaming cameras running? These values could be acceptable in a desktop or server application, but for a mobile device we should reduce them, being careful not to jeopardize the quality of the video streaming.

NOTE: for simplicity, the code in the blog does not have error handling support, which is important when working with network connections. The downloadable code implements safeguard error handling.

A performant implementation - System.IO.Pipelines in action

In this second implementation of the video surveillance application, we will exploit the System.IO.Pipelines library, added in .NET Core 2.1. This library is used to optimize resource consumption and maximize the performance of the streaming application.

What is IO.Pipelines?

IO.Pipelines were introduced with the .NET Core 2.1, and although they haven’t garnered wide popularity among developers, they are extensively used internally by Microsoft to implement performance critical applications.  In fact, this technology is designed to make it easier to do high performance IO operations, and was used to make Kestrel one of the fastest web servers in the industry. 

IO.Pipelines aim to achieve high-performance code by removing the complexity that in many cases is required to write such high-performance applications. IO.Pipelines expose reader/writer access semantics over a binary stream, including thread safety, buffer management, and safeguards via back-pressure.

Below is the second implementation of the video surveillance application. Code unchanged from the previous implementation is not shown.

let frameHeader = [| 0xffuy; 0xd8uy |]

let minimumBufferSize = 1024

let findStreamDelimiter (search:byte []) (buffer:ReadOnlySequence<byte>) : Nullable<SequencePosition> =
   let rec findHeaderArray (span:ReadOnlySpan<byte>) (index:int) : Nullable<SequencePosition> =
        if span.Length < search.Length then Nullable()
        elif span.[0] = search.[0] && span.Slice(1, search.Length - 1).SequenceEqual(ReadOnlySpan<byte>(search, 1, search.Length - 1)) then
            Nullable(buffer.GetPosition(int64 index))
        else findHeaderArray (span.Slice(1)) (index + 1)
   findHeaderArray buffer.First.Span 0

let writePipe (stream : Stream) (writer : PipeWriter) =
    let rec writeLoop () = task {            
        // Allocate at least minimumBufferSize bytes from the PipeWriter
        let memory : Memory<byte> = writer.GetMemory(minimumBufferSize)
        try
            let! bytesRead = stream.ReadAsync(memory) 
            if bytesRead = 0 then writer.Complete()
            else
                // Tell the PipeWriter how much was read from the stream
                writer.Advance(bytesRead)
        with
        | _ -> writer.Complete()
        // Make the data available to the PipeReader
        let! result = writer.FlushAsync()
        if result.IsCompleted then writer.Complete()
        else return! writeLoop()
    }
    writeLoop ()     

let readPipe (reader : PipeReader) (progress : IProgress<ReadOnlySequence<byte>>) =
  let rec readLoop (reader : PipeReader) = task {
      let! result = reader.ReadAsync()
      let buffer : ReadOnlySequence<byte> = result.Buffer
      let! (bufferAdvanced : ReadOnlySequence<byte>) = findHeaderMarker reader buffer
      do reader.AdvanceTo(bufferAdvanced.Start, bufferAdvanced.End)
      if result.IsCompleted then reader.Complete()
      else return! readLoop reader
    }
  and findHeaderMarker (reader : PipeReader) (buffer : ReadOnlySequence<byte>) = task {
      let position = findStreamDelimiter frameHeader buffer
      if position.HasValue then 
             progress.Report(buffer.Slice(0, position.Value))
             return! findHeaderMarker reader (buffer.Slice(buffer.GetPosition(1L, position.Value)))
      else return buffer }    
  readLoop reader               

let processFrame (request : HttpWebRequest) (progress : IProgress<ReadOnlySequence<byte>>) = task {
    use! response = request.GetResponseAsync()

    let frameBoundary =
        let contentType = response.Headers.["Content-Type"]
        let frameBoundary = contentType.AsSpan().Slice(contentType.IndexOf('=') + 1)

        if frameBoundary.StartsWith(boundaryFrameDelimiter) then
            Encoding.UTF8.GetBytes(frameBoundary.ToString())
        else
            let frameBoundary = appendSpans boundaryFrameDelimiter frameBoundary
            Encoding.UTF8.GetBytes(frameBoundary.ToString())

    use cameraStream = response.GetResponseStream()

    let pipe = Pipe()

    let writing = writePipe cameraStream pipe.Writer
    let reading = readPipe pipe.Reader progress
    do! Task.WhenAll(reading, writing) }

There are several changes in the core of the implementation. The function processFrame, like the previous implementation, grabs the stream from the HTTP response received by the camera to process the video stream. However, notice that the arrays of bytes, like frameHeader, and the related operations, like findStreamDelimiter, use the Span<byte> type to reduce allocations. In particular, the function findStreamDelimiter "slices" the array of bytes for comparison and index offsetting instead of generating new arrays. This technique provides some benefits, but there is more.

Later in the same processFrame function, the pipe instance of the Pipe type (from the System.IO.Pipelines namespace) is roughly comparable to a MemoryStream, with the one (huge) difference that it cannot be rewound: the data is treated like a traditional FIFO queue. In general, you can think of the Pipe as a buffer that sits between a high-performance producer and consumer, where a producer pushes data in at one end and a consumer pulls the data out at the other.

In this particular implementation, the IO read and write operations on the pipe are asynchronous, so we pass a PipeWriter instance to the writing function, which does our writing, and a PipeReader instance to the reading function, which does our reading. Then we wait asynchronously with the Task.WhenAll method until the writing side calls Complete() on the PipeWriter.

The pipeline of our video streaming reader has two asynchronous functions:

  • writePipe reads asynchronously from the HTTP response stream, and then writes into the PipeWriter
  • readPipe reads asynchronously from the PipeReader and parses/decodes incoming images


Unlike the original implementation, there are no explicit Buffer.BlockCopy calls or buffer allocations anywhere. The impact is enormous, reducing memory allocation and, in some respects, improving CPU utilization. This is the main purpose of exploiting pipelines. In addition, the code is simpler, making it easier for the developer to focus solely on the business logic instead of complex buffer management.

NOTE: In this implementation, to simplify access and interop with the System.IO.Pipelines asynchronous operations, we use the task {} computation expression in place of the idiomatic F# async {} computation expression. The reason is that direct use of the task-based operations, without converting them into an F# Async type, produces better performance. For more details see this link.

In a more in-depth analysis of the code, the function writePipe starts by calling the GetMemory(int) method of the underlying PipeWriter instance to get available memory. Then, the Advance(int) method of the PipeWriter instance notifies the PipeWriter of the amount of data that was written to the buffer. Next, it is important to call PipeWriter.FlushAsync(), which makes the data available to the PipeReader in the function readPipe.

It is important to note that the data we receive from the PipeWriter is passed without any extra allocation or data copying. The pipeline simply behaves, in a very sophisticated and intelligent way, as a buffer between the PipeWriter and the PipeReader without any need to copy data around.

The recursive function readPipe consumes the buffers written by the PipeWriter, which originate from the HTTP video stream. When the call to PipeReader.ReadAsync completes, it returns a ReadResult, which exposes two central properties: the data read, in the form of a ReadOnlySequence<byte>, and the IsCompleted flag indicating that the PipeWriter has reached the end of the stream. Then the function findStreamDelimiter finds the image boundary so the buffer can be parsed and rendered. In this function, the byte sequences are sliced (with the Slice method) to avoid re-processing and re-allocating the same data. The GetPosition method of the ReadOnlySequence<byte> returns the relative position of the frameHeader delimiter passed in, which defines the beginning of an image in the video stream.

Next, the PipeReader.AdvanceTo method completes a single read operation, notifying the Pipe of how much data was consumed (and examined) so that the corresponding buffer space can be released and made available to the PipeWriter again.

The underlying Pipe implementation persists a linked list of buffers that it passes between the PipeWriter and the PipeReader. For example, the PipeReader exposes a ReadOnlySequence<'a>, which is a view over a set of segments of ReadOnlyMemory<'a>, similar to Span<'a> and Memory<'a>. In other words, the Pipe implementation keeps pointers to the locations of the reader and the writer in the globally allocated data, and it updates them when data is either written or read.
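Because a ReadOnlySequence<byte> can span several non-contiguous segments, code consuming it should walk the segments rather than assume a single backing array. A minimal sketch (illustrative, not from the post):

```fsharp
open System
open System.Buffers

// Walk every segment of a ReadOnlySequence<byte>; each segment is exposed
// as a ReadOnlyMemory<byte>, so no data is copied while iterating.
let sequenceLength (sequence : ReadOnlySequence<byte>) =
    let mutable total = 0
    for memory in sequence do
        total <- total + memory.Length
    total
```

For a single-segment sequence this equals the sequence's own Length property; for multi-segment sequences (the common case with pipes) the per-segment loop is what keeps the processing allocation-free.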

Automatic back-pressure handling

Another important characteristic of pipelines is that they support and contend with back-pressure. 

In a perfect world, reading and processing streams of data run at the same speed, in perfect balance. However, in the real world this scenario is very rare. Normally, processing and parsing chunks of data takes more time than just copying or moving blocks of data from the stream. Consequently, the producer thread can easily overwhelm the consumer thread. The result is that the producer thread has to either slow down or allocate more memory to store the data for the consumer thread, which is not a sustainable long-term solution. For ideal performance, there should be an equilibrium between frequent pauses and allocating more memory. The Pipe solves this problem by controlling the flow of data: it regulates how much data can be buffered before a call to the PipeWriter.FlushAsync method pauses, and it determines how much the consumer has to process before the producer can resume.

For example, if the PipeReader is not able to process the data at the same speed as the PipeWriter, the reader reports back exactly how much of the buffer it consumed, and the PipeWriter adjusts its rate in response to this information. This way the Pipe negotiates between the PipeReader and PipeWriter, preventing the PipeReader from being overwhelmed by the stream of incoming data. On the other side, the PipeWriter writes a chunk of data into the pipe and then calls the flush method, which is the point where the pipe applies back-pressure.
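The thresholds at which the Pipe pauses and resumes the writer are configurable through PipeOptions; the values below are illustrative, not taken from the post:

```fsharp
open System.IO.Pipelines

// Illustrative configuration: FlushAsync stops completing once more than
// pauseWriterThreshold bytes sit unconsumed in the pipe, and the writer is
// resumed when the reader drains the backlog below resumeWriterThreshold.
let pipe =
    Pipe(PipeOptions(pauseWriterThreshold = 512L * 1024L,    // pause producer at 512 KB
                     resumeWriterThreshold = 256L * 1024L))  // resume below 256 KB
```

Leaving the defaults in place, as the code in this post does, already gives you working back-pressure; tuning the thresholds only matters when the default buffering window does not fit the workload.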

Below is the output:

In terms of performance, the application runs with an average of 8% CPU consumption and about 36 MB of memory pressure.

Here is the code for the Xamarin.Forms UI:

type StreamManager private () =
    static let instance = System.Lazy<_>(fun () ->  
            new Microsoft.IO.RecyclableMemoryStreamManager())
    static member Instance = instance.Value


type MainPage() =
    inherit ContentPage()

    let cameraUrls = [
        "http://192.168.20.48/axis-cgi/mjpg/video.cgi"
        "http://192.168.20.49/axis-cgi/mjpg/video.cgi"
        "http://192.168.20.50/axis-cgi/mjpg/video.cgi"
        "http://192.168.20.51/axis-cgi/mjpg/video.cgi"
        ]

    let _ = base.LoadFromXaml(typeof<MainPage>)

    let cameraSources = 
        List.init cameraUrls.Length (fun i -> new Image ( Aspect = Aspect.AspectFit ), Uri(cameraUrls.[i]))

    let credentials = NetworkCredential("admin", "myPa55w0rd")

    let cameras = 
        cameraSources
        |> List.map(fun (image, url) -> CameraStreaming(image, credentials, url))

    let mainStack = base.Content :?> StackLayout
    mainStack.Padding <- new Thickness (0, 10, 0, 0)
    mainStack.VerticalOptions <- LayoutOptions.StartAndExpand
    mainStack.HorizontalOptions <- LayoutOptions.CenterAndExpand

    do
        for camera in cameras do
            mainStack.Children.Add(camera.Image)
            camera.Start() 

As you can see, in this case the application is running with four different IP video cameras.

Performance tip: recycling MemoryStream

The .NET runtime relies on a tracing garbage collector, which can negatively impact the performance of a program that generates a large number of memory allocations due to GC pressure. This is the performance penalty that the previous code pays when it creates a System.IO.MemoryStream instance, including its underlying byte array, for each Image.

The quantity of memory-stream instances increases with the number of chunks of data to process, which can be hundreds in a large stream. As its byte array grows, the MemoryStream resizes it by allocating a new, larger array and then copying the original bytes into it. This is inefficient, not only because it creates new objects and throws the old ones away, but also because it has to do the legwork of copying the content each time it resizes.
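A small illustration of that resize cost: each time a write exceeds the current capacity, MemoryStream allocates a larger backing array and copies the existing contents into it.

```fsharp
open System.IO

// A MemoryStream starts with an empty backing array; writing past the
// current capacity forces it to allocate a larger array and copy the
// old contents over.
let ms = new MemoryStream()
let chunk = Array.zeroCreate<byte> 300
ms.Write(chunk, 0, chunk.Length)   // forces at least one reallocation
// ms.Capacity has now grown to hold the 300 bytes written
```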

One way to alleviate the memory pressure that can be caused by the frequent creation and destruction of large objects is to tell the .NET GC to compact the large object heap (LOH) using this setting:

GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce

This solution, while it may reduce the memory footprint of your application, does nothing to solve the initial problem of allocating all that memory in the first place. A better solution is to create an ObjectPool, also known as pooled buffers, to pre-allocate an arbitrary number of MemoryStream instances that can be reused.
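To make the idea concrete, here is a deliberately minimal, hand-rolled pool (the MemoryStreamPool type is hypothetical, for illustration only; the RecyclableMemoryStream type covered next does this properly, with LOH-aware buffer management):

```fsharp
open System.Collections.Concurrent
open System.IO

// A naive object pool: Rent returns a recycled stream when one is
// available, and Return hands a stream back for reuse instead of
// letting it become garbage.
type MemoryStreamPool() =
    let pool = ConcurrentBag<MemoryStream>()

    member _.Rent() =
        match pool.TryTake() with
        | true, stream ->
            stream.SetLength 0L   // reset the recycled stream before reuse
            stream
        | false, _ ->
            new MemoryStream()    // pool empty: fall back to allocation

    member _.Return(stream: MemoryStream) = pool.Add stream
```

The pool trades a small amount of bookkeeping for a large reduction in allocations: once the pool is warm, renting a stream allocates nothing at all.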

Microsoft has released a new object, called RecyclableMemoryStream, which abstracts away the implementation of an ObjectPool optimized for MemoryStream, and minimizes the number of large object heap allocations and memory fragmentation. A full discussion of RecyclableMemoryStream is out of the scope of this article. For more information refer to the MSDN online documentation.
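The usage pattern looks like this (a sketch assuming the Microsoft.IO.RecyclableMemoryStream NuGet package; the copyFrame function and "frame" tag are illustrative): GetStream returns a MemoryStream-compatible stream backed by pooled buffers, and disposing it returns those buffers to the manager instead of leaving them to the GC.

```fsharp
open Microsoft.IO

// The manager owns the pooled buffers; in a real application (as in the
// StreamManager singleton above) you would create exactly one instance.
let manager = RecyclableMemoryStreamManager()

let copyFrame (frame: byte[]) =
    // 'use' disposes the stream, returning its buffers to the pool.
    use stream = manager.GetStream "frame"   // the tag string aids leak diagnostics
    stream.Write(frame, 0, frame.Length)
    stream.ToArray()                         // copy out only if a byte[] is required
```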

Below is the output of the application processing four IP video camera streams.

In terms of performance, the application runs almost exactly like the previous one that processed only a single video stream. In this case, the application runs with an average of 9% CPU consumption and about 51 MB of memory pressure.

System.IO.Pipelines is one of the new .NET Core types whose aim is to improve performance by drastically reducing, and in many cases nearly eliminating, memory allocation. The results garnered by implementing pipelines in my code far surpassed my expectations. The phenomenal performance gains here have proven to be a valuable tool, which I plan to implement in future applications.

If you are interested in learning more about increasing performance in applications through the use of concurrency, multi-core parallelism, and distributed parallelism join me in Paris or London for my Concurrent Functional Programming in .NET Core course.  

Paris: April 25th-26th, 2019 (Link)
London: April 29th-30th, 2019 (Link)


Happy Holidays to you and your family, looking forward to wonderful new year with the vibrant F# community!  
