Using an F# event and asynchronous workflows in multi-threaded code

FYI, the implementation of Event<int> can be found here.

The interesting bit seems to be:

member e.AddHandler(d) =
  x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>)
member e.RemoveHandler(d) = 
  x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>)

Subscribing to an event combines the current event handler with the event handler passed into subscribe. This combined event handler replaces the current one.
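
As a minimal sketch of what that combining does (the names h1, h2 and calls are made up for illustration and are not part of FSharp.Core):

```fsharp
open System

// Records which handlers ran, in order (illustrative only).
let calls = System.Collections.Generic.List<string>()

let h1 = Handler<int>(fun _ v -> calls.Add(sprintf "h1:%d" v))
let h2 = Handler<int>(fun _ v -> calls.Add(sprintf "h2:%d" v))

// Delegate.Combine returns a new multicast delegate; h1 and h2 are unchanged.
let combined = Delegate.Combine(h1, h2) :?> Handler<int>
combined.Invoke(null, 7)      // runs h1, then h2

// Delegate.Remove likewise returns a new delegate without h1.
let onlyH2 = Delegate.Remove(combined, h1) :?> Handler<int>
onlyH2.Invoke(null, 8)        // runs h2 only

printfn "%A" (List.ofSeq calls)
```

Because both operations return a new delegate instead of mutating the old one, the event has to store the result back into its field, and that write-back is where concurrency comes in.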

The problem from a concurrency perspective is a race condition: concurrent subscribers might combine with the same current event handler, and the "last" one to write back its result wins, so the other subscription is lost (last is a difficult concept in concurrency these days, but never mind).
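
The lost update can be replayed deterministically on a single thread. This is only a sketch of one unlucky interleaving; the mutable variable stands in for the event's multicast field, and all names are illustrative:

```fsharp
open System

let h1 = Handler<int>(fun _ v -> printfn "h1 got %d" v)
let h2 = Handler<int>(fun _ v -> printfn "h2 got %d" v)

// Stands in for the event's mutable multicast field (no handlers yet).
let mutable multicast : Handler<int> = null

// Subscribers A and B both read the current value before either writes back.
let seenByA = multicast
let seenByB = multicast

// A combines with its snapshot and writes back.
multicast <- Delegate.Combine(seenByA, h1) :?> Handler<int>
// B combines with its now stale snapshot and overwrites A's result.
multicast <- Delegate.Combine(seenByB, h2) :?> Handler<int>

// Two subscriptions were made, but only h2 is left in the invocation list.
let survivors = multicast.GetInvocationList().Length
printfn "handlers registered: %d" survivors
```

With real threads the same thing happens whenever both reads land before either write, which is why it only shows up occasionally.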

What could be done here is to introduce a CAS loop using Interlocked.CompareExchange, but that adds performance overhead that hurts non-concurrent users. It's something one could make a PR of, though, to see if it is viewed favourably by the F# community.

As for your second question, what to do about it, I can only say what I would do. I would create a version of FSharpEvent that supports protected subscribe/unsubscribe, perhaps based on FSharpEvent if your company's FOSS policy allows it. If it turns out to be a success, it could form a future PR to the F# core library.

I don't know your requirements, but if what you need is coroutines (i.e. Async) rather than threads, it may be possible to rewrite the program to use only one thread, in which case you won't be affected by this race condition.


First, thanks to FuleSnable for his answer; he pointed me in the right direction. Based on the information he provided, I implemented a ConcurrentEvent type myself. This type uses Interlocked.CompareExchange for adding and removing its handlers, so it is lock-free and hopefully the fastest way of doing it.

I started the implementation by copying the Event type from the F# compiler sources (I also left the comment as-is). The current implementation looks like this:

type ConcurrentEvent<'T> =
    val mutable multicast : Handler<'T>
    new() = { multicast = null }

    member x.Trigger(arg:'T) =
        match x.multicast with
        | null -> ()
        | d -> d.Invoke(null,arg) |> ignore
    member x.Publish =
        // Note, we implement each interface explicitly: this works around a bug in the CLR
        // implementation on CompactFramework 3.7, used on Windows Phone 7
        { new obj() with
            member x.ToString() = "<published event>"
          interface IEvent<'T>
          interface IDelegateEvent<Handler<'T>> with
            member e.AddHandler(d) =
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
            member e.RemoveHandler(d) =
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
          interface System.IObservable<'T> with
            member e.Subscribe(observer) =
                let h = new Handler<_>(fun sender args -> observer.OnNext(args))
                (e :?> IEvent<_,_>).AddHandler(h)
                { new System.IDisposable with
                    member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } }

Some notes on the design:

  • I started with a recursive loop, but the compiled code showed that this creates a closure class, and every call to AddHandler or RemoveHandler instantiated an object of it. Implementing the loop directly with while avoids that allocation whenever a handler is added or removed.
  • I explicitly used obj.ReferenceEquals to avoid going through F#'s generic equality.

At least in my tests, adding and removing handlers now seems to be thread-safe. ConcurrentEvent can simply be swapped in for the Event type as needed.
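
A check along these lines could be sketched as follows. HandlerSlot is a hypothetical, stripped-down stand-in for ConcurrentEvent (only the field and the add loop), and the worker counts are arbitrary; it is meant to show the kind of test, not the exact one I ran:

```fsharp
open System
open System.Threading

// Hypothetical stand-in for ConcurrentEvent: just the field and the add loop.
type HandlerSlot =
    val mutable multicast : Handler<int>
    new() = { multicast = null }

    // Same CAS retry loop as AddHandler above.
    member x.Add(d: Handler<int>) =
        let mutable exchanged = false
        while not exchanged do
            Thread.MemoryBarrier()
            let dels    = x.multicast
            let newDels = Delegate.Combine(dels, d) :?> Handler<int>
            let result  = Interlocked.CompareExchange(&x.multicast, newDels, dels)
            if obj.ReferenceEquals(dels, result) then
                exchanged <- true

    // Number of handlers currently registered.
    member x.Count =
        match x.multicast with
        | null -> 0
        | d -> d.GetInvocationList().Length

let slot = HandlerSlot()
let workers, perWorker = 4, 1000

// Several workers add handlers at the same time on the thread pool.
[ for _ in 1 .. workers ->
    async {
        for _ in 1 .. perWorker do
            slot.Add(Handler<int>(fun _ _ -> ())) } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

printfn "expected %d, registered %d" (workers * perWorker) slot.Count
```

If the CAS loop is replaced by the plain read-combine-write from Event, the registered count can come out lower than expected on a multi-core machine, although not necessarily on every run.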


For anyone curious how much slower ConcurrentEvent is compared to Event, here is a benchmark:

let stopWatch () = System.Diagnostics.Stopwatch.StartNew()

let event = Event<int>()
let sub   = event.Publish

let cevent = ConcurrentEvent<int>()
let csub   = cevent.Publish

let subscribe sub x = async {
    let mutable disposables = []
    for i=0 to x do
        let dis = Observable.subscribe (fun x -> printf "%d" x) sub
        disposables <- dis :: disposables
    for dis in disposables do
        dis.Dispose()
}

Async.RunSynchronously(async {
    // Amount of tries
    let tries = 10000

    // benchmarking Event subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe sub tries)
    let! y = Async.StartChild (subscribe sub tries)
    do! x
    do! y
    sw.Stop()
    printfn "Event: %O" sw.Elapsed
    do! Async.Sleep 1000
    event.Trigger 1
    do! Async.Sleep 2000

    // Benchmarking ConcurrentEvent subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe csub tries)
    let! y = Async.StartChild (subscribe csub tries)
    do! x
    do! y
    sw.Stop()
    printfn "\nConcurrentEvent: %O" sw.Elapsed
    do! Async.Sleep 1000
    cevent.Trigger 1
    do! Async.Sleep 2000
})

On my system, subscribing and unsubscribing 10,000 handlers with the non-thread-safe Event takes around 1.4 seconds.

The thread-safe ConcurrentEvent takes around 1.8 seconds, about 0.4 seconds more, so I think the overhead is pretty low.