Jack Morris
Distributing Work Between Actors
6 Nov 2023

Whilst working on a lightweight SQLite wrapper for Swift, I needed to distribute workloads across a pool of actors. Specifically, each read should run as a transaction on one of several Connection actors.

Here's a rough outline:

@discardableResult
public func read<R: Sendable>(
  _ action: @Sendable (_ connection: isolated Connection) throws -> R
) async throws -> R {
  let readConnection = /* get a connection */
  return try await readConnection.transaction(action)
}

Ideally, I want to choose a Connection that is "free", in the sense that no Task is currently executing on it. Additionally, I need to handle the case where all Connections are occupied, in which case I need to queue up and wait my turn.

A naive solution is to pick a Connection at random and then await on it.

let readConnections: [Connection] = ...

@discardableResult
public func read<R: Sendable>(
  _ action: @Sendable (_ connection: isolated Connection) throws -> R
) async throws -> R {
  let readConnection = readConnections.randomElement()!
  return try await readConnection.transaction(action)
}

However, this can cause unnecessary suspensions: if you're unlucky enough to pick a Connection that is currently fulfilling a transaction, you'll wait on it even though other Connections are free to accept work.

Instead, I went down the route of implementing a simple async-aware pool. This allows for free Connections to be returned immediately, whilst queueing requests (through Continuations) if no Connection is currently available.

/// An async-aware element pool.
///
/// Elements can be fetched with `get()`. If an element is available, it will be returned
/// immediately. If not available, it will be built on demand.
///
/// If the maximum number of elements has already been built, the caller will suspend until
/// an element is available.
///
/// Elements must be returned with `return(_ element:)` once they are no longer needed.
actor AsyncPool<Element> {
  init(maxElements: Int, elementBuilder: @escaping @Sendable () throws -> Element) {
    precondition(maxElements > 0)
    self.maxElements = maxElements
    self.elementBuilder = elementBuilder
  }

  /// Retrieves an element from the pool.
  ///
  /// Will suspend if an element is not yet available.
  func get() async throws -> Element {
    // Attempt to return an element directly from the pool.
    if let element = elements.popLast() {
      return element
    }

    // Attempt to build a new element, since there are no free elements.
    if builtElements < maxElements {
      let element = try elementBuilder()
      builtElements += 1
      return element
    }

    // Wait for an element to become available.
    return await withCheckedContinuation { continuation in
      continuationQueue.enqueue(continuation)
    }
  }

  /// Returns an element to the pool.
  func `return`(_ element: Element) {
    if let nextContinuation = continuationQueue.dequeue() {
      // A task is waiting for this element, so provide it directly.
      nextContinuation.resume(returning: element)
    } else {
      // Return the element back to the pool.
      elements.append(element)
    }
  }

  private let maxElements: Int
  private var builtElements = 0
  private let elementBuilder: () throws -> Element
  private var elements: [Element] = []
  private var continuationQueue = Queue<CheckedContinuation<Element, Never>>()
}

(Note that Queue is a simple FIFO queue. You could use an Array and just remove elements from the front, if the O(n) cost of each removal doesn't concern you.)
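A minimal sketch of such a Queue might look like the following. Only enqueue and dequeue are needed by AsyncPool; the storage layout, the isEmpty helper, and the compaction threshold of 32 are illustrative assumptions rather than the type actually used above.

```swift
/// A simple FIFO queue (hypothetical sketch).
///
/// Dequeued slots are skipped with a moving `head` index instead of shifting the
/// whole array on every removal, so `dequeue()` is amortized O(1).
struct Queue<Element> {
  // Backing storage; slots before `head` have already been dequeued.
  private var storage: [Element?] = []
  // Index of the next element to dequeue.
  private var head = 0

  var isEmpty: Bool { head == storage.count }

  /// Adds an element to the back of the queue.
  mutating func enqueue(_ element: Element) {
    storage.append(element)
  }

  /// Removes and returns the element at the front of the queue, or nil if empty.
  mutating func dequeue() -> Element? {
    guard head < storage.count else { return nil }
    let element = storage[head]
    storage[head] = nil  // Drop our copy so the element isn't kept alive.
    head += 1

    // Occasionally compact, once at least half of the storage is dead slots.
    // The threshold of 32 is arbitrary.
    if head >= 32 && head * 2 >= storage.count {
      storage.removeFirst(head)
      head = 0
    }
    return element
  }
}
```

Elements come back out in the order they went in, which is what gives waiting callers of get() their turn in arrival order.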

The only concerns I have with this approach are:

extension AsyncPool where Element: Sendable {
  func withElement<R: Sendable>(_ action: (Element) async throws -> R) async throws -> R {
    let element = try await get()
    defer { self.return(element) }
    return try await action(element)
  }
}
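To see withElement in action, the sketch below runs five tasks against a pool capped at two elements and records the peak number of actions running at once. So that it compiles on its own, it includes a condensed copy of the pool, with an Array in place of Queue and withElement folded into the actor with a @Sendable action. InFlightCounter, runDemo, and the String element standing in for a Connection are hypothetical names used purely for illustration.

```swift
// Condensed copy of the pool (assumption: Array-backed waiter list, Sendable elements).
actor AsyncPool<Element: Sendable> {
  private let maxElements: Int
  private var builtElements = 0
  private let elementBuilder: @Sendable () throws -> Element
  private var elements: [Element] = []
  private var waiters: [CheckedContinuation<Element, Never>] = []

  init(maxElements: Int, elementBuilder: @escaping @Sendable () throws -> Element) {
    precondition(maxElements > 0)
    self.maxElements = maxElements
    self.elementBuilder = elementBuilder
  }

  func get() async throws -> Element {
    // Prefer a free element, then build a new one, then wait in line.
    if let element = elements.popLast() { return element }
    if builtElements < maxElements {
      let element = try elementBuilder()
      builtElements += 1
      return element
    }
    return await withCheckedContinuation { waiters.append($0) }
  }

  func `return`(_ element: Element) {
    if waiters.isEmpty {
      elements.append(element)
    } else {
      // Hand the element straight to the longest-waiting caller.
      waiters.removeFirst().resume(returning: element)
    }
  }

  func withElement<R: Sendable>(
    _ action: @Sendable (Element) async throws -> R
  ) async throws -> R {
    let element = try await get()
    defer { self.return(element) }
    return try await action(element)
  }
}

// Tracks how many actions are running at once (hypothetical helper for this demo).
actor InFlightCounter {
  private(set) var current = 0
  private(set) var peak = 0
  func begin() { current += 1; peak = max(peak, current) }
  func end() { current -= 1 }
}

/// Runs five tasks through a two-element pool and returns the peak concurrency observed.
func runDemo() async throws -> Int {
  let pool = AsyncPool<String>(maxElements: 2) { "connection" }
  let counter = InFlightCounter()
  try await withThrowingTaskGroup(of: Void.self) { group in
    for _ in 0..<5 {
      group.addTask {
        try await pool.withElement { (_: String) async throws -> Void in
          await counter.begin()
          try await Task.sleep(nanoseconds: 10_000_000)  // Simulate some work.
          await counter.end()
        }
      }
    }
    try await group.waitForAll()
  }
  return await counter.peak
}
```

Because an element is only handed out when one is free or can still be built, the peak reported by runDemo() should never exceed maxElements, and the defer guarantees that the element is returned even if the action throws.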

It seems to be working pretty well so far. However, since I'm still in the early days of properly diving into Swift concurrency, I wouldn't be surprised if there's some gotcha I haven't considered. I'd love to hear from you if so!