Jack Morris
A Swift SQLite Wrapper, Part 3: Pooling Connections
12 Jan 2024

Happy 2024! In this series of posts I'm going to be running through the implementation of a lightweight SQLite wrapper, from scratch, in Swift.


In the last two parts of the series, we got a single SQLite connection up and running, allowing us to execute queries, bind parameterized values, and pull values back out of our database. Today we'll be adding support for concurrent reads by adding a connection pool, but first, we'll talk quickly about transactions.

Transactions

Our Connection type has an execute API allowing us to execute a single query against the database. However, there are times where we may want to run multiple queries against the database as a single "unit", rolling those back (if they made any changes) if a later query in the unit fails. For example:

For a database, the above is usually achieved with transactions, which are fully supported by SQLite (documentation). We can add a new API to Connection allowing us to do just that:

public actor Connection {

  ...

  @discardableResult
  func transaction<R>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
  ) throws -> R {
    try execute("BEGIN TRANSACTION")
    do {
      let result = try action(self)
      try execute("COMMIT TRANSACTION")
      return result
    } catch {
      try execute("ROLLBACK TRANSACTION")
      throw error
    }
  }
}

BEGIN TRANSACTION indicates to SQLite that we're starting a transaction, which should then be treated as a single "unit". action contains our actual in-transaction statements (we'll get to that in a sec), and assuming they're successful, COMMIT TRANSACTION will actually make any changes we made visible to other transactions.

However, if there is a thrown error (of any kind) whilst executing your transaction statements, ROLLBACK TRANSACTION takes us back to our initial database state, prior to the transaction starting to execute. This gives us a clean "all or nothing" API in regards to making changes against our data.

Coming full circle, to actually use our new API, we just have to use our existing .execute API within a transaction closure.

connection.transaction { 
  $0.execute("INSERT INTO table1 VALUES ('a', 'b', 'c')")
  $0.execute("INSERT INTO table2 VALUES ('d', 'e', 'f')")
}

You may have noticed the isolated parameter used above - this allows our action closure to operate isolated to the Connection actor, which means we don't need to use await for each of the invocations to execute above. This is actually very important, and to see why, let's imagine that we didn't mark this closure as taking an isolated Connection. Now, in order to be able to call execute within the transaction closure (which now requires await), the closure itself would have to be async.

public actor Connection {

  ...

  @discardableResult
  func transaction<R>(
    _ action: @Sendable (_ connection: Connection) async throws -> R
  ) throws -> R {
    ...
  }
}

connection.transaction { 
  await $0.execute("INSERT INTO table1 VALUES ('a', 'b', 'c')")
  await $0.execute("INSERT INTO table2 VALUES ('d', 'e', 'f')")
}

However, permitting awaits within a transaction is potentially dangerous. If we were to suspend (for any reason) within a transaction, another Task could come in and start executing on the same Connection, which would then attempt to create a nested transaction (a call to BEGIN TRANSACTION before a previous COMMIT TRANSACTION / ROLLBACK TRANSACTION). This would thankfully fail rather than doing anything funny, however it's still a failure we'd want to avoid.

Pool

With transactions at the Connection level handled, let's move up a layer and add a new type to manage multiple Connections. We'll add Pool, which will be responsible for managing multiple connections, and for providing APIs to read/write our database (agnostic of the Connection that actually ends up doing the work). Here's how our API could look.

public final class Pool: Sendable {
  public init(url: URL, maxReaders: Int) {
    ...
  }

  @discardableResult
  public func read<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
  ) async throws -> R {
    ...
  }

  @discardableResult
  public func write<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
  ) async throws -> R {
    ...
  }
}

The Pool takes a URL for the database file (as with Connection), as well as a configurable parameter for the maximum number of permitted concurrent read connections. When using WAL-mode, SQLite permits multiple connections to read from the same database file concurrently, at the same time as a single connection is writing to it. Therefore our Pool will manage a number of Connections kept aside for read operations (which the client can use through the .read API), as well as a single Connection for writing (accessible through .write).

It is ultimately up to the client to use .read only when performing read-only work; misuse of this API could result in a SQLITE_BUSY error being thrown by SQLite. I have a few techniques for making it easier to manage and separate read-only vs. read-write database usage, which I'll cover in a follow-up to the series.

Let's fill in the APIs, and then go over the implementation.

public final class Pool: Sendable {
  public init(url: URL, maxReaders: Int) {
    initializeWriteConnectionTask = Task {
      let writeConnection = try await Connection(url: url)
      try await writeConnection.execute("PRAGMA journal_mode = WAL")
      return writeConnection
    }
    readConnections = AsyncPool(maxElements: maxReaders) {
      try await Connection(url: url)
    }
  }

  @discardableResult
  public func read<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
  ) async throws -> R {
    try await waitForReady()
    let readConnection = try await readConnections.get()

    do {
      let result = try await readConnection.transaction(action)
      await readConnections.return(readConnection)
      return result
    } catch {
      await readConnections.return(readConnection)
      throw error
    }
  }

  @discardableResult
  public func write<R: Sendable>(
    _ action: @Sendable (_ connection: isolated Connection) throws -> R
  ) async throws -> R {
    try await initializeWriteConnectionTask.value.transaction(action)
  }

  private let initializeWriteConnectionTask: Task<Connection, any Swift.Error>
  private let readConnections: AsyncPool<Connection>

  private func waitForReady() async throws {
    _ = try await initializeWriteConnectionTask.value
  }
}

Rather than using initializeWriteConnectionTask, we could setup the write Connection inline in init, and instead make the initializer async. However, if using a Pool as part of a dependency-injection chain, having an async initializer can result in making your entire dependency initialization stack async, which can have other implications and difficulties when setting up the app initial UI. Personally, I prefer to keep initializers non-async, and perform any async setup internally.

AsyncPool

We glossed over our usage of AsyncPool above, which is used to manage our readable Connections, distributing them to the Pool as read requests come in. I cover this in more detail in Distributing Work Between Actors, however for completeness I'll show the implementation here:

/// An async-aware element pool.
///
/// Elements can be fetched with `get()`. If an element is available, it will be returned
/// immediately. If not available, it will be built on demand.
///
/// If the maximum number of elements have already been built, the caller will suspend until
/// an element is available.
///
/// Elements must be returned with `return(_ element:)` once they are no longer needed.
actor AsyncPool<Element> {
  init(maxElements: Int, elementBuilder: @escaping @Sendable () throws -> Element) {
    precondition(maxElements > 0)
    self.maxElements = maxElements
    self.elementBuilder = elementBuilder
  }

  /// Retrieves an element from the pool.
  ///
  /// Will suspend if an element is not yet available.
  func get() async throws -> Element {
    // Attempt to return an element directly from the pool.
    if let element = elements.popLast() {
      return element
    }

    // Attempt to build a new element, since there are no free elements.
    if builtElements < maxElements {
      let element = try elementBuilder()
      builtElements += 1
      return element
    }

    // Wait for an element to become available.
    return await withCheckedContinuation { continuation in
      continuationQueue.enqueue(continuation)
    }
  }

  /// Returns an element to the pool.
  func `return`(_ element: Element) {
    if let nextContinuation = continuationQueue.dequeue() {
      // A task is waiting for this element, so provide it directly.
      nextContinuation.resume(returning: element)
    } else {
      // Return the element back to the pool.
      elements.append(element)
    }
  }

  private let maxElements: Int
  private var builtElements = 0
  private let elementBuilder: () throws -> Element
  private var elements: [Element] = []
  private var continuationQueue = Queue<CheckedContinuation<Element, Never>>()
}

Tests

Time for some more tests! I'm keeping testing pretty light in this series for the sake of brevity, however we can easily write a test that checks that our pool can be initialized, and transactions can be executed through both the .read and .write APIs without errors.

   func testPool() async throws {
    // Given:
    let pool = Pool(url: temporaryDatabaseURL(), maxReaders: 8)
    try await pool.write { try $0.execute("CREATE TABLE test (id INTEGER NOT NULL)") }

    // "Concurrent" writes.
    try await withThrowingTaskGroup(of: Void.self) { group in
      for _ in 0 ..< 100 {
        group.addTask {
          _ = try await pool.write { connection in
            try connection.execute("INSERT INTO test VALUES (?)", Int.random(in: 0 ... 1000))
          }
        }
      }
      try await group.waitForAll()
    }

    // Concurrent reads.
    try await withThrowingTaskGroup(of: Void.self) { group in
      for _ in 0 ..< 1000 {
        group.addTask {
          _ = try await pool.read { connection in
            try connection.execute("SELECT * FROM test")
          }
        }
      }
      try await group.waitForAll()
    }
  }

Since our writes are not actually performed concurrently, here we're just queueing up 100 write operations and validating that they don't throw an error. Our reads however should be permitted to execute concurrently (8 at a time), and those 1000 reads should complete pretty quickly.

Conclusion

Our wrapper is nearly feature complete! At this point you have a fully-functioning database pool, supporting concurrent reads (and a write) along with support for transactions. The code up to this point can be found on Github.

The final part of the series will focus on some "optional" functionality that I've found very useful in practice as part of previous projects: managing schema and data migrations. See you there!