Skip to content

Commit

Permalink
Draft: Initial version of keep alive lock thread (#57)
Browse files Browse the repository at this point in the history
* Initial version

* fix never ending thread

* performance improvement

* Updated assets/coverage.svg

* add more locking tests

* Updated assets/coverage.svg

* update workspace

* improvements

* reduce timeout

* more tests

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
mkrd and github-actions[bot] authored Dec 17, 2023
1 parent 7f52085 commit 15546cd
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 22 deletions.
2 changes: 1 addition & 1 deletion DictDataBase.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"editor.defaultFormatter": "charliermarsh.ruff"
},
"editor.codeActionsOnSave": {
"source.organizeImports": true
"source.organizeImports": "explicit"
},
}
}
74 changes: 67 additions & 7 deletions dictdatabase/locking.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import contextlib
import os
import threading
import time
Expand All @@ -10,8 +11,17 @@
# - Do not use pathlib, because it is slower than os

# Constants
# Values passed to time.sleep() or compared against time.time() are in seconds;
# values whose name ends in _NS are in nanoseconds (compared against time.time_ns()).
SLEEP_TIMEOUT = 0.001
LOCK_TIMEOUT = 60.0 # Duration to wait before considering a lock as orphaned.
# Pause between polls while waiting to acquire a lock
SLEEP_TIMEOUT = 0.001 * 1 # seconds (= 1 ms)
# Pause between two iterations of the keep-alive thread loop
LOCK_KEEP_ALIVE_TIMEOUT = 0.001 * 0.08 # seconds (= 0.08 ms)

# Age a held lock file must reach before the keep-alive thread refreshes its timestamp
ALIVE_LOCK_REFRESH_INTERVAL_NS = 1_000_000_000 * 10 # nanoseconds (= 10 s)

# Age (seconds) after which a foreign lock file is considered orphaned and removed
REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0

# Duration (seconds) to wait before giving up on acquiring a lock
AQUIRE_LOCK_TIMEOUT = 60.0


def os_touch(path: str) -> None:
Expand Down Expand Up @@ -50,6 +60,9 @@ def __init__(self, ddb_dir: str, name: str, id: str, time_ns: str, stage: str, m
lock_file = f"{name}.{id}.{time_ns}.{stage}.{mode}.lock"
self.path = os.path.join(ddb_dir, lock_file)

def __repr__(self) -> str:
    """Return a debug representation listing every component of the lock file name."""
    attr_names = ("ddb_dir", "name", "id", "time_ns", "stage", "mode")
    # Mirrors the f-string "{self.attr=}" form: "self.attr=<repr of value>"
    rendered = [f"self.{attr}={getattr(self, attr)!r}" for attr in attr_names]
    return "LockFileMeta(" + ", ".join(rendered) + ")"

def new_with_updated_time(self) -> LockFileMeta:
"""
Create a new instance with an updated timestamp.
Expand Down Expand Up @@ -91,7 +104,7 @@ def __init__(self, need_lock: LockFileMeta) -> None:
# Remove orphaned locks
if lock_meta.path != need_lock.path:
lock_age = time.time_ns() - int(lock_meta.time_ns)
if lock_age > LOCK_TIMEOUT * 1_000_000_000:
if lock_age > REMOVE_ORPHAN_LOCK_TIMEOUT * 1_000_000_000:
os.unlink(lock_meta.path)
print(f"Removed orphaned lock ({lock_meta.path})")
continue
Expand Down Expand Up @@ -129,13 +142,15 @@ class AbstractLock:
provides a blueprint for derived classes to implement.
"""

__slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode")
# Every entry needs its own comma: adjacent string literals are implicitly
# concatenated, which would silently merge two slot names into one.
__slots__ = ("db_name", "need_lock", "has_lock", "snapshot", "mode", "is_alive", "keep_alive_thread")

db_name: str
need_lock: LockFileMeta
has_lock: LockFileMeta
snapshot: FileLocksSnapshot
mode: str
is_alive: bool
keep_alive_thread: threading.Thread

def __init__(self, db_name: str) -> None:
# Normalize db_name to avoid file naming conflicts
Expand All @@ -147,16 +162,59 @@ def __init__(self, db_name: str) -> None:
self.need_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "need", self.mode)
self.has_lock = LockFileMeta(dir, self.db_name, t_id, time_ns, "has", self.mode)

self.is_alive = False
self.keep_alive_thread = None

# Ensure lock directory exists
if not os.path.isdir(dir):
os.makedirs(dir, exist_ok=True)

def _keep_alive_thread(self) -> None:
    """
    Thread body: periodically replace the held lock file with a freshly
    timestamped one for as long as ``self.is_alive`` stays true.
    """

    last_refresh_ns: int = int(self.has_lock.time_ns)

    while self.is_alive:
        time.sleep(LOCK_KEEP_ALIVE_TIMEOUT)

        lock_age_ns = time.time_ns() - last_refresh_ns
        if lock_age_ns >= ALIVE_LOCK_REFRESH_INTERVAL_NS:
            # The held lock is old enough to need a refresh. Create the
            # replacement file before removing the old one so that a lock
            # file exists at every point of the handover.
            refreshed = self.has_lock.new_with_updated_time()
            os_touch(refreshed.path)
            with contextlib.suppress(FileNotFoundError):
                os.unlink(self.has_lock.path)  # Remove old lock file
            self.has_lock = refreshed
            last_refresh_ns = int(refreshed.time_ns)

def _start_keep_alive_thread(self) -> None:
    """
    Spawn the background thread running ``_keep_alive_thread``.

    Raises:
        RuntimeError: If a keep-alive thread is already registered on this lock.
    """

    if self.keep_alive_thread is not None:
        raise RuntimeError("Keep alive thread already exists.")

    # The flag must be set before the thread starts, since the thread loop polls it.
    self.is_alive = True
    worker = threading.Thread(target=self._keep_alive_thread, daemon=False)
    self.keep_alive_thread = worker
    worker.start()

def _lock(self) -> None:
    """
    Acquire the lock. This base implementation always raises; subclasses
    must override it with the actual locking mechanism.
    """
    raise NotImplementedError

def _unlock(self) -> None:
"""Remove the lock files associated with this lock."""

if self.keep_alive_thread is not None:
self.is_alive = False
self.keep_alive_thread.join()
self.keep_alive_thread = None

for p in ("need_lock", "has_lock"):
try:
if lock := getattr(self, p, None):
Expand All @@ -169,7 +227,7 @@ def _unlock(self) -> None:
def __enter__(self) -> None:
    """Acquire the lock on entering the ``with`` block; nothing is bound by ``as``."""
    self._lock()

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
def __exit__(self, exc_type, exc_val, exc_tb) -> None: # noqa: ANN001
self._unlock()


Expand Down Expand Up @@ -202,9 +260,10 @@ def _lock(self) -> None:
self.has_lock = self.has_lock.new_with_updated_time()
os_touch(self.has_lock.path)
os.unlink(self.need_lock.path)
self._start_keep_alive_thread()
return
time.sleep(SLEEP_TIMEOUT)
if time.time() - start_time > LOCK_TIMEOUT:
if time.time() - start_time > AQUIRE_LOCK_TIMEOUT:
raise RuntimeError("Timeout while waiting for read lock.")
self.snapshot = FileLocksSnapshot(self.need_lock)

Expand Down Expand Up @@ -236,8 +295,9 @@ def _lock(self) -> None:
self.has_lock = self.has_lock.new_with_updated_time()
os_touch(self.has_lock.path)
os.unlink(self.need_lock.path)
self._start_keep_alive_thread()
return
time.sleep(SLEEP_TIMEOUT)
if time.time() - start_time > LOCK_TIMEOUT:
if time.time() - start_time > AQUIRE_LOCK_TIMEOUT:
raise RuntimeError("Timeout while waiting for write lock.")
self.snapshot = FileLocksSnapshot(self.need_lock)
63 changes: 49 additions & 14 deletions tests/test_locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ def test_lock_release():
pass


def test_orphaned_lock_timeout():
prev_timeout = locking.LOCK_TIMEOUT
locking.LOCK_TIMEOUT = 0.1
lock = locking.WriteLock("db_orphaned")
def test_read_lock_release():
read_lock = locking.ReadLock("test_db")
write_lock = locking.WriteLock("test_db")

lock._lock()
time.sleep(0.2)
# Acquire and release a read lock
with read_lock:
pass

# Trigger the removal of orphaned locks
ls = locking.FileLocksSnapshot(lock.need_lock)
assert len(ls.locks) == 0
# Now attempt to acquire a write lock
with write_lock:
assert write_lock.has_lock is not None

locking.LOCK_TIMEOUT = prev_timeout
read_lock._unlock()
write_lock._unlock()


def test_double_lock_exception(use_compression):
Expand Down Expand Up @@ -64,18 +65,52 @@ def test_get_lock_names(use_compression):
lock._unlock()


def test_lock_must_implement_lock_function():
    """A subclass that does not override ``_lock`` must raise NotImplementedError."""

    class BadLock(locking.AbstractLock):
        # Only the mode is provided; _lock is deliberately left unimplemented.
        mode = "read"

    unimplemented = BadLock("db")
    with pytest.raises(NotImplementedError):
        unimplemented._lock()


def test_remove_orphaned_locks():
    """
    A held lock older than REMOVE_ORPHAN_LOCK_TIMEOUT must be removed when a
    new FileLocksSnapshot is taken.

    The module-level timeouts are patched for the duration of the test and
    restored in a ``finally`` block, so a failing assertion cannot leak the
    patched values (or the held lock) into other tests.
    """
    # Defaults, for reference:
    # SLEEP_TIMEOUT = 0.001
    # LOCK_KEEP_ALIVE_TIMEOUT = 0.001
    # REMOVE_ORPHAN_LOCK_TIMEOUT = 20.0 # Duration to wait before considering a lock as orphaned.
    # AQUIRE_LOCK_TIMEOUT = 60.0

    prev = locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT

    locking.AQUIRE_LOCK_TIMEOUT = 10.0
    locking.LOCK_KEEP_ALIVE_TIMEOUT = 1.0
    locking.REMOVE_ORPHAN_LOCK_TIMEOUT = 0.1
    try:
        lock = locking.ReadLock("test_remove_orphaned_locks")
        lock._lock()
        try:
            ls = locking.FileLocksSnapshot(lock.need_lock)
            assert len(ls.locks) >= 1  # The one lock or two if currently in keep alive handover

            # Let the lock age beyond REMOVE_ORPHAN_LOCK_TIMEOUT
            time.sleep(0.2)
            # Trigger the removal of orphaned locks
            ls = locking.FileLocksSnapshot(lock.need_lock)

            assert len(ls.locks) == 0
        finally:
            # Always stop the keep-alive thread and clean up the lock files
            lock._unlock()
    finally:
        locking.AQUIRE_LOCK_TIMEOUT, locking.LOCK_KEEP_ALIVE_TIMEOUT, locking.REMOVE_ORPHAN_LOCK_TIMEOUT = prev


def test_lock_keep_alive():
    """
    While a lock is held longer than the refresh interval, the keep-alive
    thread must replace the held lock file with a freshly timestamped one.

    The refresh interval is patched through ALIVE_LOCK_REFRESH_INTERVAL_NS,
    the constant the keep-alive loop actually reads (in nanoseconds). All
    patched module constants are restored in a ``finally`` block.
    """
    prev = (
        locking.AQUIRE_LOCK_TIMEOUT,
        locking.LOCK_KEEP_ALIVE_TIMEOUT,
        locking.REMOVE_ORPHAN_LOCK_TIMEOUT,
        locking.ALIVE_LOCK_REFRESH_INTERVAL_NS,
    )

    locking.LOCK_KEEP_ALIVE_TIMEOUT = 0.1
    locking.ALIVE_LOCK_REFRESH_INTERVAL_NS = 500_000_000  # 0.5 s, expressed in ns

    try:
        lock = locking.ReadLock("test_lock_keep_alive")

        with lock:
            acquired_time_ns = lock.has_lock.time_ns
            # Hold the lock for twice the refresh interval
            time.sleep(1.0)
            # The keep-alive thread must have swapped in a newer has-lock
            assert lock.has_lock.time_ns != acquired_time_ns
    finally:
        (
            locking.AQUIRE_LOCK_TIMEOUT,
            locking.LOCK_KEEP_ALIVE_TIMEOUT,
            locking.REMOVE_ORPHAN_LOCK_TIMEOUT,
            locking.ALIVE_LOCK_REFRESH_INTERVAL_NS,
        ) = prev

0 comments on commit 15546cd

Please sign in to comment.