Friday, December 30, 2011

Solving F# Async.StartChild Leak: Futures

I discovered a memory leak in Async.StartChild and here discuss a workaround based on a Future abstraction.

I noticed what appears to be a memory leak in F# standard library function Async.StartChild. This happened in a context of a socket server, where I attempted to perform socket reading and writing in parallel. It seems that memory use slowly grows and memory profiler points to some CancellationTokenSource-related objects not being released.

As a non-leaking alternative, I used my own abstractions. The basic idea is to use synchronizable events. Unfortunately Event is already used in F# to mean something different, so I will use the word Future instead. If you know F# events, the key problem is that subscribing to events after they happen is meaningless, for example this code procuces nothing:

let test () =
    let e = Event<_>()
    e.Trigger(1)
    e.Publish.Add(printf "%i")

In contrast, Future objects retain the value. For simplicity, I allow subscribing and triggering only once. In addition, the sample includes Executor code. I found by experimentation that running short-lived coordination tasks on a single thread instead of the ThreadPool is beneficial. Enjoy:

(*
Copyright (c) 2008-2011 IntelliFactory
GNU Affero General Public License Usage The code
is free software: you can redistribute it and/or
modify it under the terms of the GNU Affero
General Public License, version 3, as published by
the Free Software Foundation.
The code is distributed in the hope that it will
be useful, but WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero
General Public License for more details at
<http://www.gnu.org/licenses/>.
If you are unsure which license is appropriate for
your use, please contact IntelliFactory at
<http://intellifactory.com/contact>.
See this blog for the discussion:
<http://tinyurl.com/fsharp-futures>
*)
#if INTERACTIVE
#else
namespace IntelliFactory.Examples
#endif
open System
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent
type private FutureState<'T> =
| Computed of 'T
| Created
| Finalized
| Waiting of ('T -> unit)
[<Sealed>]
type Future<'T>() =
let root = obj ()
let transact f = lock root f ()
let mutable state : FutureState<'T> = Created
let await f =
transact <| fun () ->
match state with
| Computed x -> state <- Finalized; (fun () -> f x)
| Created -> state <- Waiting f; ignore
| Finalized -> invalidOp "Future is finalized."
| Waiting f -> invalidOp "Future is already waited on."
let provide value =
transact <| fun () ->
match state with
| Computed x -> invalidOp "Future is already provided."
| Created -> state <- Computed value; ignore
| Finalized -> invalidOp "Future is finalized."
| Waiting f -> state <- Finalized; (fun () -> f value)
let event = Async.FromContinuations(fun (k, _, _) -> await k)
member this.Await = event
member this.Provide(value) = provide value
[<Sealed>]
type Executor(?maxTaskCount, ?logError) =
let logError = defaultArg logError ignore
let mailbox =
let n = defaultArg maxTaskCount 128
new BlockingCollection<_>(ConcurrentQueue(), n)
let work () =
let mutable loop = true
while loop do
match mailbox.Take() with
| None -> loop <- false
| Some exec -> try exec () with e -> logError e
let task =
Task.Factory.StartNew(work,
TaskCreationOptions.LongRunning)
member this.Dispose() =
mailbox.Add(None)
mailbox.CompleteAdding()
task.Wait()
task.Dispose()
mailbox.Dispose()
member this.Fork(job: Async<'T>) =
let f = Future()
let work () =
Async.StartWithContinuations(job,
f.Provide, logError, logError)
this.Schedule(work)
f.Await
member this.Schedule(task) = mailbox.Add(Some task)
member this.TaskCount = mailbox.Count
interface IDisposable with
member this.Dispose() = this.Dispose()
#if INTERACTIVE
let test () =
use e = new Executor()
let task =
async {
let read = e.Fork(async { return stdin.ReadLine() })
do stdout.WriteLine("Waiting for input..")
return! read
}
Async.RunSynchronously task
#endif
view raw AsyncUtils.fs hosted with ❤ by GitHub

2 comments:

  1. Can you give an example how you would do that? My understanding is that you can indeed use Lazy for coordination: forcing a Lazy value will make the consumer thread wait until the producer thread will compute the value. The reason this does not make me happy is because it consumes a whole .NET-level thread. With a callback-based Future, only Async-level thread is waiting, which consumes a lot less resources.

    ReplyDelete