I discovered a memory leak in Async.StartChild and discuss here a workaround based on a Future abstraction.
I noticed what appears to be a memory leak in the F# standard library
function Async.StartChild. This happened in the context of a socket
server, where I attempted to perform socket reading and writing in
parallel. Memory use grows slowly, and the memory profiler points to
some CancellationTokenSource-related objects not being released.
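The server code itself is not shown here, so the following is only a minimal sketch of the kind of pattern in question: two operations started with Async.StartChild and then awaited together. The name readAndWriteInParallel and its two parameters are hypothetical placeholders for the real socket operations.

```fsharp
// Hypothetical helper: starts two computations as children and waits for both.
// `read` and `write` stand in for the actual socket operations.
let readAndWriteInParallel (read: Async<'R>) (write: Async<'W>) : Async<'R * 'W> =
    async {
        // Each StartChild call starts the computation and returns a handle to await later.
        let! pendingRead = Async.StartChild read
        let! pendingWrite = Async.StartChild write
        // Both children are already running at this point; collect their results.
        let! r = pendingRead
        let! w = pendingWrite
        return (r, w)
    }
```

In a server, a function of this shape would be called once per message exchange, which is the kind of repeated use under which the slow memory growth was observed.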
As a non-leaking alternative, I used my own abstractions. The basic
idea is to use synchronizable events. Unfortunately, Event is already
used in F# to mean something different, so I will use the word Future
instead. If you know F# events, the key problem is that subscribing to
an event after it has fired is meaningless. For example, this code
prints nothing:
let test () =
    let e = Event<_>()
    e.Trigger(1)
    e.Publish.Add(printf "%i")
In contrast, Future objects retain the value. For simplicity, I allow
subscribing and triggering only once. In addition, the sample includes
Executor code. I found by experimentation that running short-lived
coordination tasks on a single thread instead of the ThreadPool is
beneficial. Enjoy:
(*
Copyright (c) 2008-2011 IntelliFactory

GNU Affero General Public License Usage
The code is free software: you can redistribute it and/or
modify it under the terms of the GNU Affero
General Public License, version 3, as published by
the Free Software Foundation.

The code is distributed in the hope that it will
be useful, but WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Affero
General Public License for more details at
<http://www.gnu.org/licenses/>.

If you are unsure which license is appropriate for
your use, please contact IntelliFactory at
<http://intellifactory.com/contact>.

See this blog for the discussion:
<http://tinyurl.com/fsharp-futures>
*)
#if INTERACTIVE
#else
namespace IntelliFactory.Examples
#endif

open System
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent

type private FutureState<'T> =
    | Computed of 'T
    | Created
    | Finalized
    | Waiting of ('T -> unit)

/// A single-use future: one Provide and one Await, in either order.
[<Sealed>]
type Future<'T>() =
    let root = obj ()
    // Run f under the lock; f returns a continuation
    // that is invoked after the lock is released.
    let transact f = lock root f ()
    let mutable state : FutureState<'T> = Created
    let await f =
        transact <| fun () ->
            match state with
            | Computed x -> state <- Finalized; (fun () -> f x)
            | Created -> state <- Waiting f; ignore
            | Finalized -> invalidOp "Future is finalized."
            | Waiting f -> invalidOp "Future is already waited on."
    let provide value =
        transact <| fun () ->
            match state with
            | Computed x -> invalidOp "Future is already provided."
            | Created -> state <- Computed value; ignore
            | Finalized -> invalidOp "Future is finalized."
            | Waiting f -> state <- Finalized; (fun () -> f value)
    let event = Async.FromContinuations(fun (k, _, _) -> await k)
    member this.Await = event
    member this.Provide(value) = provide value

/// Runs scheduled jobs one at a time on a single long-running task.
[<Sealed>]
type Executor(?maxTaskCount, ?logError) =
    let logError = defaultArg logError ignore
    // Bounded queue of jobs; None is the shutdown signal.
    let mailbox =
        let n = defaultArg maxTaskCount 128
        new BlockingCollection<_>(ConcurrentQueue(), n)
    // Worker loop: takes and runs jobs until None arrives.
    let work () =
        let mutable loop = true
        while loop do
            match mailbox.Take() with
            | None -> loop <- false
            | Some exec -> try exec () with e -> logError e
    let task =
        Task.Factory.StartNew(work,
                              TaskCreationOptions.LongRunning)
    member this.Dispose() =
        mailbox.Add(None)
        mailbox.CompleteAdding()
        task.Wait()
        task.Dispose()
        mailbox.Dispose()
    // Starts the job via the executor and returns an Async
    // that completes with the job's result.
    member this.Fork(job: Async<'T>) =
        let f = Future()
        let work () =
            Async.StartWithContinuations(job,
                f.Provide, logError, logError)
        this.Schedule(work)
        f.Await
    member this.Schedule(task) = mailbox.Add(Some task)
    member this.TaskCount = mailbox.Count
    interface IDisposable with
        member this.Dispose() = this.Dispose()

#if INTERACTIVE
let test () =
    use e = new Executor()
    let task =
        async {
            let read = e.Fork(async { return stdin.ReadLine() })
            do stdout.WriteLine("Waiting for input..")
            return! read
        }
    Async.RunSynchronously task
#endif