Concurrent Programming Abstraction

NB: This is a cross-post from a public Notion document

The goal of this discussion is to choose a new abstraction layer for representing our concurrent programming logic. Jane Street Async’s existing abstractions (Pipes + *vars) have proven unprincipled enough to repeatedly shoot us in the foot. We would prefer a more principled abstraction layer that will enable us to write code with clearer semantics, and hopefully give us tools for analyzing our concurrent structure/control flow so that we can better vet changes and find bugs earlier.

Anyone on the team is welcome to propose an abstraction we should use. The proposal should focus on what is gained by using the abstraction, how the abstraction can be instrumented/tooled/analyzed, and should provide some pseudo code to show how we would actually use the abstraction in our code base (best to choose a real example).

Prior issues requesting features in this area: https://github.com/CodaProtocol/coda/issues/231

I will be starting this thread off by posting my own proposal in a reply.


OTP/Erlang Style

Why Consider this Abstraction?

The OTP/Erlang concurrent programming model provides a more principled approach to structuring concurrent applications while still providing a high degree of flexibility. It details a low-level, flexible definition of concurrent programming through “processes”, and provides the tools to build a tree-structured, concurrent, fault-tolerant application from this primitive. By creating a semantic separation between the generic and specific definitions of what an individual process is, it provides a framework for building reusable logic. This addresses one of the main pain points with our use of the builtin Async concurrent programming primitives: with this model we can express tightly defined, simple, high-level abstractions that we then decorate with specifications in order to build our application. The tree-structuring approach to modeling an application helps greatly with designing fault-tolerant systems, as it puts fault recovery at the forefront of a developer’s mind while building subsystems with this abstraction. It also greatly increases the debuggability and analyzability of the application, giving us unified layers where we can attach our debugging and visualization logic. The OTP/Erlang model has proven to be a very effective and useful concurrent programming abstraction.

Description

Much of the information here is distilled from the Erlang documentation (some of it nearly word for word). If you would rather read this information directly, take a look at these links:

Processes

A “process” is a thread of execution which follows a certain set of properties. Firstly, a process shares no data (state) with other processes. Every process has an input mailbox, which is essentially a queue of messages. A process can request to receive messages, which will cause the process to block and read messages from the queue until a message it is interested in is found. A process terminates when its main function exits. Processes can be spawned, which starts the execution of the process’s main function and returns a PID for the process. Processes can be registered to a name for debugging purposes. Processes can exit execution early with an error. Processes can also be linked/unlinked from one another, which determines how processes are notified of, and handle, errors from each other. Processes can also be monitored, which provides a stream of events regarding changes in the process’s state (this is a more heavyweight alternative to process links).

Mailbox Semantics

At a basic level, mailboxes are queues of unbounded length. However, unlike a traditional queue model (like how Async.Pipe is intended to be used), items can be taken from the middle of the queue. In Erlang, receiving a message in a process has the following behavior: if there are no messages in the mailbox, block and wait for messages; while there are messages, read messages from the front of the queue until you find one that matches one of the receive patterns requested, removing that item from the queue and continuing execution of the current process once a match is found. The next time the process blocks, if there are new patterns present in the receive statement, then the older messages in the mailbox that were skipped will be checked against those cases again. Otherwise, Erlang knows how to skip checking messages that have already been checked against all the cases you are matching on. This last bit of behavior is implementable in OCaml, but it would require creating a type-level separation between the pattern matching function checking items in the queue and where the receive is actually called from.
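
To make the scanning behavior concrete, here is a minimal, synchronous sketch (plain OCaml, no Async, no blocking) of a selective receive over a mailbox: scan from the front, return the first matching message, and keep every skipped message in order for later receives. The Mailbox module below is purely illustrative and elides how a process would actually block on an empty mailbox.

(* Minimal illustration of selective-receive semantics: scan the mailbox
 * front-to-back, return the first message matching [f], and keep every
 * skipped message (in order) for future receives. *)
module Mailbox = struct
  type 'a t = {mutable messages: 'a list}

  let create () = {messages= []}

  let send t msg = t.messages <- t.messages @ [msg]

  (* [receive_selected t ~f] returns the first message satisfying [f],
   * removing only that message from the queue. *)
  let receive_selected t ~f =
    let rec scan skipped = function
      | [] -> None (* a real process would block here until a new message arrives *)
      | msg :: rest when f msg ->
          t.messages <- List.rev_append skipped rest ;
          Some msg
      | msg :: rest -> scan (msg :: skipped) rest
    in
    scan [] t.messages

  (* plain queue semantics: accept whatever message is at the front *)
  let receive t = receive_selected t ~f:(fun _ -> true)
end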

At first glance, mailboxes can seem like they are just pipes all over again, but there is a key difference here: if you want queue semantics, then you just write a receive pattern that accepts all messages. The use case for looking further into a mailbox for other messages is very rare, and is opt-in instead of opt-out. In fact, in our implementation, I would argue the default receive should just always read the next message, and another function receive_selected could be called if you need the selective semantics. In general, logic for patterns around mailbox processing will be handled by behaviors and not specific process definitions (see section below).

Process Behaviors

Processes are a generalized concurrent thread model. We categorize various processes based on their behaviors. In Erlang, behaviors are actually built into the language, and the way they work is fairly akin to the concept of functors. Therefore, behaviors in our code would just be functors that return a process.

More specifically, a behavior describes some generalized logic for a category of process, and a “callback handler” is implemented for specific instantiations of a behavior in order to provide the specific logic for that category of process.

Erlang/OTP comes with a few builtin behaviors for describing processes: gen_server (client-server based process), gen_statem (state machine processes), gen_event (event handling process), and supervisor (see supervision trees section).
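
To make the behavior/callback split concrete, here is a rough sketch (hypothetical names, not the real Erlang API) of what a gen_server-style callback interface could look like in OCaml; the generic behavior functor would own the process, its mailbox, and the receive loop, and only dispatch into these callbacks:

(* Hypothetical gen_server-style callback interface: the generic behavior
 * owns the process and its receive loop; a callback module like the one
 * below only supplies the domain-specific handlers. *)
module type Server_callbacks = sig
  type state
  type call   (* synchronous request *)
  type cast   (* fire-and-forget message *)
  type reply

  val init : unit -> state
  val handle_call : call -> state -> reply * state
  val handle_cast : cast -> state -> state
end

(* Example callback module: a trivial counter server. *)
module Counter_callbacks : Server_callbacks = struct
  type state = int
  type call = [`Get]
  type cast = [`Increment of int]
  type reply = int

  let init () = 0
  let handle_call `Get state = (state, state)
  let handle_cast (`Increment n) state = state + n
end

(* A gen_server behavior would then be a functor application such as
 * [Behaviors.Server.Make (Counter_callbacks)], analogous to the state
 * machine behavior sketched in the mock interface further down. *)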

Supervision Trees

A supervision tree is a structuring model for “worker” and “supervisor” processes. A worker process is a process that performs some computation, and a supervisor process monitors workers as they run. Supervisors are nested in a tree structure, and each supervisor has some specified behaviors around how it should handle various states of its children. This design pattern is very useful for building fault-tolerant concurrent programs. Each supervisor can have its own rules for how it will handle various errors, allowing it to make decisions like whether to respawn a worker, to ignore the error, or perhaps to exit and notify the parent supervisor of an error.

[Diagram from the Erlang docs: an example supervision tree. Squares are supervisors, circles are workers, and edges are monitoring links.]
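
To make this concrete, here is a rough sketch of how a supervisor’s restart policy and children might be declared in OCaml. All of the types, constructors, and child names below are hypothetical and only loosely mirror OTP’s supervisor flags:

(* Hypothetical supervisor specification; none of these names exist yet. *)
type restart_strategy =
  | One_for_one   (* restart only the crashed child *)
  | One_for_all   (* restart every child when one crashes *)
  | Rest_for_one  (* restart the crashed child and those started after it *)

type child_spec =
  { name: string
  ; start: unit -> unit  (* would really return a pid *)
  ; restart: [`Permanent | `Transient | `Temporary] }

type supervisor_spec =
  { strategy: restart_strategy
  ; max_restarts: int   (* allow at most this many restarts... *)
  ; max_seconds: int    (* ...within this many seconds, otherwise give up and escalate *)
  ; children: child_spec list }

(* Example: a subsystem supervisor that restarts only the crashed worker.
 * The child names are illustrative, not our actual module layout. *)
let example_supervisor =
  { strategy= One_for_one
  ; max_restarts= 3
  ; max_seconds= 60
  ; children=
      [ {name= "catchup"; start= (fun () -> ()); restart= `Transient}
      ; {name= "processor"; start= (fun () -> ()); restart= `Permanent} ] }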

Monitors

Monitors are the primary tool for getting rich events for state changes in processes. There are a lot of things that monitors can notify supervising processes of, so rather than lay them all out here, I will just link to the documentation so that you can get an idea of the scope of what it means to monitor a process. http://erlang.org/doc/man/erlang.html#monitor-2

Advantages

Behaviors as Robust, Reusable Patterns

At a core level, the abstraction of a process is still rather generic (for instance, mailboxes are queues, errors can be thrown from anywhere, messages to other processes can be sent from anywhere, receives can be bound anywhere, etc…). However, the power of processes is that they are simple and can be built on top of easily and clearly to provide more restrictive concurrent programming models. Working off of the base behaviors provided by the OTP system, we can define any other behavior we need on top of those or directly on top of raw processes when we need to. Implementing patterns such as message buffers and timeout monitors becomes very simple when built on top of servers or state machines.
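
For example, a simple bounded message buffer could be expressed purely as a state-machine callback: the generic behavior owns the mailbox and receive loop, and the specification only describes how state evolves per message. This is a hypothetical sketch following the initialize/reduce shape of the State_machine behavior in the mock interface further down; the Buffer_spec module and its types are illustrative, not existing code.

(* Hypothetical message-buffer specification on top of a state-machine
 * behavior. Returning [None] from [reduce] stops the process. *)
module Buffer_spec = struct
  type args = {capacity: int; flush: string list -> unit}

  type input = [`Add of string | `Flush | `Stop]

  type state = {args: args; pending: string list}

  let initialize args = {args; pending= []}

  let reduce state (msg : input) =
    match msg with
    | `Add item ->
        let pending = item :: state.pending in
        if List.length pending >= state.args.capacity then (
          (* buffer is full: hand the batch off and start over *)
          state.args.flush (List.rev pending) ;
          Some {state with pending= []} )
        else Some {state with pending}
    | `Flush ->
        state.args.flush (List.rev state.pending) ;
        Some {state with pending= []}
    | `Stop -> None
end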

Fault Tolerant Design

Supervisor trees are a great and time-tested tool that allows for quick, easy descriptions of fault-tolerant behavior in your application. The supervisor behavior is very high level and, if implemented similarly to Erlang’s, makes the description of how a supervisor handles faults very simple and abstract. It is quick to define a supervision tree for a subset of your application and plug it into a larger supervision tree to connect all your fault tolerance logic together. Using a supervision tree also makes reacting to certain system-wide events far simpler. For instance, graceful shutdown becomes fairly easy since you can set it up so that if any supervisor in the tree receives a graceful shutdown request, it propagates it to all the other supervisors, and each one knows whether its workers can just be killed immediately or whether they require graceful shutdown.

Visualizability/Analyzability/Debugability

Since applications built on top of an OTP model are connected together in supervision trees, it is very easy to implement a process visualizer on top of our process structure. Because processes are registered globally, disjoint processes can be detected and we can show exactly which processes are disconnected at any point in time (we would probably make it an error to not connect a process in some way to some root supervisor, but that’s an aside). Using this system, it’s also easy to collect information about how errors are handled and propagated between processes, which could prove very useful for debugging things like graceful shutdown or an error state reached while trying to handle something in a fault-tolerant manner. We can also have CI generate a graph of our processes, and if the graph ever changes between master and a PR, it could post a comment with a graph diff (or something) to the PR so that we are sure to review that the new adjustments to the architecture are correct, forcing us to consider all future additions to our concurrent architecture more carefully.

Besides visualizability, this structured representation is also ripe for other forms of automatic analysis. For instance, if we describe all our processes through behaviors, and those behaviors have consistent communication semantics, then we can actually extract our process information and do some formal analysis on it. One potential way we could analyze our process graph would be by turning it into a petri net and checking it for reachability and liveness (I have a petri net library for OCaml that does some of this, so it wouldn’t be too much work).

It’s also possible to stick in an event logging system for our message passing and then correlate those events back to a graph of the processes. This could prove invaluable when debugging asynchronous bugs since we would be able to navigate events between processes in the system. Using tools like Jaeger, we could probably even visualize a single daemon as an entire distributed system and walk through the messages as if they were requests between services.

Cross-Process Concurrency Support

If the OTP process model is properly followed (specifically, if we ensure that processes do not share state), we can split OTP processes up into various OS processes. In the BEAM VM, running processes are spread across multiple CPU cores (in addition to preemptive process switching within each core). This is easy to do because the message passing interface between processes is so generic. So, if we wanted, we could abstract over Async RPC (or some other IPC system) and just describe our OS-level worker processes as OTP processes, further unifying our concurrent programming abstraction.

Disadvantages

As far as I can see, the main disadvantage to this system is that it puts a lot of emphasis on correct design and implementation of our behaviors. The OTP process system by itself does not immediately abstract our concurrent model to an ideal layer, but it lays the groundwork and model for creating basically any abstraction we would need. What OTP really gives us, besides this more principled basis, is a limited layer in which to implement the abstractions we need in our system (behaviors). We should be able to limit all of our “threads” down to a small set of process behaviors, so as long as we test these behaviors well and make sure they are well designed (i.e. maximum encapsulation), this shouldn’t be an issue.

One other thing to note is that the true OTP process model is designed for working with purely immutable values. In Erlang, each process has its own state (which is just a stack of immutable values) and processes only share information via message passing. However, in our system, some of our values are mutable (such as the transition frontier). If we want the full power of the OTP process model (specifically, if we ever want Cross-Process Concurrency; see above), we would need to figure out some way of managing this so that only one process at a time has access to a piece of mutable state. A naive way to do this would be to wrap all of the mutable state in the system in agent processes (the standard way to protect shared state in the Erlang world), but we would need to test performance of this, because if it is implemented with Async.Deferred, you may pay 1-2 context switches for every agent access. We would need to test this out and make a decision as to whether or not we want to break this part of the OTP process model in order to get more efficiency but less safety in our design.
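
As a rough illustration of the agent idea (and of where the extra context switches come from), here is a minimal sketch of serializing access to a piece of state through a single Async pipe, so that only the agent’s own loop ever touches the state. The Agent module and its API are hypothetical:

open Async_kernel

(* Hypothetical agent: serializes all access to a piece of mutable state by
 * funneling every read/update through a single pipe, so only the agent's
 * own loop ever touches the state directly. *)
module Agent : sig
  type 'state t

  val create : 'state -> 'state t

  (* run [f] against the state; the deferred resolves with [f]'s result *)
  val access : 'state t -> f:('state -> 'a) -> 'a Deferred.t
end = struct
  type 'state t = {requests: ('state -> unit) Pipe.Writer.t}

  let create state =
    let reader, writer = Pipe.create () in
    (* the agent's loop is the only code that ever sees [state] directly *)
    don't_wait_for
      (Pipe.iter_without_pushback reader ~f:(fun request -> request state)) ;
    {requests= writer}

  let access t ~f =
    let result = Ivar.create () in
    Pipe.write_without_pushback t.requests (fun state ->
        Ivar.fill result (f state)) ;
    Ivar.read result
end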

Pseudo Code

Mock Interface

This interface can be cleaned up some in order to make it a little easier to define a process, but this describes a simple way to implement the OTP Process model in OCaml using functors. Only one example of a behavior is given to show how you would interact with them.

(* assumes Async_kernel's [Deferred] is in scope *)
open Async_kernel

module Process : sig
  (* a [pid] is a handle to a running process that accepts messages of type ['a] *)
  type 'a pid

  (* An Interface is a description of what arguments
   * and inputs a process has *)
  module type Interface_intf = sig
    type input
    type args
  end

  (* A Lib provides functions that are scoped only
   * to an instance of a Process. This allows the
   * Process.Make functor to inject some functionality
   * into a Process Specification in a way that
   * encapsulates that functionality only to that
   * individual Process. *)
  module type Lib_intf = sig
    type input

    (* would need to lift message filtering to enumerated
     * level in order to support selection skip optimization *)
    val receive_selected : f:(input -> bool) -> input Deferred.t

    (* same as receive_selected ~f:(Fn.const true) *)
    val receive : unit -> input Deferred.t
    val send : 'a pid -> 'a -> unit Deferred.t
  end

  (* A Specification provides the non-generalized
   * behaviors of a Process. It contains only an
   * entrypoint for the Process, which will be
   * executed when the process starts. *)
  module type Specification_intf = sig
    type args
    type input

    (* [exec] is the main routine which will be run when
     * the process is spawned. When the [unit Deferred.t]
     * returned from [exec] resolves, the process will stop *)
    val exec : input pid -> args -> unit Deferred.t
  end

  (* [S] is the signature of an instantiated process module *)
  module type S = sig
    type args
    type input

    val spawn : args -> input pid
  end

  (* process lifecycle queries, usable from any other process *)
  val wait_for_termination : 'a pid -> unit Deferred.t
  val running : 'a pid -> bool

  module Make
      (Interface : Interface_intf)
      (Spec : functor
         (Lib : Lib_intf with type input := Interface.input)
         -> Specification_intf
              with type args := Interface.args
               and type input := Interface.input) :
    S with type args = Interface.args and type input = Interface.input
end

module Behaviors : sig
  module State_machine : sig
    (* a state machine process folds each received message into its state;
     * [reduce] returning [None] stops the process *)
    module type Specification_intf = sig
      type input
      type args
      type state

      val initialize : args -> state
      val reduce : state -> input -> state option
    end

    module Make
        (Interface : Process.Interface_intf)
        (Spec : functor
           (Lib : Process.Lib_intf with type input := Interface.input)
           -> Specification_intf
                with type args := Interface.args
                 and type input := Interface.input) :
      Process.S
        with type args = Interface.args
         and type input = Interface.input
  end
end

Ping/Pong Example

This is a translation of a simple “hello world” level Erlang process example with a ping/pong process setup. The original Erlang code is here: http://erlang.org/doc/getting_started/conc_prog.html#message-passing.

open Async_kernel
open Deferred.Let_syntax
open Otp

type ping_msg =
  [ `Finished
  | `Ping of [`Pong] Process.pid ]

type ping_args = {n: int; pong_pid: ping_msg Process.pid}

module Ping = Process.Make
  (struct
    type args = ping_args
    type input = [`Pong]
  end)
  (functor (Lib : Process.Lib_intf with type input := [`Pong]) -> struct
    open Lib

    let rec exec self {n; pong_pid} =
      if n > 0 then (
        let%bind () = send pong_pid (`Ping self) in
        let%bind `Pong = receive () in
        Printf.printf "Ping received pong\n" ;
        exec self {n= n - 1; pong_pid} )
      else send pong_pid `Finished
  end)

module Pong = Process.Make
  (struct
    type args = unit
    type input = ping_msg
  end)
  (functor (Lib : Process.Lib_intf with type input := ping_msg) -> struct
    open Lib

    let rec exec self () =
      match%bind receive () with
      | `Finished ->
          Printf.printf "Pong finished\n" ;
          return ()
      | `Ping ping_pid ->
          Printf.printf "Pong received ping\n" ;
          let%bind () = send ping_pid `Pong in
          exec self ()
  end)

let run_ping_pong () =
  let pong_pid = Pong.spawn () in
  let ping_pid = Ping.spawn {n= 3; pong_pid} in
  let%map () = Process.wait_for_termination pong_pid in
  assert (not (Process.running ping_pid))

(* run the example to completion on the Async scheduler *)
let () = Async.Thread_safe.block_on_async_exn run_ping_pong

It might be cool to make use of linear types. We have a lot of instances where a variable should not be used more than once during its lifetime, e.g. processing transactions in the staged_ledger. Having this type safety could eliminate a good number of bugs that can occur in our system. We can probably create a monadic DSL for it.

I love linear types. Like, a lot (big fan of ATS even though it’s basically unusable lol). However, I’m not sure it can be achieved in OCaml without wrapping all of our code in a monadic computation, which would be a lot of work. Would be interested in seeing a demo that is safe and allows you to drop in and out of the monad in an easy way.

thanks for this writeup – it’s great.
looking at the pipe related issues we’ve had, it seems like most of them are either

  1. Writing to closed pipes
  2. Pipes overflowing (I guess because we’re not making adequate use of the push back mechanism)

can you outline briefly how this would help address these issues, and any other issues we’ve seen with pipes that this would help address?

I will address the issues that you brought up (which have been pervasive throughout our development), as well as an issue that I believe this alternative improves on.

Writing to closed pipes

Writing to closed pipes is an issue we have run into repeatedly because we do not have, or do not fit our code into, a formal abstraction around “component management” (here, a component is defined as a group of threads that share a lifecycle, resources, and concerns). For instance, the most common place we have seen this is when adding new pipes to the transition frontier controller. The transition frontier controller and transition handler are a group of threads with a shared lifecycle and resource scope. We run this “component” when we are in the participation phase of the protocol. However, the system will shut down this component when entering a bootstrap phase and restart it when exiting that phase. In order to manage this shutdown, we close all the pipes that were opened for the component to run. What keeps happening is that new pipes get introduced internally, ad hoc, and the logic to close those pipes correctly is not implemented. This can be trickier than it seems on the surface: there are “high level” pipes which are the entry point of the concurrent control flow, and if you add one of those, you need to make sure to close it in some imperative function; but there are also “intermediate” pipes which are consumed further downstream, and each individual thread that manages those pipes needs to be responsible for tearing them down as a side effect of its upstream pipe closing.

Under the Erlang/OTP model, the concept of a “process” encapsulates the individual lifecycle and resource management of one piece of the architecture, automatically closing downstream connections and opening new ones when a new process is spawned. This means that if there is a write to a closed pipe, that is actually a bug in the abstraction and not in the code, as the code is kept completely abstract from this low-level primitive manipulation. Furthermore, the usage of “supervisor trees” provides further fault tolerance to this system, ensuring all children shut down and that any errors from them are handled or bubbled up in scope to be handled elsewhere. This gives us a more concrete methodology for designing the lifecycle of our processes, helping us avoid other, less obvious mistakes that have similar root causes to the “writing to a closed pipe” issue.

Pipe Overflow

Pipe overflow is a little more complicated, as it has shown up for a variety of reasons. I think the easiest thing to do is to give a little history on why it happens at all, because it comes from an abstraction we built on top of Jane Street’s pipes in order to help surface shortcomings in our architecture design, though in hindsight, I think it was a mistake.

Async pipes are modeled as infinite queues of data sent between concurrent threads. Infinite queues are a common abstraction in concurrent programming (and in fact, the OTP mailboxes described above also work off of an infinite queue, though there are more interesting behaviors you can implement on top of one). However, in a tightly knit system that intends to stay as synchronized as possible (like a cryptocurrency protocol node), letting these queues grow forever is far from ideal, since it means you could have a constant drift of desynchronization where you get further and further behind. In order to help combat this, we decided that while we design and lay out our system, we would put restrictions on the max size of pipes and crash on overflow so that we can identify bottlenecks in the architecture and patch them (though long term we always knew we would need to revisit capped pipes, since it’s not good to do this in a production system). So, whenever we got the pipe overflow issue, it was typically indicative that either the wrong type of pipe was chosen, or that there is an assumption in the concurrent control flow that was not true in practice. I would also expect these to show up when we stress test our concurrent code for things like DDoS prevention, though we haven’t gotten there yet.

OTP helps us address these issues indirectly by giving us a higher level abstraction layer to reason about our architecture. Furthermore, it also enables us to do more reasonable pipe management easily. For instance, priority queues and filtered queues are easy to implement on top of the OTP mailbox model. However, I think the most powerful tool we get is at the supervisor layer of abstraction, where we can actually monitor child processes and gracefully handle things like a mailbox queue growing over some threshold. Many things can be implemented clearly at this layer of abstraction, such as pruning a mailbox’s queue when it reaches a threshold, or spawning new processes and splitting the mailbox queue up. Furthermore, the graceful handling of these cases can also be bubbled up easily to higher layers in the architecture if need be (though, generally for style, it’s best to keep supervision logic encapsulated at a lower level when sensible). In Erlang, most of these complex supervisor structures are defined at a high level by configuring a “behavior”, which is basically Erlang’s equivalent of a functor (more or less), so if we adapt that system into OCaml, defining each of these supervisors becomes rather simple in practice.
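
As a purely illustrative sketch (none of these types exist yet), supervisor-level handling of mailbox growth could look something like a policy applied to monitor events:

(* Hypothetical sketch of supervisor-level handling of a mailbox growing past
 * a threshold: the supervisor receives monitor events for its children and
 * decides, per policy, whether to prune, restart, or escalate. *)
type monitor_event =
  | Mailbox_length of {pid: int; length: int}
  | Crashed of {pid: int; error: exn}

type overflow_policy = Prune_oldest of int | Restart_child | Escalate

let handle_event ~policy ~threshold = function
  | Mailbox_length {pid; length} when length > threshold -> (
      match policy with
      | Prune_oldest n -> `Prune (pid, n)  (* drop the [n] oldest messages *)
      | Restart_child -> `Restart pid      (* kill and respawn with an empty mailbox *)
      | Escalate -> `Notify_parent pid )   (* let a higher supervisor decide *)
  | Mailbox_length _ -> `Ignore
  | Crashed {pid; _} -> `Restart pid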

Visualizability/Diffability

Something that is really lacking with our current abstraction is the ability to properly vet and analyze changes to the concurrent architecture. With concurrent programming (at least, reasonably efficient concurrent programming), you create a big, imperative, and tightly coupled architecture. Seemingly simple changes to this architecture can actually have large effects which are hard to reason about unless you closely re-evaluate them in the context of the full architecture. With the current development approach, many ad-hoc changes are made to fix bugs or inefficiencies, and it can be very hard to visualize these changes in the context of the entire architecture. We attempted to keep some high level documents up to date at the beginning of designing much of our current async code, but keeping those in sync with the codebase has proven too difficult at the rate we are moving, and they no longer accurately reflect the current architecture. It would be much better, in my opinion, to have an abstraction that allows us to extract our architecture into another format, such as a visual graph (via dot, for instance). If this is done, we can even add tooling to easily diff the changes made in a PR, showing which processes were added to or removed from the control flow of the program, and showing connections between processes that were added or removed as well. This makes it much easier to cross reference what the code is actually doing when reviewing async code. I think there is further work that could be done to provide live visualizations in the browser, which could prove useful for capturing, replaying, and debugging the behavior of our application. I also think we could do some formal analysis on a graph of our architecture and check for basic properties such as liveness.
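
As a rough sketch of what the extraction tooling could look like (the process_node type and the registry it would come from are hypothetical), rendering the supervision tree as a dot graph for CI to diff is only a few lines:

(* Hypothetical sketch: walk the supervision tree and emit a dot graph so CI
 * can diff it between master and a PR. [process_node] stands in for whatever
 * introspection the process registry would expose. *)
type process_node = {name: string; children: process_node list}

let to_dot root =
  let buf = Buffer.create 256 in
  Buffer.add_string buf "digraph processes {\n" ;
  let rec walk node =
    List.iter
      (fun child ->
        Buffer.add_string buf
          (Printf.sprintf "  %S -> %S;\n" node.name child.name) ;
        walk child)
      node.children
  in
  walk root ;
  Buffer.add_string buf "}\n" ;
  Buffer.contents buf

(* Example: a tiny tree rendered for diffing; the names are illustrative. *)
let () =
  print_string
    (to_dot
       { name= "root_supervisor"
       ; children=
           [ {name= "network"; children= []}
           ; { name= "transition_frontier_controller"
             ; children= [{name= "catchup"; children= []}] } ] })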

Wow, that is a lot to read and I got lost in the details.

Are you proposing to use this approach for the CODA blockchain or across many others?

Apologies if I am asking a noobie Q :slight_smile:

This proposal is for the concurrent programming abstraction we use in the Coda node we are building. Right now, we use Jane Street’s Async library, but that abstraction is very primitive and has caused us issues, so we are looking at heavier-weight abstractions to build our program on.