Skip to content

Commit b750572

Browse files
committed
upd: convert to direct asyncio locks
1 parent ad82356 commit b750572

File tree

5 files changed

+71
-67
lines changed

5 files changed

+71
-67
lines changed

src/experimaestro/connectors/local.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import os
77
import threading
88
from experimaestro.launcherfinder import LauncherRegistry
9-
import fasteners
9+
from fasteners import InterProcessLock as FastenersInterProcessLock
1010
import psutil
1111

12-
from experimaestro.locking import Lock
12+
from asyncio import Lock
1313

1414
from . import (
1515
Connector,
@@ -93,7 +93,7 @@ def fromspec(connector, spec):
9393

9494

9595
def getstream(redirect: Redirect, write: bool):
96-
if redirect.type == RedirectType.FILE:
96+
if redirect.type == RedirectType.FILE and redirect.path:
9797
return redirect.path.open("w" if write else "r")
9898

9999
if redirect.type == RedirectType.PIPE:
@@ -145,25 +145,35 @@ def start(self, task_mode=False):
145145
return process
146146

147147

148-
class InterProcessLock(fasteners.InterProcessLock, Lock):
148+
class InterProcessLock(FastenersInterProcessLock, Lock):
149149
def __init__(self, path, max_delay=-1):
150-
super().__init__(path)
150+
FastenersInterProcessLock.__init__(self, path)
151151
self.max_delay = max_delay
152152

153153
def __enter__(self):
154154
logger.debug("Locking %s", self.path)
155-
if not super().acquire(blocking=True, max_delay=self.max_delay, timeout=None):
155+
if not FastenersInterProcessLock.acquire(
156+
self, blocking=True, max_delay=self.max_delay, timeout=None
157+
):
156158
raise threading.ThreadError("Could not acquire lock")
157159
logger.debug("Locked %s", self.path)
158160
return self
159161

162+
def __aenter__(self):
163+
# use the synchronous __enter__ method in async context
164+
return self.__enter__()
165+
160166
def __exit__(self, *args):
161167
logger.debug("Unlocking %s", self.path)
162168
super().__exit__(*args)
163169

170+
def __aexit__(self, *args):
171+
# use the synchronous __exit__ method in async context
172+
return self.__exit__(*args)
173+
164174

165175
class LocalConnector(Connector):
166-
INSTANCE: Connector = None
176+
INSTANCE: Optional[Connector] = None
167177

168178
@staticmethod
169179
def instance():
@@ -175,7 +185,7 @@ def instance():
175185
def init_registry(registry: LauncherRegistry):
176186
pass
177187

178-
def __init__(self, localpath: Path = None):
188+
def __init__(self, localpath: Optional[Path] = None):
179189
localpath = localpath
180190
if not localpath:
181191
localpath = Path(
@@ -200,7 +210,7 @@ def createtoken(self, name: str, total: int) -> Token:
200210
def processbuilder(self) -> ProcessBuilder:
201211
return LocalProcessBuilder()
202212

203-
def resolve(self, path: Path, basepath: Path = None) -> str:
213+
def resolve(self, path: Path, basepath: Optional[Path] = None) -> str:
204214
assert isinstance(path, PosixPath) or isinstance(
205215
path, WindowsPath
206216
), f"Unrecognized path {type(path)}"

src/experimaestro/connectors/ssh.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io
1010
import os
1111
import re
12+
from asyncio import Lock
1213
from experimaestro.launcherfinder import LauncherRegistry
1314
from urllib.parse import urlparse
1415
from itertools import chain
@@ -19,7 +20,6 @@
1920
RedirectType,
2021
Redirect,
2122
)
22-
from experimaestro.locking import Lock
2323
from experimaestro.tokens import Token
2424

2525
try:

src/experimaestro/locking.py

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,42 @@
1-
from experimaestro.utils.asyncio import asyncThreadcheck
1+
from asyncio import Lock
22
from .utils import logger
33

44

5-
class Lock:
6-
"""A lock"""
7-
8-
def __init__(self):
9-
self._level = 0
10-
self.detached = False
11-
12-
def detach(self):
13-
self.detached = True
14-
15-
def acquire(self):
16-
if self._level == 0:
17-
self._level += 1
18-
self._acquire()
19-
return self
20-
21-
def release(self):
22-
if not self.detached and self._level == 1:
23-
self._level -= 1
24-
self._release()
25-
26-
def __enter__(self):
27-
self.acquire()
28-
return self
29-
30-
def __exit__(self, *args):
31-
self.release()
32-
33-
async def __aenter__(self):
34-
return await asyncThreadcheck("lock (aenter)", self.__enter__)
35-
36-
async def __aexit__(self, *args):
37-
return await asyncThreadcheck("lock (aexit)", self.__exit__, *args)
38-
39-
def _acquire(self):
40-
raise NotImplementedError()
41-
42-
def _release(self):
43-
raise NotImplementedError()
44-
45-
465
class LockError(Exception):
476
pass
487

498

509
class Locks(Lock):
51-
"""A set of locks"""
10+
"""A set of locks that can be acquired/released together"""
5211

5312
def __init__(self):
5413
super().__init__()
5514
self.locks = []
5615

5716
def append(self, lock):
17+
"""Add a lock to the collection"""
5818
self.locks.append(lock)
5919

60-
def _acquire(self):
61-
for lock in self.locks:
62-
lock.acquire()
20+
async def acquire(self):
21+
"""Acquire all locks in order"""
22+
if not self.locked():
23+
for lock in self.locks:
24+
await lock.acquire()
25+
self._acquired = True
26+
await super().acquire()
27+
return self
28+
29+
def release(self):
30+
"""Release all locks in reverse order"""
31+
if self.locked():
32+
# if not self.detached and self._acquired:
33+
logger.debug("Releasing %d locks", len(self.locks))
34+
# Release in reverse order to prevent deadlocks
35+
for lock in reversed(self.locks):
36+
logger.debug("[locks] Releasing %s", lock)
37+
lock.release()
38+
super().release()
6339

64-
def _release(self):
65-
logger.debug("Releasing %d locks", len(self.locks))
66-
for lock in self.locks:
67-
logger.debug("[locks] Releasing %s", lock)
68-
lock.release()
40+
async def __aenter__(self):
41+
await super().__aenter__()
42+
return self

src/experimaestro/scheduler/jobs.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,18 @@ def __init__(self, job):
7474
super().__init__()
7575
self.job = job
7676

77-
def _acquire(self):
77+
# def _acquire(self):
78+
# return self.job.state == JobState.DONE
79+
80+
async def acquire(self):
81+
await super().acquire()
7882
return self.job.state == JobState.DONE
7983

80-
def _release(self):
84+
# def _release(self):
85+
# return False
86+
87+
def release(self):
88+
super().release()
8189
return False
8290

8391

@@ -299,9 +307,9 @@ async def aio_start(self, sched_dependency_lock, notification_server=None):
299307
# We first lock the job before proceeding
300308
assert self.launcher is not None
301309

302-
with Locks() as locks:
310+
async with Locks() as locks:
303311
logger.debug("[starting] Locking job %s", self)
304-
async with self.launcher.connector.lock(self.lockpath):
312+
with self.launcher.connector.lock(self.lockpath):
305313
logger.debug("[starting] Locked job %s", self)
306314

307315
state = None
@@ -317,7 +325,10 @@ async def aio_start(self, sched_dependency_lock, notification_server=None):
317325
async with sched_dependency_lock:
318326
for dependency in self.dependencies:
319327
try:
320-
locks.append(dependency.lock().acquire())
328+
lock = dependency.lock()
329+
await lock.acquire()
330+
# locks.append(dependency.lock().acquire())
331+
locks.append(lock)
321332
except LockError:
322333
logger.warning(
323334
"Could not lock %s, aborting start for job %s",

src/experimaestro/tokens.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
import threading
1313
import os.path
1414
from watchdog.events import FileSystemEventHandler
15+
from asyncio import Lock
1516
from typing import Dict
1617
from experimaestro.launcherfinder.base import TokenConfiguration
1718

1819
from experimaestro.launcherfinder.registry import LauncherRegistry
1920

2021
from .ipc import ipcom
21-
from .locking import Lock, LockError
22+
from .locking import LockError
2223
from .scheduler.dependencies import Dependency, DependencyStatus, Resource
2324
import logging
2425
import json
@@ -51,9 +52,17 @@ def __init__(self, dependency: "CounterTokenDependency"):
5152
def _acquire(self):
5253
self.dependency.token.acquire(self.dependency)
5354

55+
async def acquire(self):
56+
self._acquire()
57+
return await super().acquire()
58+
5459
def _release(self):
5560
self.dependency.token.release(self.dependency)
5661

62+
def release(self):
63+
self._release()
64+
return super().release()
65+
5766
def __str__(self):
5867
return "Lock(%s)" % self.dependency
5968

0 commit comments

Comments
 (0)