utils: add file lock utility
Uses linux flock to create lock files that can be used to protect access across multiple processes. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
parent
02144b472a
commit
683d93a894
|
@ -0,0 +1,110 @@
|
|||
# Async file locking using flock
|
||||
#
|
||||
# Copyright (C) 2024 Eric Callahan <arksine.code@gmail.com>
|
||||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license
|
||||
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import fcntl
|
||||
import errno
|
||||
import logging
|
||||
import pathlib
|
||||
import contextlib
|
||||
import asyncio
|
||||
from . import ServerError
|
||||
from typing import Optional, Type
|
||||
from types import TracebackType
|
||||
|
||||
class LockTimeout(ServerError):
|
||||
pass
|
||||
|
||||
class AsyncExclusiveFileLock(contextlib.AbstractAsyncContextManager):
|
||||
def __init__(
|
||||
self, file_path: pathlib.Path, timeout: int = 0
|
||||
) -> None:
|
||||
self.lock_path = file_path.parent.joinpath(f".{file_path.name}.lock")
|
||||
self.timeout = timeout
|
||||
self.fd: int = -1
|
||||
self.locked: bool = False
|
||||
self.required_wait: bool = False
|
||||
|
||||
async def __aenter__(self) -> AsyncExclusiveFileLock:
|
||||
await self.acquire()
|
||||
return self
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
__exc_type: Optional[Type[BaseException]],
|
||||
__exc_value: Optional[BaseException],
|
||||
__traceback: Optional[TracebackType]
|
||||
) -> None:
|
||||
await self.release()
|
||||
|
||||
def _get_lock(self) -> bool:
|
||||
flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
|
||||
fd = os.open(str(self.lock_path), flags, 0o644)
|
||||
with contextlib.suppress(PermissionError):
|
||||
os.chmod(fd, 0o644)
|
||||
try:
|
||||
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except OSError as err:
|
||||
os.close(fd)
|
||||
if err.errno == errno.ENOSYS:
|
||||
raise
|
||||
return False
|
||||
stat = os.fstat(fd)
|
||||
if stat.st_nlink == 0:
|
||||
# File was deleted before opening and after acquiring
|
||||
# lock, create a new one
|
||||
os.close(fd)
|
||||
return False
|
||||
self.fd = fd
|
||||
return True
|
||||
|
||||
async def acquire(self) -> None:
|
||||
self.required_wait = False
|
||||
if self.timeout < 0:
|
||||
return
|
||||
loop = asyncio.get_running_loop()
|
||||
endtime = loop.time() + self.timeout
|
||||
logged: bool = False
|
||||
while True:
|
||||
try:
|
||||
self.locked = await loop.run_in_executor(None, self._get_lock)
|
||||
except OSError as err:
|
||||
logging.info(
|
||||
"Failed to aquire advisory lock, allowing unlocked entry."
|
||||
f"Error: {err}"
|
||||
)
|
||||
self.locked = False
|
||||
return
|
||||
if self.locked:
|
||||
return
|
||||
self.required_wait = True
|
||||
await asyncio.sleep(.25)
|
||||
if not logged:
|
||||
logged = True
|
||||
logging.info(
|
||||
f"File lock {self.lock_path} is currently acquired by another "
|
||||
"process, waiting for release."
|
||||
)
|
||||
if self.timeout > 0 and endtime >= loop.time():
|
||||
raise LockTimeout(
|
||||
f"Attempt to acquire lock '{self.lock_path}' timed out"
|
||||
)
|
||||
|
||||
def _release_file(self) -> None:
|
||||
with contextlib.suppress(OSError, PermissionError):
|
||||
self.lock_path.unlink(missing_ok=True)
|
||||
with contextlib.suppress(OSError, PermissionError):
|
||||
fcntl.flock(self.fd, fcntl.LOCK_UN)
|
||||
with contextlib.suppress(OSError, PermissionError):
|
||||
os.close(self.fd)
|
||||
|
||||
async def release(self) -> None:
|
||||
if not self.locked:
|
||||
return
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(None, self._release_file)
|
||||
self.locked = False
|
Loading…
Reference in New Issue