
Locking

The dstack server supports SQLite and Postgres databases with two implementations of resource locking to handle concurrent access:

  • In-memory locking for SQLite.
  • DB-level locking for Postgres.

SQLite locking

SQLite lacks efficient mechanisms for handling concurrent writes (e.g. SELECT FOR UPDATE), so dstack implements in-memory resource-level locking. In-memory locking works correctly only if there is a single server instance (process), which is a dstack limitation when using SQLite.

The in-memory locking is implemented via locksets. Locksets are Python sets that store IDs of locked resources. Concurrent access to locksets is guarded with asyncio locks:

lock, lockset = get_lockset("my_table")
async with lock:
    # select resource that is not in lockset
    lockset.add(resource.id)
try:
    process_resource(resource)
finally:
    lockset.remove(resource.id)

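Locksets are an optimization. The asyncio lock is held only while a resource is selected and its ID is added to the lockset, not while the resource is processed. One can think of locksets as per-resource-ID locks that allow different resources to be locked independently.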
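Below is a self-contained, runnable sketch of the lockset pattern. It assumes a hypothetical module-level `get_lockset()` registry (not dstack's actual implementation) and uses `asyncio.sleep()` as a stand-in for real processing. Three concurrent tasks each end up with a different resource because IDs already in the lockset are skipped:

```python
from __future__ import annotations

import asyncio

# Hypothetical registry: one (asyncio.Lock, set) pair per namespace, e.g. a table name.
# It mirrors the get_lockset() idea above but is NOT dstack's actual implementation.
_locksets: dict[str, tuple[asyncio.Lock, set[int]]] = {}


def get_lockset(namespace: str) -> tuple[asyncio.Lock, set[int]]:
    if namespace not in _locksets:
        _locksets[namespace] = (asyncio.Lock(), set())
    return _locksets[namespace]


async def process_next(resource_ids: list[int], processed: list[int]) -> None:
    lock, lockset = get_lockset("my_table")
    async with lock:
        # The asyncio lock only guards the lockset while a resource is picked.
        free = [rid for rid in resource_ids if rid not in lockset]
        if not free:
            return
        rid = free[0]
        lockset.add(rid)
    try:
        # The asyncio lock is already released here, so other tasks can pick
        # other resources while this one is being processed.
        await asyncio.sleep(0.01)  # stand-in for process_resource()
        processed.append(rid)
    finally:
        lockset.remove(rid)


async def main() -> list[int]:
    processed: list[int] = []
    await asyncio.gather(*(process_next([1, 2, 3], processed) for _ in range(3)))
    return processed
```

`sorted(asyncio.run(main()))` gives `[1, 2, 3]`, and the lockset is empty afterwards because each task removes its ID in `finally`.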

Postgres locking

Postgres resource locking is implemented via the standard SELECT FOR UPDATE. SQLAlchemy provides .with_for_update(), which has no effect on databases that don't support SELECT FOR UPDATE, such as SQLite.
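To see the difference between dialects without a database, one can compile the same statement for Postgres and SQLite. The `Fleet` model below is hypothetical; `select()`, `.with_for_update()` and `.compile(dialect=...)` are standard SQLAlchemy (1.4+) APIs:

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.dialects import postgresql, sqlite
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Fleet(Base):
    # Hypothetical model used only for illustration.
    __tablename__ = "fleets"
    id = Column(Integer, primary_key=True)
    name = Column(String)


stmt = select(Fleet).where(Fleet.name == "my-fleet").with_for_update()

# Compile the same statement for both dialects without connecting to a database.
pg_sql = str(stmt.compile(dialect=postgresql.dialect()))
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))

print(pg_sql)      # ends with FOR UPDATE
print(sqlite_sql)  # no row-locking clause at all
```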

A few places rely on advisory locks, for example, generating unique resource names.

Working with locks

Concurrency is hard. Concurrency with locking is especially hard. Below you'll find common patterns and gotchas when working with locks to make it a bit more manageable.

A task should acquire locks on resources it modifies

This is common sense. An alternative could be the inverse: job processing must not run in parallel with run processing, so job processing takes the run lock. This indirection complicates things and is discouraged. In this example, run processing should take the job lock instead.

Start a new transaction after acquiring a lock to see other transactions' changes in SQLite

# Select resource IDs by names
# Lock the resource IDs (in-memory lockset)
await session.commit()
# The next statement will start a new transaction
await session.execute(select(...))

SQLite exhibits Snapshot Isolation. When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html

Thus, if you don't start a new transaction after acquiring the lock, you may not see changes that concurrent transactions committed before you acquired it.

This is not relevant for Postgres since it doesn't rely on in-memory locking (and it also runs at the Read Committed isolation level by default).
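The SQLite behavior described above can be reproduced with Python's standard `sqlite3` module. This minimal sketch assumes WAL journal mode (which lets a writer commit while a read transaction is open) and uses two connections to a temporary file; the table name is illustrative:

```python
from __future__ import annotations

import os
import sqlite3
import tempfile


def snapshot_demo() -> tuple[int, int, int]:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: the sqlite3 module does not issue BEGIN implicitly,
        # so transactions start and end exactly where BEGIN/COMMIT are executed.
        reader = sqlite3.connect(path, isolation_level=None)
        # WAL mode lets a writer commit while a read transaction is open.
        reader.execute("PRAGMA journal_mode=WAL").fetchone()
        reader.execute("CREATE TABLE items (id INTEGER PRIMARY KEY)")
        writer = sqlite3.connect(path, isolation_level=None)
        try:
            reader.execute("BEGIN")
            # The snapshot is taken at the first read inside the transaction.
            before = reader.execute("SELECT COUNT(*) FROM items").fetchone()[0]
            # Concurrent write, committed immediately (autocommit).
            writer.execute("INSERT INTO items DEFAULT VALUES")
            # Same read transaction: still sees the old snapshot.
            stale = reader.execute("SELECT COUNT(*) FROM items").fetchone()[0]
            reader.execute("COMMIT")
            # A new transaction sees the committed insert.
            fresh = reader.execute("SELECT COUNT(*) FROM items").fetchone()[0]
        finally:
            writer.close()
            reader.close()
    return before, stale, fresh


print(snapshot_demo())  # (0, 0, 1)
```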

Release in-memory locks only after committing changes

# Don't do this!
# lock resources
# unlock resources
# do something else
await session.commit()

# Do this!
# lock resources
await session.commit()
# unlock resources

If a transaction releases a lock before committing changes, the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.
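The following toy sketch combines this rule with the previous one. It uses the standard `sqlite3` module and a shared `asyncio.Lock` standing in for an in-memory lock. The table and function names are hypothetical, and real code would use an async session rather than blocking `sqlite3` calls. Each task starts its transaction after acquiring the lock and commits before releasing it, so no increment is lost:

```python
from __future__ import annotations

import asyncio
import os
import sqlite3
import tempfile


async def increment(path: str, lock: asyncio.Lock) -> None:
    conn = sqlite3.connect(path, isolation_level=None)
    try:
        async with lock:
            # Start the transaction only after the lock is held,
            # so the read below sees everything committed so far.
            conn.execute("BEGIN")
            (value,) = conn.execute("SELECT value FROM counter").fetchone()
            await asyncio.sleep(0.01)  # yield to other tasks while holding the lock
            conn.execute("UPDATE counter SET value = ?", (value + 1,))
            # Commit BEFORE the lock is released at the end of the `async with` block.
            conn.execute("COMMIT")
    finally:
        conn.close()


async def run_demo(n_tasks: int = 5) -> int:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "counter.db")
        setup = sqlite3.connect(path, isolation_level=None)
        setup.execute("CREATE TABLE counter (value INTEGER NOT NULL)")
        setup.execute("INSERT INTO counter (value) VALUES (0)")
        lock = asyncio.Lock()
        await asyncio.gather(*(increment(path, lock) for _ in range(n_tasks)))
        (value,) = setup.execute("SELECT value FROM counter").fetchone()
        setup.close()
    return value
```

`asyncio.run(run_demo())` returns `5`: each task reads the value committed by the previous one.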

Using joinedload when selecting .with_for_update()

Using joinedload together with .with_for_update() triggers an error on Postgres because joinedload produces a LEFT OUTER JOIN, and SELECT FOR UPDATE cannot be applied to the nullable side of an outer join. Here are the options:

  • Use .with_for_update(of=MainModel).
  • Select with selectinload instead of joinedload.
  • First select with .with_for_update() without loading related attributes, and then re-select with joinedload without .with_for_update().
  • Use a regular .join() to lock related resources, but you may get 0 rows if there is no related row to join.
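For the first option, compiling an ORM query for Postgres shows that only the main table is named in the locking clause while the related rows are still loaded via the outer join. The `Run` and `Job` models are hypothetical stand-ins for a parent and child table:

```python
from sqlalchemy import Column, ForeignKey, Integer, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base, joinedload, relationship

Base = declarative_base()


# Hypothetical parent/child models used only for illustration.
class Run(Base):
    __tablename__ = "runs"
    id = Column(Integer, primary_key=True)
    jobs = relationship("Job", back_populates="run")


class Job(Base):
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True)
    run_id = Column(Integer, ForeignKey("runs.id"))
    run = relationship("Run", back_populates="jobs")


stmt = (
    select(Run)
    .where(Run.id == 1)
    .options(joinedload(Run.jobs))  # renders a LEFT OUTER JOIN to jobs
    # Lock only rows of the main table, not the nullable side of the join.
    .with_for_update(of=Run)
)
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```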

Always use .with_for_update(key_share=True) unless you plan to delete rows or update a primary key column

If you SELECT FOR UPDATE from a table that is referenced by a child table via a foreign key, it can lead to deadlocks when rows in the child table are inserted or updated, because Postgres takes a FOR KEY SHARE lock on the referenced parent rows to keep foreign keys valid. For this reason, always use SELECT FOR NO KEY UPDATE (.with_for_update(key_share=True)) if primary key columns are not modified. SELECT FOR NO KEY UPDATE does not conflict with a FOR KEY SHARE lock, so this source of deadlocks is avoided.
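The lock modes mentioned above map to `.with_for_update()` arguments as follows. This sketch compiles queries for Postgres against a hypothetical `Project` model without connecting to a database:

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Project(Base):
    # Hypothetical parent table that child tables reference via foreign keys.
    __tablename__ = "projects"
    id = Column(Integer, primary_key=True)
    name = Column(String)


def render(stmt) -> str:
    return str(stmt.compile(dialect=postgresql.dialect()))


base = select(Project).where(Project.id == 1)
# Conflicts with FOR KEY SHARE, so it blocks foreign key checks on child tables.
for_update = render(base.with_for_update())
# Does not conflict with FOR KEY SHARE: the recommended default.
no_key_update = render(base.with_for_update(key_share=True))
# The lock Postgres takes on parent rows when checking a child's foreign key.
key_share = render(base.with_for_update(read=True, key_share=True))
```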

Lock unique names

The following pattern can be used to lock a unique name of some resource type:

lock_namespace = f"fleet_names_{project.name}"
if get_db().dialect_name == "sqlite":
    # Start new transaction to see committed changes after lock
    await session.commit()
elif get_db().dialect_name == "postgresql":
    await session.execute(
        select(func.pg_advisory_xact_lock(string_to_lock_id(lock_namespace)))
    )

lock, _ = get_locker(get_db().dialect_name).get_lockset(lock_namespace)
async with lock:
    # ... select taken names, use a unique name
    await session.commit()

Note that:

  • This pattern assumes that Postgres uses the default Read Committed isolation level. A transaction-level advisory lock is held until commit or rollback, so by the time a transaction acquires it, all other transactions that could take the name have committed. Their changes are therefore visible, and a unique name can be chosen.
  • SQLite needs a commit before selecting taken names due to Snapshot Isolation as noted above.
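`string_to_lock_id()` in the pattern above is not defined in this document. `pg_advisory_xact_lock(bigint)` takes a signed 64-bit integer key, so one possible implementation (a hypothetical sketch, not necessarily dstack's) hashes the namespace string with SHA-256 and keeps the first 8 bytes as a signed integer:

```python
import hashlib


def string_to_lock_id(s: str) -> int:
    # Hypothetical implementation: dstack's own helper may differ.
    # pg_advisory_xact_lock(bigint) needs a signed 64-bit integer, so take the
    # first 8 bytes of a SHA-256 digest and interpret them as a signed int.
    digest = hashlib.sha256(s.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big", signed=True)
```

A stable hash such as SHA-256 is used here instead of Python's built-in `hash()`, because string hashing is randomized per process by default, and every server process must map a namespace to the same lock ID.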

Use AsyncExitStack

In-memory locking typically requires holding the lock for a long time (until commit). Using lock context managers for in-memory locking is often hard because the lock is tied to a code block:

if something:
    # Can't do this because the lock will be released before commit. How to lock?
    async with get_locker(get_db().dialect_name).lock_ctx(...):
        # ...
# ...
await session.commit()

Use contextlib.AsyncExitStack:

async with AsyncExitStack() as exit_stack:
    if something:
        # The lock will be released only on stack exit, so it's ok.
        await exit_stack.enter_async_context(
            get_locker(get_db().dialect_name).lock_ctx(...)
        )
        # ...
    # ...
    await session.commit()
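Here is a runnable sketch showing that a lock entered via `AsyncExitStack` stays held past the `if` block until the stack exits. A plain `asyncio.Lock` stands in for `lock_ctx(...)`, and appending to a list stands in for `session.commit()`:

```python
from __future__ import annotations

import asyncio
from contextlib import AsyncExitStack


async def handle(lock: asyncio.Lock, something: bool) -> list[str]:
    events: list[str] = []
    async with AsyncExitStack() as exit_stack:
        if something:
            # The lock is acquired inside the `if`, but released only on stack exit.
            await exit_stack.enter_async_context(lock)
            events.append("locked")
        # Stand-in for `await session.commit()`: the lock is still held here.
        events.append(f"commit, lock held: {lock.locked()}")
    events.append(f"after exit, lock held: {lock.locked()}")
    return events


async def main() -> tuple[list[str], list[str]]:
    lock = asyncio.Lock()
    with_lock = await handle(lock, something=True)
    without_lock = await handle(lock, something=False)
    return with_lock, without_lock
```

The same approach scales to taking several locks conditionally or in a loop: each `enter_async_context()` call registers one more lock, and all of them are released together when the stack exits.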