util.py 780 Bytes
Newer Older
1
from logging import getLogger
segfault's avatar
segfault committed
2
import threading
3 4 5
import fcntl
from contextlib import contextmanager
from typing import TextIO
segfault's avatar
segfault committed
6
from pathlib import Path
7

segfault's avatar
segfault committed
8 9 10
from gi.repository import GLib


11 12 13
logger = getLogger(__name__)


segfault's avatar
segfault committed
14 15 16
def process_mainloop_events():
    context = GLib.MainLoop().get_context()
    while context.pending():
17 18 19 20
        context.iteration()


@contextmanager
segfault's avatar
segfault committed
21 22
def open_locked(path: Path, *args, **kwargs) -> TextIO:
    with path.open(*args, **kwargs) as f:
23
        try:
segfault's avatar
segfault committed
24
            logger.log(5, "Acquiring file lock on %s", path)
25
            fcntl.flock(f, fcntl.LOCK_EX)
segfault's avatar
segfault committed
26
            logger.log(5, "Acquired file lock on %s", path)
27 28 29 30

            yield f

        finally:
segfault's avatar
segfault committed
31
            logger.log(5, "Releasing file lock on %s", path)
32
            fcntl.flock(f, fcntl.LOCK_UN)