Parallelizing Async Tasks with Dependencies

Design your code to optimize performance

 

A little while ago, I needed to write a tool that could execute a series of async I/O tasks, each with a different set of dependencies that influenced the order of the operations. This can be addressed simply with sequential execution. However, if you want to maximize performance, sequential operations just won't do; you must build the tasks to run in parallel.
To optimize performance, these tasks need to be scheduled based on their dependencies: the algorithm must run dependent tasks serially where necessary and everything else in parallel as much as possible.
A typical data structure for representing scheduling constraints is a graph. A graph is an extremely powerful data structure in computer science that gives rise to very powerful algorithms.
A graph data structure consists of two basic elements:
Vertex – A single node in the graph, often encapsulates some sort of information.
Edge – Connects two vertices (or a vertex to itself). Can contain a value.

A graph is a collection of vertices connected by edges.
A directed graph leads to an expressive programming model, because it makes the one-way restriction easy to enforce. By definition, a directed graph is a set of vertices and a collection of directed edges, where each directed edge connects an ordered pair of vertices. Here each task is represented as a Node in a graph, and the dependency between two nodes is represented by a directed edge.
[Figure: directed graph of the task dependencies]

In this representation of a Directed Graph, Node 1 has dependencies on Node 4 and 5, Node 2 depends on Node 5, Node 3 has dependencies on Node 5 and 6 and so on.
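Before looking at a graph type, it can help to see the figure written down as plain data. The following is only a minimal sketch: it assumes an immutable `Map` from each node to the nodes it depends on, and the names `dependencies`, `dependsOn` and `roots` are hypothetical, chosen for illustration.

```fsharp
// Hypothetical encoding of the figure: each key is a node, each value lists
// the nodes it depends on (the targets of its directed edges).
let dependencies : Map<int, int list> =
    Map.ofList
        [ (1, [ 4; 5 ])
          (2, [ 5 ])
          (3, [ 5; 6 ])
          (4, [ 6 ])
          (5, [ 7; 8 ])
          (6, [ 7 ])
          (7, [])
          (8, []) ]

// True when there is a directed edge from `node` to `target`.
let dependsOn node target =
    dependencies
    |> Map.tryFind node
    |> Option.exists (List.contains target)

// Nodes with no outgoing edges have nothing to wait for.
let roots =
    dependencies
    |> Map.filter (fun _ deps -> List.isEmpty deps)
    |> Map.toList
    |> List.map fst

printfn "Node 1 depends on node 4: %b" (dependsOn 1 4)
printfn "Nodes without dependencies: %A" roots
```

Nodes 7 and 8 come out as the only nodes without dependencies, so they are the natural starting points for execution.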

See the code below for a simple implementation of a Directed Graph. The implementation is not the most elegant: it follows the imperative paradigm with mutable variables, but for demo purposes it conveys the idea.

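open System.Collections.Generic

type DirectGraph(value : int) =
    // Number of edges added so far
    let mutable e = 0

    // One adjacency list per vertex
    let vertices = Array.init value (fun _ -> List<int>())

    member this.Vertex = value
    member this.Edge = e

    member this.getVertex(v : int) = vertices.[v]

    // Adds a directed edge from v to w and updates the edge count
    member this.AddEdge(v : int, w : int) =
        vertices.[v].Add(w)
        e <- e + 1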

This code is a good starting point, but there are some problems.

How can we ensure that all the required edges have been registered? Consider the case where Node 2 is registered with dependencies on Node 7 and Node 8, but Node 8 itself is never registered. Moreover, some edges could depend on each other, which would lead to a Directed Cycle. A Directed Cycle is critical when running tasks in parallel, because the tasks involved would wait on one another forever in a deadlock.

Another problem is registering a set of computations that have an order and precedence constraint. This means that some tasks must complete before some other task begins. How can the system enforce and verify that all the tasks are completed while still respecting the ordering constraint?
The solution is called Topological Sort, which means that I can order all the vertices of the graph in such a way that every directed edge points from a vertex earlier in the order to a vertex later in the order. For example, if Task A must be completed before Task B, and Task B must be completed before Task C, which in turn must be completed before Task A, then there is a cyclic reference and the system reports the mistake by throwing an exception. If a precedence constraint has a directed cycle, then there is no solution. This kind of checking is called Directed cycle detection.
If a Directed Graph satisfies these rules, it is a Directed Acyclic Graph (DAG), which is well suited to running several tasks with dependencies in parallel.
For additional information about DAGs, see the article "Paralleling operation with dependencies".
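To make directed cycle detection concrete, here is a minimal sketch based on depth-first search. It is not the implementation used later in this post: the function `hasCycle` and the `Map`-based graph shape are assumptions made for illustration. The idea is that a cycle exists whenever the search reaches a node that is already on the path currently being explored.

```fsharp
// Hypothetical helper: returns true when the dependency map contains a directed cycle.
// `visiting` holds the nodes on the current path, `visited` the fully explored nodes.
let hasCycle (graph : Map<int, int list>) =
    let rec visit (visiting : Set<int>) (visited : Set<int>) (node : int) : bool * Set<int> =
        if Set.contains node visiting then
            // The node is already on the current path: directed cycle found
            (true, visited)
        elif Set.contains node visited then
            // Already explored through another path, nothing new to learn
            (false, visited)
        else
            let deps = graph |> Map.tryFind node |> Option.defaultValue []
            let step (found, seen) dep =
                if found then (true, seen)
                else visit (Set.add node visiting) seen dep
            let found, seen = List.fold step (false, visited) deps
            (found, Set.add node seen)

    let visitRoot (found, seen) (node, _) =
        if found then (true, seen) else visit Set.empty seen node

    graph
    |> Map.toList
    |> List.fold visitRoot (false, Set.empty)
    |> fst

// Task 1 -> 2 -> 3 has a valid order; adding 3 -> 1 closes the loop
let acyclic = Map.ofList [ (1, [ 2 ]); (2, [ 3 ]); (3, []) ]
let cyclic = Map.ofList [ (1, [ 2 ]); (2, [ 3 ]); (3, [ 1 ]) ]

printfn "acyclic has a cycle: %b" (hasCycle acyclic)
printfn "cyclic has a cycle: %b" (hasCycle cyclic)
```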

Back to the figure above, Tasks 7 and 8 run in parallel. As soon as Task 7 completes, Task 6 starts, and Task 5 runs after Tasks 7 and 8 are both complete, and so on.
Let's implement the DAG solution, applying the strategy we have learned here to run tasks in parallel while respecting the order of the dependencies, in order to increase performance.
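One way to see how much parallelism the figure allows is to group the tasks into "waves", where every task in a wave has all of its dependencies in earlier waves. The sketch below is a simplification and not the scheduling used by the agent later in this post, which releases each task as soon as its own dependencies complete. The names `parallelWaves` and `figureGraph` are hypothetical.

```fsharp
// Hypothetical helper: groups task ids into waves that could run in parallel.
// Returns None when no valid ordering exists (missing dependency or cycle).
let parallelWaves (graph : Map<int, int list>) =
    let rec loop (remaining : Map<int, int list>) (completed : Set<int>) (waves : int list list) =
        if Map.isEmpty remaining then
            Some (List.rev waves)
        else
            // Tasks whose dependencies have all completed
            let ready =
                remaining
                |> Map.filter (fun _ deps -> deps |> List.forall (fun d -> Set.contains d completed))
                |> Map.toList
                |> List.map fst
            match ready with
            | [] -> None
            | _ ->
                let remaining' = ready |> List.fold (fun m taskId -> Map.remove taskId m) remaining
                let completed' = ready |> List.fold (fun s taskId -> Set.add taskId s) completed
                loop remaining' completed' (ready :: waves)
    loop graph Set.empty []

// The dependencies used in the example of this post
let figureGraph =
    Map.ofList
        [ (1, [ 4; 5 ]); (2, [ 5 ]); (3, [ 6; 5 ]); (4, [ 6 ])
          (5, [ 7; 8 ]); (6, [ 7 ]); (7, []); (8, []) ]

printfn "%A" (parallelWaves figureGraph)
```

For the figure this yields four waves: Tasks 7 and 8, then 5 and 6, then 2, 3 and 4, and finally Task 1.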

Now, let’s define the data type representing the Task:

type TaskInfo =
    { Context : System.Threading.ExecutionContext
      Edges : int array
      Id : int
      Task : unit -> unit
      NumRemainingEdges : int option
      Start : DateTimeOffset option
      End : DateTimeOffset option }

The TaskInfo type keeps track of the details of the registered task: the id, the function to execute, and the dependency edges. The execution context is captured so that the delayed execution can access information such as the current user, any state associated with the logical thread of execution, code-access security information, and so forth. The start and end times of the execution are published when the event fires.

member this.AddTask(id, task, [<ParamArrayAttribute>] edges : int array) =
    let data =
        { Context = ExecutionContext.Capture()
          Edges = edges
          Id = id
          Task = task
          NumRemainingEdges = None
          Start = None
          End = None }

    dagAgent.Post(AddTask(id, data))

The purpose of the function AddTask is to register a task together with arbitrary dependency edges. This function accepts a unique id, a function task to be executed, and a set of edges representing the ids of other registered tasks, all of which must be completed before the current task can be executed. If the array is empty, there are no dependencies.

[<CLIEventAttribute>]
member this.OnTaskCompleted = onTaskCompleted.Publish

The event OnTaskCompleted is triggered each time a task completes, providing details such as the execution time.
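For readers who have not used F# events, here is a small self-contained sketch of the same publish/trigger pattern. The `Worker` type and the `Completed` record are hypothetical and much simpler than the types in this post, where the event carries a TaskInfo record.

```fsharp
// Hypothetical, simplified payload for this sketch
type Completed = { Id : int; Elapsed : System.TimeSpan }

type Worker() =
    let onCompleted = Event<Completed>()

    // CLIEvent exposes the F# event as a standard .NET event for other languages
    [<CLIEvent>]
    member this.OnCompleted = onCompleted.Publish

    // Runs the work, measures it, and notifies every subscriber
    member this.Run(id : int, work : unit -> unit) =
        let watch = System.Diagnostics.Stopwatch.StartNew()
        work ()
        onCompleted.Trigger { Id = id; Elapsed = watch.Elapsed }

let worker = Worker()
let completedIds = System.Collections.Generic.List<int>()

// Subscribe before running so that no notification is missed
worker.OnCompleted |> Observable.add (fun c -> completedIds.Add(c.Id))

worker.Run(1, fun () -> ())
worker.Run(2, fun () -> ())
printfn "Completed: %A" (List.ofSeq completedIds)
```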

member this.ExecuteTasks() = dagAgent.Post ExecuteTasks

The function ExecuteTasks starts the process of executing the tasks.

The core of the solution is implemented using a MailboxProcessor (aka Agent), which provides several benefits. Because of the natural thread safety of this primitive, I can use .NET mutable collections to simplify the implementation of the DAG. Immutability is an important component for writing correct and lock-free concurrent applications. Another important component for reaching a thread-safe result is isolation. The MailboxProcessor provides both concepts out of the box. In this case, I am taking advantage of isolation.

Overall, it is about finding the right tool for the job and being pragmatic in your approach; in this case the .NET generic mutable collections work.

The MailboxProcessor named dagAgent keeps the registered tasks in its current state “tasks”, which is a map (tasks : Dictionary<int, TaskInfo>) between the id of each task and its details. Moreover, the Agent also keeps the state of the edge dependencies for each task id (edges : Dictionary<int, int list>). When the Agent receives the notification to start the execution, part of the process involves verifying that all the edge dependencies are registered and that there are no cycles within the graph.
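To illustrate the isolation argument, here is a minimal sketch of an agent that owns a mutable Dictionary. The message type `CounterMessage` and the agent `counterAgent` are hypothetical and unrelated to the DAG code; the point is only that the dictionary is touched exclusively by the agent's message loop, so concurrent callers never need a lock.

```fsharp
open System.Collections.Generic

// Hypothetical message type for this sketch
type CounterMessage =
    | Increment of string
    | GetCount of string * AsyncReplyChannel<int>

let counterAgent =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // Mutable state, but only ever accessed from the agent's own loop
        let counts = Dictionary<string, int>()
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment key ->
                let found, current = counts.TryGetValue(key)
                counts.[key] <- (if found then current + 1 else 1)
            | GetCount (key, reply) ->
                let found, current = counts.TryGetValue(key)
                reply.Reply(if found then current else 0)
            return! loop () }
        loop ())

// Many workflows post concurrently; the agent handles one message at a time
[ 1 .. 1000 ]
|> List.map (fun _ -> async { counterAgent.Post(Increment "a") })
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

let total = counterAgent.PostAndReply(fun reply -> GetCount("a", reply))
printfn "Count for a: %d" total
```

Because the query is posted after all increments have been enqueued, the agent answers it only once every increment has been processed.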

let verifyThatAllOperationsHaveBeenRegistered (tasks : Dictionary<int, TaskInfo>) =
    let tasksNotRegistered =
        tasks.Values
        |> (Seq.collect (fun f -> f.Edges) >> set)
        |> Seq.filter (tasks.ContainsKey >> not)
    if tasksNotRegistered |> Seq.length > 0 then
        let edgesMissing = tasksNotRegistered |> Seq.map (string) |> Seq.toArray
        raise (InvalidOperationException
                 (sprintf "Missing operation: %s" (String.Join(", ", edgesMissing))))

let verifyTopologicalSort (tasks : Dictionary<int, TaskInfo>) =
    // MList is not defined in this listing; it is assumed to be an alias
    // for System.Collections.Generic.List<int>.
    // Build up the dependencies graph
    let tasksToFrom = new Dictionary<int, MList>(tasks.Values.Count, HashIdentity.Structural)
    let tasksFromTo = new Dictionary<int, MList>(tasks.Values.Count, HashIdentity.Structural)

    for op in tasks.Values do
        // Note that op.Id depends on each of op.Edges
        tasksToFrom.Add(op.Id, new MList(op.Edges))
        // Note that each of op.Edges is relied on by op.Id
        for deptId in op.Edges do
            let success, _ = tasksFromTo.TryGetValue(deptId)
            if not <| success then tasksFromTo.Add(deptId, new MList())
            tasksFromTo.[deptId].Add(op.Id)

    // Create the sorted list
    let partialOrderingIds = new MList(tasksToFrom.Count)
    let iterationIds = new MList(tasksToFrom.Count)

    let rec buildOverallPartialOrderingIds () =
        match tasksToFrom.Count with
        | 0 -> Some(partialOrderingIds)
        | _ ->
            iterationIds.Clear()
            for item in tasksToFrom do
                if item.Value.Count = 0 then
                    iterationIds.Add(item.Key)
                    let success, depIds = tasksFromTo.TryGetValue(item.Key)
                    if success = true then
                        // Remove all outbound edges
                        for depId in depIds do
                            tasksToFrom.[depId].Remove(item.Key) |> ignore
            // If nothing was found to remove, there's no valid sort.
            if iterationIds.Count = 0 then None
            else
                // Remove the found items from the dictionary and
                // add them to the overall ordering
                for id in iterationIds do
                    tasksToFrom.Remove(id) |> ignore
                partialOrderingIds.AddRange(iterationIds)
                // Recurse until every task has been ordered
                buildOverallPartialOrderingIds ()
    buildOverallPartialOrderingIds ()

If any of the validations is not satisfied, the process is interrupted and an error is thrown. For demo purposes the message is only printed to the console; a better solution would be to provide a public handler.

inbox.Error |> Observable.add(fun ex -> printfn "Error : %s" ex.Message )
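As a rough idea of how the Error event behaves, here is a small sketch in which the agent body throws and a subscriber collects the message instead of printing it. The agent `failingAgent` and the queue `errors` are hypothetical; a public handler for the DAG could follow the same shape.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading

// Hypothetical agent whose body throws, standing in for a failed validation
let failingAgent =
    new MailboxProcessor<string>(fun inbox -> async {
        let! msg = inbox.Receive()
        raise (InvalidOperationException(sprintf "Missing operation: %s" msg)) })

let errors = ConcurrentQueue<string>()
let errorReceived = new ManualResetEventSlim(false)

// Subscribers receive the unhandled exception raised inside the agent
failingAgent.Error
|> Observable.add (fun ex ->
    errors.Enqueue(ex.Message)
    errorReceived.Set())

failingAgent.Start()
failingAgent.Post "8"

// The error arrives asynchronously, so wait for it (with a timeout) before reading
if errorReceived.Wait(TimeSpan.FromSeconds(5.0)) then
    printfn "Captured errors: %A" (List.ofSeq errors)
```

Note that once the body has thrown, the agent stops processing messages, so the validation failure effectively ends the run.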

If the validation passes, the process starts the execution, checking each task for dependencies and thus enforcing the order and prioritization of execution. A task whose dependencies are all complete is queued into the dagAgent using the “QueueTask” message. Upon completion of a task, we simply remove the task from the graph. This frees up all of its dependents to be executed.

Below is the full implementation of the dagAgent:

       
let dagAgent =
    let inbox = new MailboxProcessor(fun inbox ->
        let rec loop (tasks : Dictionary<int, TaskInfo>) 
                     (edges : Dictionary<int, int list>) = async {
                let! msg = inbox.Receive()
                match msg with
                | ExecuteTasks ->
                    // Verify that all operations are registered
                    verifyThatAllOperationsHaveBeenRegistered(tasks)
                    // Verify no cycles
                    verifyThereAreNoCycles(tasks)

                    let dependenciesFromTo = new Dictionary<int, int list>()
                    let operations' = new Dictionary<int, TaskInfo>()

                    // Fill dependency data structures
                    for KeyValue(key, value) in tasks do
                        let operation' =
                            { value with NumRemainingEdges = Some(value.Edges.Length) }
                        for from in operation'.Edges do
                            let exists, lstDependencies = dependenciesFromTo.TryGetValue(from)
                            if not <| exists then 
                                dependenciesFromTo.Add(from, [ operation'.Id ])
                            else
                                dependenciesFromTo.[from] <- (operation'.Id :: lstDependencies)
                        operations'.Add(key, operation')
                   
                    
                    operations' |> Seq.filter (fun kv ->
                                           match kv.Value.NumRemainingEdges with                                                 
                                           | Some(n) when n = 0 -> true
                                           | _ -> false)
                                |> Seq.iter (fun op -> inbox.Post(QueueTask(op.Value)))
                    return! loop operations' dependenciesFromTo
             
                | QueueTask(op) ->
                        Async.Start <| async { 
                            // Time and run the operation's delegate
                            let start' = DateTimeOffset.Now
                            match op.Context with
                            | null -> op.Task()
                            | ctx ->
                                ExecutionContext.Run(ctx.CreateCopy(),
                                                        (fun op -> let opCtx = (op :?> TaskInfo)
                                                                   (opCtx.Task())), op)
                            let end' = DateTimeOffset.Now
                            // Raise the operation completed event
                            onTaskCompleted.Trigger  { op with Start = Some(start')
                                                               End = Some(end') }
                            
                            // Queue all the operations that depend on the completion
                            // of this one, and potentially launch newly available ones
                            let exists, lstDependencies = edges.TryGetValue(op.Id)
                            if exists && lstDependencies.Length > 0 then
                                let dependentOperation' = getDependentOperation lstDependencies tasks []
                                edges.Remove(op.Id) |> ignore
                                dependentOperation'
                                    |> Seq.iter (fun nestedOp -> inbox.Post(QueueTask(nestedOp))) }
                        return! loop tasks edges
              
                | AddTask(id, op) -> tasks.Add(id, op)
                                     return! loop tasks edges
            }
        loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural)) (new Dictionary<int, int list>(HashIdentity.Structural)))
    inbox.Error |> Observable.add(fun ex -> printfn "Error : %s" ex.Message )
    inbox.Start()
    inbox
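The agent calls a helper named getDependentOperation that is not shown in the listing above. The following is only a guess at what such a helper might do, written against a simplified stand-in for TaskInfo: for every task that depends on the one that just finished, it decrements the remaining-edge counter and collects the task once the counter reaches zero. The real helper in the complete code may differ.

```fsharp
open System.Collections.Generic

// Simplified stand-in for the TaskInfo record (assumption: only the fields
// this sketch needs are kept)
type TaskInfo =
    { Id : int
      Edges : int array
      NumRemainingEdges : int option }

// Hypothetical implementation: returns the dependents that became ready to run
let rec getDependentOperation (dependents : int list)
                              (tasks : Dictionary<int, TaskInfo>)
                              (ready : TaskInfo list) =
    match dependents with
    | [] -> ready
    | taskId :: rest ->
        let task = tasks.[taskId]
        // One dependency has just completed, so one fewer edge remains
        let remaining = (defaultArg task.NumRemainingEdges task.Edges.Length) - 1
        let task' = { task with NumRemainingEdges = Some remaining }
        tasks.[taskId] <- task'
        if remaining = 0 then getDependentOperation rest tasks (task' :: ready)
        else getDependentOperation rest tasks ready

// Task 5 waits for Tasks 7 and 8, Task 6 only for Task 7
let pending = Dictionary<int, TaskInfo>()
pending.[5] <- { Id = 5; Edges = [| 7; 8 |]; NumRemainingEdges = Some 2 }
pending.[6] <- { Id = 6; Edges = [| 7 |]; NumRemainingEdges = Some 1 }

// Task 7 completes: its dependents are Tasks 5 and 6
let readyAfterSeven = getDependentOperation [ 5; 6 ] pending [] |> List.map (fun t -> t.Id)
// Task 8 completes: its only dependent is Task 5
let readyAfterEight = getDependentOperation [ 5 ] pending [] |> List.map (fun t -> t.Id)

printfn "Ready after Task 7: %A" readyAfterSeven
printfn "Ready after Task 8: %A" readyAfterEight
```

Because the listing calls the helper from inside Async.Start, a version like this would probably need to serialize the counter update, for instance by sending it back to the agent as a message.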

You can find the complete code implementation here.

To run an example we can replicate the edge dependencies in the figure above.

let dagAsync = Async.DAG.ParallelTasksDAG()
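dagAsync.OnTaskCompleted |> Observable.add(fun op ->
    // The color value was lost from the original listing; Green is a placeholder
    System.Console.ForegroundColor <- System.ConsoleColor.Green
    printfn "Completed %d" op.Id)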

dagAsync.AddTask(1, acc1, 4,5)
dagAsync.AddTask(2, acc2, 5)
dagAsync.AddTask(3, acc3, 6, 5)
dagAsync.AddTask(4, acc4, 6)
dagAsync.AddTask(5, acc5, 7, 8)
dagAsync.AddTask(6, acc6, 7)
dagAsync.AddTask(7, acc7)
dagAsync.AddTask(8, acc8)
dagAsync.ExecuteTasks()

The output should be:

[Figure: console output]

I hope you find this to be a useful example of how you can leverage parallelism to optimize your applications.