diff --git a/README.md b/README.md index b471794..cc83795 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,17 @@ you also may have to setup some [mules](https://uwsgi-docs.readthedocs.org/en/la ## Installation -Simple execute `pip install uwsgi_tasks` +It's recommended to use `uv` for installation: + +```bash +uv pip install uwsgi-tasks +``` + +Alternatively, you can use `pip`: + +```bash +pip install uwsgi-tasks +``` ## Usage @@ -78,14 +88,14 @@ from uwsgi_tasks import task, TaskExecutor, SPOOL_OK, SPOOL_RETRY def task_wrapper(func): @wraps(func) # required! def _inner(*args, **kwargs): - print 'Task started with parameters:', args, kwargs + print(f'Task started with parameters: {args} {kwargs}') try: func(*args, **kwargs) except Exception as ex: # example - print 'Exception is occurred', ex, 'repeat the task' + print(f'Exception is occurred: {ex}, repeat the task') return SPOOL_RETRY - print 'Task ended', func + print(f'Task ended: {func}') return SPOOL_OK return _inner @@ -93,7 +103,7 @@ def task_wrapper(func): @task(executor=TaskExecutor.SPOOLER, retry_count=3, retry_timeout=5) @task_wrapper def spooler_task(text): - print 'Hello, spooler! text =', text + print(f'Hello, spooler! text = {text}') raise Exception('Sorry, task failed!') ``` @@ -224,25 +234,25 @@ def print_every_5_seconds(signal_number): Keep in mind: task is created on initialization. """ - print 'Task for signal', signal_number + print(f'Task for signal {signal_number}') @timer(seconds=5, iterations=3, target='workers') def print_every_5_seconds(signal_number): """Prints string every 5 seconds 3 times""" - print 'Task with iterations for signal', signal_number + print(f'Task with iterations for signal {signal_number}') @timer_lazy(seconds=5) def print_every_5_seconds_after_call(signal_number): """Prints string every 5 seconds""" - print 'Lazy task for signal', signal_number + print(f'Lazy task for signal {signal_number}') @cron(minute=-2) def print_every_2_minutes(signal_number): - print 'Cron task:', signal_number + print(f'Cron task: {signal_number}') @cron_lazy(minute=-2, target='mule') def print_every_2_minutes_after_call(signal_number): - print 'Cron task:', signal_number + print(f'Cron task: {signal_number}') ... @@ -310,9 +320,9 @@ from uwsgi_tasks import timer_lazy @timer_lazy(target='worker') def run_me_periodically(signal): - print('Running with signal:', signal) + print(f'Running with signal: {signal}') def my_view(request): run_me_periodically.add_setup(seconds=10, iterations=2) run_me_periodically() -``` +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..97a16ba --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,33 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "uwsgi-tasks" +version = "0.8.0" +authors = [ + { name="Oleg Churkin", email="bahusoff@gmail.com" }, +] +description = "Asynchronous tasks management with UWSGI server" +readme = "README.md" +license = { file="LICENSE" } +requires-python = ">=3.8" +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Environment :: Web Environment", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Software Development :: Libraries :: Python Modules", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", +] +dependencies = [ + "uwsgi", +] + +[project.urls] +"Homepage" = "https://github.com/Bahus/uwsgi_tasks" +"Bug Tracker" = "https://github.com/Bahus/uwsgi_tasks/issues" \ No newline at end of file diff --git a/setup.py b/setup.py index 96c9f49..d5369b9 100644 --- a/setup.py +++ b/setup.py @@ -1,26 +1,15 @@ -# -*- coding: utf-8 -*- -from __future__ import print_function import io -import sys from setuptools import setup - -if sys.argv[-1] == 'test': - # python-mock is required to run unit-tests - import unittest - unittest.main('uwsgi_tasks.tests', argv=sys.argv[:-1]) - - def get_long_description(): with io.open('./README.md', encoding='utf-8') as f: readme = f.read() return readme - setup( name='uwsgi-tasks', packages=['uwsgi_tasks'], - version='0.7.3', + version='0.8.0', description='Asynchronous tasks management with UWSGI server', author='Oleg Churkin', author_email='bahusoff@gmail.com', @@ -33,14 +22,16 @@ def get_long_description(): 'Environment :: Web Environment', 'Operating System :: OS Independent', 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Programming Language :: Python :: 3.13', 'Topic :: Software Development :: Libraries :: Python Modules', 'Intended Audience :: Developers', 'License :: OSI Approved :: MIT License' ], long_description=get_long_description(), long_description_content_type='text/markdown', - requires=['uwsgi', 'six'], - install_requires=['six'], -) + requires=['uwsgi'], + install_requires=[], +) \ No newline at end of file diff --git a/tox.ini b/tox.ini index c7a6963..0296a5a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,9 +1,10 @@ -# pip install tox tox-pyenv +# pip install tox tox-uv [tox] -envlist = {py27,py36,py37} +runner = uv +envlist = py311, py312, py313 [testenv] commands= - python setup.py test + python -m unittest uwsgi_tasks.tests deps = - mock + mock \ No newline at end of file diff --git a/uwsgi_tasks/__init__.py b/uwsgi_tasks/__init__.py index db45556..84cd9f2 100644 --- a/uwsgi_tasks/__init__.py +++ b/uwsgi_tasks/__init__.py @@ -1,7 +1,18 @@ -# -*- coding: utf-8 -*- from uwsgi_tasks.tasks import ( Task, SignalTask, TimerTask, CronTask, TaskExecutor, set_uwsgi_callbacks, RetryTaskException, SPOOL_OK, SPOOL_RETRY, SPOOL_IGNORE, get_current_task ) from uwsgi_tasks.utils import django_setup -from uwsgi_tasks.decorators import * +from uwsgi_tasks.decorators import ( + task, timer, timer_lazy, cron, cron_lazy, rbtimer, rbtimer_lazy, signal, signal_lazy +) + +__all__ = [ + # tasks + 'Task', 'SignalTask', 'TimerTask', 'CronTask', 'TaskExecutor', 'set_uwsgi_callbacks', + 'RetryTaskException', 'SPOOL_OK', 'SPOOL_RETRY', 'SPOOL_IGNORE', 'get_current_task', + # utils + 'django_setup', + # decorators + 'task', 'timer', 'timer_lazy', 'cron', 'cron_lazy', 'rbtimer', 'rbtimer_lazy', 'signal', 'signal_lazy', +] \ No newline at end of file diff --git a/uwsgi_tasks/decorators.py b/uwsgi_tasks/decorators.py index f7f9176..a073ad4 100644 --- a/uwsgi_tasks/decorators.py +++ b/uwsgi_tasks/decorators.py @@ -1,61 +1,123 @@ -# -*- coding: utf-8 -*- -from uwsgi_tasks.tasks import Task, TaskExecutor, TimerTask, CronTask +from __future__ import annotations -__all__ = ('task', 'timer', 'timer_lazy', 'cron', 'cron_lazy') +from typing import Callable, Any, TypeVar, Optional +from .tasks import Task, TaskExecutor, TimerTask, CronTask, SignalTask -def task(func=None, executor=TaskExecutor.AUTO, **setup): +CallableT = TypeVar('CallableT', bound=Callable[..., Any]) - def create_task(function): - return Task(function, executor, **setup) - if callable(func): - return create_task(func) - else: - return lambda f: create_task(f) +def task( + func: Optional[CallableT] = None, + executor: int = TaskExecutor.AUTO, + **setup: Any, +) -> Callable[[CallableT], Task] | Task: + """A decorator to create a task that can be executed asynchronously.""" + def create_task(function: CallableT) -> Task: + return Task(function, executor, **setup) -def timer(func=None, seconds=0, iterations=None, - executor=TaskExecutor.AUTO, **setup): - """Create timer on initialization""" + if func is None: + return create_task + return create_task(func) + + +def timer( + func: Optional[CallableT] = None, + seconds: int = 0, + iterations: Optional[int] = None, + executor: int = TaskExecutor.AUTO, + **setup: Any, +) -> Callable[[CallableT], TimerTask] | TimerTask: + """A decorator to create a timer task that runs on initialization.""" return timer_lazy( func=func, seconds=seconds, iterations=iterations, executor=executor, run=True, - **setup + **setup, ) -def timer_lazy(func=None, seconds=0, iterations=None, - executor=TaskExecutor.AUTO, run=False, **setup): - """Create task on execution""" +def timer_lazy( + func: Optional[CallableT] = None, + seconds: int = 0, + iterations: Optional[int] = None, + executor: int = TaskExecutor.AUTO, + run: bool = False, + **setup: Any, +) -> Callable[[CallableT], TimerTask] | TimerTask: + """A decorator to create a timer task that can be executed on demand.""" - def inner(function): + def inner(function: CallableT) -> TimerTask: timer_task = TimerTask( function=function, executor=executor, seconds=seconds, iterations=iterations, - **setup + **setup, ) if run: return timer_task() - else: - timer_task.register_signal() - return timer_task + + timer_task.register_signal() + return timer_task if func is None: return inner - return inner(func) -def cron(func=None, minute=-1, hour=-1, day=-1, month=-1, dayweek=-1, - executor=TaskExecutor.AUTO, **setup): - """Creates cron-like task on initialization""" +def rbtimer( + func: Optional[CallableT] = None, + seconds: int = 0, + iterations: int = 1, + executor: int = TaskExecutor.AUTO, + **setup: Any, +) -> Callable[[CallableT], TimerTask] | TimerTask: + """A decorator to create a red-black timer task that runs on initialization.""" + return rbtimer_lazy( + func=func, + seconds=seconds, + iterations=iterations, + executor=executor, + run=True, + **setup, + ) + + +def rbtimer_lazy( + func: Optional[CallableT] = None, + seconds: int = 0, + iterations: int = 1, + executor: int = TaskExecutor.AUTO, + run: bool = False, + **setup: Any, +) -> Callable[[CallableT], TimerTask] | TimerTask: + """A decorator to create a red-black timer task that can be executed on demand.""" + return timer_lazy( + func=func, + seconds=seconds, + iterations=iterations, + executor=executor, + run=run, + **setup, + ) + + +def cron( + func: Optional[CallableT] = None, + minute: int = -1, + hour: int = -1, + day: int = -1, + month: int = -1, + dayweek: int = -1, + executor: int = TaskExecutor.AUTO, + **setup: Any, +) -> Callable[[CallableT], CronTask] | CronTask: + """A decorator to create a cron-like task that runs on initialization.""" return cron_lazy( func=func, minute=minute, @@ -65,15 +127,24 @@ def cron(func=None, minute=-1, hour=-1, day=-1, month=-1, dayweek=-1, dayweek=dayweek, executor=executor, run=True, - **setup + **setup, ) -def cron_lazy(func=None, minute=-1, hour=-1, day=-1, month=-1, dayweek=-1, - executor=TaskExecutor.AUTO, run=False, **setup): - """Creates cron-like task on execution""" - - def inner(function): +def cron_lazy( + func: Optional[CallableT] = None, + minute: int = -1, + hour: int = -1, + day: int = -1, + month: int = -1, + dayweek: int = -1, + executor: int = TaskExecutor.AUTO, + run: bool = False, + **setup: Any, +) -> Callable[[CallableT], CronTask] | CronTask: + """A decorator to create a cron-like task that can be executed on demand.""" + + def inner(function: CallableT) -> CronTask: cron_task = CronTask( function=function, executor=executor, @@ -82,15 +153,53 @@ def inner(function): day=day, month=month, dayweek=dayweek, - **setup + **setup, ) if run: return cron_task() - else: - return cron_task + return cron_task if func is None: return inner - return inner(func) + + +def signal( + func: Optional[CallableT] = None, + executor: int = TaskExecutor.AUTO, + **setup: Any, +) -> Callable[[CallableT], SignalTask] | SignalTask: + """A decorator to create a signal task that runs on initialization.""" + return signal_lazy( + func=func, + executor=executor, + run=True, + **setup, + ) + + +def signal_lazy( + func: Optional[CallableT] = None, + executor: int = TaskExecutor.AUTO, + run: bool = False, + **setup: Any, +) -> Callable[[CallableT], SignalTask] | SignalTask: + """A decorator to create a signal task that can be executed on demand.""" + + def inner(function: CallableT) -> SignalTask: + signal_task = SignalTask( + function=function, + executor=executor, + **setup, + ) + + if run: + return signal_task() + + signal_task.register_signal() + return signal_task + + if func is None: + return inner + return inner(func) \ No newline at end of file diff --git a/uwsgi_tasks/tasks.py b/uwsgi_tasks/tasks.py index a407e8c..f70a036 100644 --- a/uwsgi_tasks/tasks.py +++ b/uwsgi_tasks/tasks.py @@ -1,197 +1,203 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals -import inspect +from __future__ import annotations + +import calendar import collections +import inspect import logging +import os +import pickle import threading import traceback -import calendar +import warnings from contextlib import contextmanager from datetime import datetime, timedelta +from enum import IntEnum +from typing import ( + Any, + Callable, + Dict, + Generator, + Optional, + Tuple, + Type, + TypeVar, + Union, +) -import os -import six -import warnings from uwsgi_tasks.utils import ( - import_by_path, - get_function_path, ProxyDict, + get_function_path, + import_by_path, ) - -try: - # noinspection PyPep8Naming - from six.moves import cPickle as pickle -except ImportError: - import pickle - - +# --- uWSGI import and setup --- try: import uwsgi + if uwsgi.masterpid() == 0: - warnings.warn('Uwsgi master process must be enabled', RuntimeWarning) + warnings.warn('uWSGI master process must be enabled for uwsgi_tasks to work.', RuntimeWarning) uwsgi = None - - SPOOL_OK = uwsgi.SPOOL_OK - SPOOL_IGNORE = uwsgi.SPOOL_IGNORE - SPOOL_RETRY = uwsgi.SPOOL_RETRY except ImportError: uwsgi = None - SPOOL_OK = -2 - SPOOL_IGNORE = 0 - SPOOL_RETRY = -1 - -# maximum message size for mule and spooler (64Kb) -# see uwsgi settings `mule-msg-size` -UWSGI_MAXIMUM_MESSAGE_SIZE = 64 * 1024 +# --- Spooler constants --- +SPOOL_OK: int = getattr(uwsgi, 'SPOOL_OK', -2) +SPOOL_IGNORE: int = getattr(uwsgi, 'SPOOL_IGNORE', 0) +SPOOL_RETRY: int = getattr(uwsgi, 'SPOOL_RETRY', -1) - -# storage for saved tasks -# often it's not required since tasks will be imported by path +# --- Globals --- +UWSGI_MAXIMUM_MESSAGE_SIZE: int = 64 * 1024 saved_tasks = threading.local() - -# logger logger = logging.getLogger('uwsgi_tasks') +# --- Type Variables --- +CallableT = TypeVar('CallableT', bound=Callable[..., Any]) +BaseTaskT = TypeVar('BaseTaskT', bound='BaseTask') + + +# --- Helper Functions --- -def serialize(content): +def serialize(content: Any) -> bytes: + """Serialize content using pickle.""" return pickle.dumps(content) -def deserialize(serialized): +def deserialize(serialized: bytes) -> Any: + """Deserialize content using pickle.""" return pickle.loads(serialized) -def load_function(func_name): +def load_function(func_name: str) -> Callable[..., Any]: + """Load a function by its dotted path, with a fallback to a local cache.""" func = getattr(saved_tasks, func_name, None) if func: return func return import_by_path(func_name) -def manage_spool_request(message): +def manage_spool_request(message: Dict[bytes, bytes]) -> int: + """uWSGI spooler callback. Deserializes a task and executes it.""" logger.debug('Processing request for spooler') try: task = SpoolerTask.extract_from_message(message) if task: return task.execute_now() - except: + except Exception: traceback.print_exc() logger.exception('Spooler message ignored: "%s"', message) - return SPOOL_OK + return SPOOL_OK -def manage_mule_request(message): +def manage_mule_request(message: bytes) -> Any: + """uWSGI mule message hook. Deserializes a task and executes it.""" logger.debug('Processing request for mule') task = MuleTask.extract_from_message(message) if task: return task.execute_now() + return None -def get_free_signal(): +def get_free_signal() -> Optional[int]: + """Finds and returns the number of an available uWSGI signal.""" if not uwsgi: return None - - for signum in range(0, 256): + for signum in range(256): if not uwsgi.signal_registered(signum): return signum - - raise Exception('No free uwsgi signal available') + raise RuntimeError('No free uWSGI signal available.') -def set_uwsgi_callbacks(): +def set_uwsgi_callbacks() -> None: + """Sets the uWSGI callbacks for the spooler and mule messages.""" if uwsgi: uwsgi.spooler = manage_spool_request uwsgi.mule_msg_hook = manage_mule_request + set_uwsgi_callbacks() -def get_current_task(): - """ Introspection API: allows to get current task instance in - called function. - """ +def get_current_task() -> Optional[BaseTask]: + """Introspection API: allows getting the current task instance from within the called function.""" stack = inspect.stack() try: - caller_name = stack[1][3] - caller_task = stack[1][0].f_globals[caller_name] + frame = stack[1][0] + caller_name = frame.f_code.co_name + caller_task_factory = frame.f_globals.get(caller_name) + if caller_task_factory and hasattr(caller_task_factory, 'function'): + return getattr(caller_task_factory.function, BaseTask.attr_name, None) except (IndexError, KeyError): logger.exception('get_current_task failed') - return None + return None - return getattr(caller_task.function, BaseTask.attr_name, None) +# --- Exceptions --- class RetryTaskException(Exception): - """ Throw this exception to re-execute spooler task """ + """Throw this exception to re-execute a spooler task.""" - def __init__(self, count=None, timeout=None): + def __init__(self, count: Optional[int] = None, timeout: Optional[int] = None): self.count = count self.timeout = timeout + super().__init__(f"Retrying task (count={count}, timeout={timeout})") -class TaskExecutor: +# --- Task Executors --- + +class TaskExecutor(IntEnum): AUTO = 1 SPOOLER = 2 MULE = 3 RUNTIME = 4 -class BaseTask(object): - attr_name = 'uwsgi_task' - executor = None +# --- Base Task Classes --- - def __init__(self, function, **setup): - self._function = None +class BaseTask: + attr_name: str = 'uwsgi_task' + executor: Optional[TaskExecutor] = None - if isinstance(function, six.string_types): - self.function_name = function + def __init__(self, function: Union[Callable[..., Any], str], **setup: Any): + self._function: Optional[Callable[..., Any]] = None + if isinstance(function, str): + self.function_name: str = function elif callable(function): self._function = function self.function_name = get_function_path(function) else: - raise TypeError('Callable or dotted path must be provided ' - 'as first argument') + raise TypeError('A callable or a dotted import path must be provided.') - self.args = setup.pop('args', ()) - self.kwargs = setup.pop('kwargs', {}) - self.setup = setup or {} + self.args: Tuple[Any, ...] = setup.pop('args', ()) + self.kwargs: Dict[str, Any] = setup.pop('kwargs', {}) + self.setup: Dict[str, Any] = setup or {} self.setup.setdefault('working_dir', os.getcwd()) - self._buffer = None + self._buffer: Optional[ProxyDict] = None - def __repr__(self): - return u''.format( - self.__class__.__name__, self.function_name, self.args, self.kwargs - ) + def __repr__(self) -> str: + return f'' @contextmanager - def set_working_dir(self): - """ Spooler uses its own working dir, so it's better to change cwd - to initial project dir, cause some path resolution mechanic - in Django depends on it, see #3. - """ - current_dir = os.getcwd() + def set_working_dir(self) -> Generator[None, None, None]: + """A context manager to temporarily change the working directory.""" working_dir = self.setup.get('working_dir') - if working_dir and os.path.exists(working_dir): + current_dir = os.getcwd() os.chdir(working_dir) - - try: + try: + yield + finally: + os.chdir(current_dir) + else: yield - finally: - os.chdir(current_dir) - def __call__(self, *args, **kwargs): + def __call__(self, *args: Any, **kwargs: Any) -> Any: if not uwsgi and self.executor != TaskExecutor.RUNTIME: - return - + return None self.args = args self.kwargs = kwargs - return self.execute_async() - def __getstate__(self): + def __getstate__(self) -> Dict[str, Any]: return { 'function_name': self.function_name, 'args': self.args, @@ -199,106 +205,104 @@ def __getstate__(self): 'setup': self.setup, } - def __setstate__(self, state): + def __setstate__(self, state: Dict[str, Any]) -> None: self.__dict__.update(state) self._function = None self._buffer = None @property - def buffer(self): - if not self._buffer: + def buffer(self) -> ProxyDict: + """A dict-like object to pass data between task execution attempts.""" + if self._buffer is None: self._buffer = ProxyDict(self.setup, 'buffer') - return self._buffer @property - def function(self): + def function(self) -> Callable[..., Any]: + """The actual callable function for this task.""" if self._function is None: self._function = load_function(self.function_name) - return self._function - def add_setup(self, **kwargs): + def add_setup(self, **kwargs: Any) -> None: + """Add or update setup parameters for the task.""" self.setup.update(kwargs) - def execute_now(self): + def execute_now(self) -> Any: + """Executes the task synchronously in the current process.""" logger.info('Executing %r', self) setattr(self.function, self.attr_name, self) with self.set_working_dir(): return self.function(*self.args, **self.kwargs) - def execute_async(self): + def execute_async(self) -> Any: + """Executes the task asynchronously.""" raise NotImplementedError() @classmethod - def create(cls, **kwargs): + def create(cls: Type[BaseTaskT], **kwargs: Any) -> Optional[BaseTaskT]: + """Creates an instance of the task.""" return cls(**kwargs) +# --- Concrete Task Implementations --- + class RuntimeTask(BaseTask): + """A task that runs immediately in the same process.""" executor = TaskExecutor.RUNTIME - def execute_now(self): + def execute_now(self) -> Any: serialize(self.args) serialize(self.kwargs) - super(RuntimeTask, self).execute_now() + return super().execute_now() - def execute_async(self): + def execute_async(self) -> Any: return self.execute_now() class MuleTask(BaseTask): - """ - Note: task function may not be initialized - mule be able to - import it successfully. - """ + """A task to be executed by a uWSGI mule.""" executor = TaskExecutor.MULE - mule_task_id = 'uwsgi_tasks.mule_task' - default_mule_id = 0 + mule_task_id: str = 'uwsgi_tasks.mule_task' + default_mule_id: int = 0 - def get_message_content(self): - # TODO: cache? + def get_message_content(self) -> bytes: return serialize({self.mule_task_id: self}) @classmethod - def extract_from_message(cls, message): + def extract_from_message(cls: Type[MuleTask], message: bytes) -> Optional[MuleTask]: msg = deserialize(message) - mule_task = msg.get(cls.mule_task_id) + return msg.get(cls.mule_task_id) - if mule_task is None: + def execute_async(self) -> Optional[int]: + if not uwsgi: return None - - return mule_task - - def execute_async(self): mule_id = self.setup.get('mule', self.default_mule_id) return uwsgi.mule_msg(self.get_message_content(), mule_id) @classmethod - def create(cls, **kwargs): - if 'mule' not in uwsgi.opt and 'mules' not in uwsgi.opt: + def create(cls: Type[MuleTask], **kwargs: Any) -> Optional[MuleTask]: + if not uwsgi or ('mule' not in uwsgi.opt and 'mules' not in uwsgi.opt): return None - self = cls(**kwargs) - - if len(self.get_message_content()) > UWSGI_MAXIMUM_MESSAGE_SIZE: + instance = cls(**kwargs) + if len(instance.get_message_content()) > UWSGI_MAXIMUM_MESSAGE_SIZE: + logger.warning("Task %r is too large for a mule message.", instance) return None - - return self + return instance class SpoolerTask(BaseTask): + """A task to be executed by a uWSGI spooler.""" executor = TaskExecutor.SPOOLER - spooler_default_arguments = ( - 'message_dict', 'spooler', 'priority', 'at' - ) + spooler_default_arguments: Tuple[str, ...] = ('message_dict', 'spooler', 'priority', 'at') - def __init__(self, function, **setup): - super(SpoolerTask, self).__init__(function, **setup) + def __init__(self, function: Union[Callable[..., Any], str], **setup: Any): + super().__init__(function, **setup) self.retry_count = self.setup.pop('retry_count', None) self.retry_timeout = self.setup.pop('retry_timeout', None) - def get_message_content(self): + def get_message_content(self) -> Dict[bytes, bytes]: base_message_dict = self.__getstate__() base_message_dict['setup'] = serialize(self.setup) @@ -306,258 +310,192 @@ def get_message_content(self): if key in self.setup: base_message_dict[key] = self.setup[key] - # datetime and timedelta conversion at = base_message_dict.get('at') - - if at: - if isinstance(at, timedelta): - at += datetime.utcnow() - - if isinstance(at, datetime): - at = calendar.timegm(at.timetuple()) - - base_message_dict['at'] = str(at) + if isinstance(at, timedelta): + at = datetime.utcnow() + at + if isinstance(at, datetime): + base_message_dict['at'] = str(calendar.timegm(at.timetuple())) logger.debug('Spooler base parameters: "%r"', base_message_dict) message_dict = base_message_dict.copy() - message_dict.update({ - 'args': serialize(self.args), - 'kwargs': serialize(self.kwargs) - }) + message_dict.update({'args': serialize(self.args), 'kwargs': serialize(self.kwargs)}) if len(repr(message_dict)) >= UWSGI_MAXIMUM_MESSAGE_SIZE: - # message too long for spooler - we have to use `body` parameter message_dict = base_message_dict message_dict.update({ 'args': serialize(()), 'kwargs': serialize({}), - 'body': serialize({ - 'args': self.args, - 'kwargs': self.kwargs, - }) + 'body': serialize({'args': self.args, 'kwargs': self.kwargs}), }) return self._encode_message(message_dict) @staticmethod - def _encode_message(message_dict): - """ Encode message key, since spooler accept only bytes literals """ - - def encoder(s): - try: - if isinstance(s, six.text_type): - return s.encode() - except UnicodeEncodeError: - pass - return s - - return {encoder(k): encoder(v) for k, v in six.iteritems(message_dict)} + def _encode_message(message_dict: Dict[str, Any]) -> Dict[bytes, bytes]: + encoded = {} + for k, v in message_dict.items(): + key_bytes = str(k).encode() + if isinstance(v, str): + value_bytes = v.encode() + elif isinstance(v, bytes): + value_bytes = v + else: + value_bytes = str(v).encode() # Fallback for other types like int + encoded[key_bytes] = value_bytes + return encoded @staticmethod - def _decode_message(message_dict): - - def decoder(s, decode=True): - try: - if decode and isinstance(s, six.binary_type): - return s.decode() - except UnicodeDecodeError: - pass - return s - - return { - decoder(k): decoder(v, k in {b'function_name'}) - for k, v in six.iteritems(message_dict) - } + def _decode_message(message_dict: Dict[bytes, bytes]) -> Dict[str, Union[str, bytes]]: + decoded = {} + for k, v in message_dict.items(): + key_str = k.decode() + if key_str == 'function_name': + decoded[key_str] = v.decode() + else: + decoded[key_str] = v + return decoded - def execute_async(self): + def execute_async(self) -> Optional[int]: + if not uwsgi: + return None return uwsgi.spool(self.get_message_content()) - def execute_now(self): + def execute_now(self) -> int: try: - result = super(SpoolerTask, self).execute_now() - except RetryTaskException as ex: - # let's retry the task - return self.retry(count=ex.count, timeout=ex.timeout) - - spool_return = self.setup.get('spooler_return') - - if result == SPOOL_RETRY and not spool_return: - return self.retry() - - if not spool_return: - return SPOOL_OK + result = super().execute_now() + except RetryTaskException as e: + return self.retry(count=e.count, timeout=e.timeout) + spooler_return = self.setup.get('spooler_return', False) + if not spooler_return: + return SPOOL_RETRY if result == SPOOL_RETRY else SPOOL_OK return result @property - def retry_count(self): - return self.setup.get('retry_count') + def retry_count(self) -> int: + return self.setup.get('retry_count', 0) @retry_count.setter - def retry_count(self, value): - if value is None: - value = 0 - - if not isinstance(value, int): - raise TypeError('retry_count must be integer ' - 'got {}'.format(type(value))) - self.setup['retry_count'] = value + def retry_count(self, value: Optional[int]): + self.setup['retry_count'] = int(value or 0) @property - def retry_timeout(self): + def retry_timeout(self) -> Optional[int]: return self.setup.get('retry_timeout') @retry_timeout.setter - def retry_timeout(self, value): - if not value: - return - + def retry_timeout(self, value: Optional[Union[int, timedelta]]): if isinstance(value, timedelta): - value = value.seconds - elif not isinstance(value, int): - raise TypeError('retry_timeout must be integer or timedelta' - ' got {}'.format(type(value))) - - self.setup['retry_timeout'] = value + self.setup['retry_timeout'] = value.seconds + elif value is not None: + self.setup['retry_timeout'] = int(value) @property - def at(self): + def at(self) -> Optional[Union[int, datetime, timedelta]]: return self.setup.get('at') @at.setter - def at(self, value): + def at(self, value: Optional[Union[int, datetime, timedelta]]): self.setup['at'] = value - def retry(self, count=None, timeout=None): - retry_count = count or self.retry_count - retry_timeout = timeout or self.retry_timeout or 0 - - if retry_count is not None: - if retry_count > 1: - # retry task - self.retry_timeout = retry_timeout - self.retry_count = retry_count - 1 - self.at = timedelta(seconds=self.retry_timeout) - self.execute_async() - else: - logger.info('Stop retrying for %r', self) - + def retry(self, count: Optional[int] = None, timeout: Optional[int] = None) -> int: + retry_count = count if count is not None else self.retry_count + if retry_count > 1: + self.retry_timeout = timeout if timeout is not None else self.retry_timeout + self.retry_count = retry_count - 1 + self.at = timedelta(seconds=self.retry_timeout or 0) + self.execute_async() + else: + logger.info('Stop retrying for %r', self) return SPOOL_OK @classmethod - def extract_from_message(cls, message): - message = cls._decode_message(message) - func_name = message.get('function_name') - + def extract_from_message(cls: Type[SpoolerTask], message: Dict[bytes, bytes]) -> Optional[SpoolerTask]: + decoded_message = cls._decode_message(message) + func_name = decoded_message.get('function_name') if not func_name: return None - if 'body' in message: - body = deserialize(message['body']) - args = body['args'] - kwargs = body['kwargs'] + if 'body' in decoded_message: + body = deserialize(decoded_message['body']) + args, kwargs = body['args'], body['kwargs'] else: - args = deserialize(message['args']) - kwargs = deserialize(message['kwargs']) + args = deserialize(decoded_message['args']) + kwargs = deserialize(decoded_message['kwargs']) - setup = deserialize(message.get('setup')) + setup = deserialize(decoded_message['setup']) - return cls( - function=func_name, - args=args, - kwargs=kwargs, - **setup - ) + return cls(function=func_name, args=args, kwargs=kwargs, **setup) @classmethod - def create(cls, **kwargs): - if 'spooler' not in uwsgi.opt: - return None - - return cls(**kwargs) + def create(cls: Type[SpoolerTask], **kwargs: Any) -> Optional[SpoolerTask]: + return cls(**kwargs) if uwsgi and 'spooler' in uwsgi.opt else None class SignalTask(BaseTask): - default_target = '' + """A task triggered by a uWSGI signal.""" + default_target: str = '' - def __init__(self, function, **setup): - super(SignalTask, self).__init__(function, **setup) - self._signal_id = None + def __init__(self, function: Union[Callable[..., Any], str], **setup: Any): + super().__init__(function, **setup) + self._signal_id: Optional[int] = None - def get_default_target(self): - """List of all available targets can be found here: - http://uwsgi-docs.readthedocs.org/en/latest/PythonModule.html#uwsgi.register_signal - """ + def get_default_target(self) -> str: if not uwsgi: return self.default_target - executor = self.setup.get('executor', TaskExecutor.AUTO) - - if (('mule' in uwsgi.opt or 'mules' in uwsgi.opt) - and executor in (TaskExecutor.AUTO, TaskExecutor.MULE)): + if ('mule' in uwsgi.opt or 'mules' in uwsgi.opt) and executor in (TaskExecutor.AUTO, TaskExecutor.MULE): return 'mule' - - if 'spooler' in uwsgi.opt and executor in (TaskExecutor.AUTO, - TaskExecutor.SPOOLER): + if 'spooler' in uwsgi.opt and executor in (TaskExecutor.AUTO, TaskExecutor.SPOOLER): return 'spooler' - return self.default_target @property - def signal_id(self): + def signal_id(self) -> Optional[int]: if self._signal_id is None: - try: - self._signal_id = int(self.setup.get('signal')) - except (ValueError, TypeError): - self._signal_id = get_free_signal() - + self._signal_id = self.setup.get('signal') or get_free_signal() return self._signal_id @property - def target(self): - target = self.setup.get('target') - return target if target is not None else self.get_default_target() + def target(self) -> str: + return self.setup.get('target') or self.get_default_target() @target.setter - def target(self, value): + def target(self, value: str): self.setup['target'] = value - def register_signal(self, handler=None): - if uwsgi and self._signal_id is None: - handler = handler or self.signal_handler - uwsgi.register_signal(self.signal_id, self.target, handler) + def register_signal(self, handler: Optional[Callable[[int], Any]] = None) -> bool: + if uwsgi and self.signal_id is not None and not uwsgi.signal_registered(self.signal_id): + uwsgi.register_signal(self.signal_id, self.target, handler or self.signal_handler) return True return False - def free_signal(self): - # currently uwsgi does not support signal deletion + def free_signal(self) -> None: + # uWSGI does not support unregistering signals. self._signal_id = None - def signal_handler(self, signal): + def signal_handler(self, signal: int) -> Any: self.args = (signal,) return self.execute_now() - def execute_async(self): - if not uwsgi: + def execute_async(self) -> None: + if not uwsgi or self.signal_id is None: return - self.register_signal() uwsgi.signal(self.signal_id) self.free_signal() class TimerTask(SignalTask): + """A task executed periodically by a uWSGI timer.""" - def execute_async(self): - if not uwsgi: + def execute_async(self) -> None: + if not uwsgi or self.signal_id is None: return - self.register_signal(self.signal_handler) - seconds = self.setup.get('seconds', 0) - iterations = self.setup.get('iterations', None) - + iterations = self.setup.get('iterations') if iterations is None: uwsgi.add_timer(self.signal_id, seconds) else: @@ -565,112 +503,81 @@ def execute_async(self): class CronTask(SignalTask): + """A task executed on a cron-like schedule.""" - def execute_async(self): - if not uwsgi: + def execute_async(self) -> None: + if not uwsgi or self.signal_id is None: return - self.register_signal(self.signal_handler) - - minute = self.setup.get('minute') - hour = self.setup.get('hour') - day = self.setup.get('day') - month = self.setup.get('month') - dayweek = self.setup.get('dayweek') - uwsgi.add_cron( self.signal_id, - minute, - hour, - day, - month, - dayweek + self.setup.get('minute', -1), + self.setup.get('hour', -1), + self.setup.get('day', -1), + self.setup.get('month', -1), + self.setup.get('dayweek', -1), ) -class OneTimeTask(TimerTask): - """It's like TimerTask but executes only ones""" - executor = TaskExecutor.AUTO - - def __init__(self, function, **setup): - if 'signal' not in setup: - warnings.warn( - 'You should provide "signal" parameter, otherwise ' - 'first found free signal id will be locked unless ' - 'worker is restarted', - RuntimeWarning - ) +# --- Task Factory --- - setup.update({ - 'seconds': 0, - 'iterations': 1, - }) - super(OneTimeTask, self).__init__(function, **setup) - - -tasks_registry = collections.OrderedDict(( +tasks_registry: Dict[int, Type[BaseTask]] = collections.OrderedDict([ (TaskExecutor.MULE, MuleTask), (TaskExecutor.SPOOLER, SpoolerTask), (TaskExecutor.RUNTIME, RuntimeTask), -)) +]) -class Task(object): - """Actual Task factory""" - - def __init__(self, func, executor=TaskExecutor.AUTO, **setup): - assert callable(func) +class Task: + """A factory for creating and executing tasks.""" + def __init__(self, func: Callable[..., Any], executor: TaskExecutor = TaskExecutor.AUTO, **setup: Any): + if not callable(func): + raise TypeError("A callable function must be provided.") self.function = func self.function_name = get_function_path(self.function) self.executor = executor self.setup = setup self._add_to_global_storage() - def __call__(self, *args, **kwargs): - + def __call__(self, *args: Any, **kwargs: Any) -> Optional[BaseTask]: if not uwsgi: - logger.warning('UWSGI environment is not available, so task %r ' - 'will be executed at runtime', self) + logger.warning('uWSGI not available. Task %r will run at runtime.', self) self.executor = TaskExecutor.RUNTIME - # create task and execute it at runtime or send to mule\spooler task = self.get_task(args, kwargs) - - logger.info('Executing asynchronously %s', task) - task.execute_async() + if task: + logger.info('Executing asynchronously %s', task) + task.execute_async() return task - def get_task(self, args, kwargs): - task_arguments = dict( - function=self.function, - args=args, - kwargs=kwargs, - ) - task_arguments.update(self.setup) - - current_task = None + def get_task(self, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Optional[BaseTask]: + task_arguments = { + 'function': self.function, + 'args': args, + 'kwargs': kwargs, + **self.setup, + } - if self.executor in tasks_registry: + # If executor is specified and not AUTO, try it first. + if self.executor != TaskExecutor.AUTO: task_class = tasks_registry.get(self.executor) - current_task = task_class.create(**task_arguments) - - if not current_task: - for task_class in tasks_registry.values(): - current_task = task_class.create(**task_arguments) - if current_task: - break - - if current_task: - return current_task - - raise RuntimeError( - 'Could not create a task for "{}" and executor "{}"'.format( - self.function_name, self.executor) - ) + if task_class: + task = task_class.create(**task_arguments) + if task: + return task + + # Fallback for AUTO or if the specified executor failed. + for task_class in tasks_registry.values(): + task = task_class.create(**task_arguments) + if task: + return task + + logger.error('Could not create any task for "%s"', self.function_name) + return None - def _add_to_global_storage(self): + def _add_to_global_storage(self) -> None: setattr(saved_tasks, self.function_name, self.function) - def __repr__(self): - return ''.format(self.function_name) + def __repr__(self) -> str: + return f'' \ No newline at end of file diff --git a/uwsgi_tasks/tests.py b/uwsgi_tasks/tests.py index be676a0..a25a135 100644 --- a/uwsgi_tasks/tests.py +++ b/uwsgi_tasks/tests.py @@ -1,31 +1,28 @@ -# -*- coding: utf-8 -*- -from datetime import timedelta -from unittest import TestCase - import os +from datetime import timedelta +from unittest import TestCase, mock -try: - import mock # Python 2 -except ImportError: - from unittest import mock # Python 3 - -import six -from uwsgi_tasks.utils import import_by_path, get_function_path +from uwsgi_tasks import TaskExecutor, RetryTaskException, SPOOL_OK, task from uwsgi_tasks.tasks import ( - RuntimeTask, manage_mule_request, manage_spool_request, TimerTask, - get_current_task, SpoolerTask, serialize + RuntimeTask, + TimerTask, + SpoolerTask, + get_current_task, + manage_mule_request, + manage_spool_request, + serialize, ) -from uwsgi_tasks import task, TaskExecutor, RetryTaskException, SPOOL_OK +from uwsgi_tasks.utils import import_by_path, get_function_path -def local_function(value): +def local_function(value: int) -> int: return value ** 2 class UtilsTest(TestCase): - builtin_module = '__builtin__' if six.PY2 else 'builtins' + builtin_module = 'builtins' - def test_import_by_path(self): + def test_import_by_path(self) -> None: with self.assertRaises(ImportError): import_by_path('len') @@ -35,26 +32,15 @@ def test_import_by_path(self): main = __import__('__main__') main.local_function = local_function - self.assertTrue(local_function is import_by_path('local_function')) - - self.assertTrue( - local_function is import_by_path('uwsgi_tasks.tests.local_function') - ) - self.assertTrue(len is import_by_path('{}.len'.format( - self.builtin_module))) - - def test_get_function_path(self): - self.assertEqual( - get_function_path(local_function), - 'uwsgi_tasks.tests.local_function' - ) + self.assertIs(local_function, import_by_path('local_function')) + self.assertIs(local_function, import_by_path('uwsgi_tasks.tests.local_function')) + self.assertIs(len, import_by_path(f'{self.builtin_module}.len')) - self.assertEqual( - get_function_path(len), - '{}.len'.format(self.builtin_module) - ) + def test_get_function_path(self) -> None: + self.assertEqual(get_function_path(local_function), 'uwsgi_tasks.tests.local_function') + self.assertEqual(get_function_path(len), f'{self.builtin_module}.len') - def nested(): + def nested() -> None: pass self.assertEqual(get_function_path(nested), 'uwsgi_tasks.tests.nested') @@ -64,24 +50,23 @@ def nested(): @task(executor=TaskExecutor.MULE) -def mule_task(a, b, c='3', d='4'): +def mule_task(a: int, b: int, c: str = '3', d: str = '4') -> None: storage(a, b, c, d) @task(executor=TaskExecutor.SPOOLER, priority='10', at=timedelta(seconds=10)) -def spooler_task(a, b=None): +def spooler_task(a: int, b: str | None = None) -> None: storage(a, b) @task(executor=TaskExecutor.SPOOLER, retry_count=2) -def spooler_retry_task(g, h): +def spooler_retry_task(g: int, h: int) -> None: storage(g, h) raise RetryTaskException(timeout=10) -@task(executor=TaskExecutor.SPOOLER, retry_count=2, - retry_timeout=timedelta(seconds=20)) -def spooler_and_task_introspection(a, b): +@task(executor=TaskExecutor.SPOOLER, retry_count=2, retry_timeout=timedelta(seconds=20)) +def spooler_and_task_introspection(a: str, b: int) -> None: current_task = get_current_task() assert isinstance(current_task, SpoolerTask) assert current_task.executor == TaskExecutor.SPOOLER @@ -91,14 +76,14 @@ def spooler_and_task_introspection(a, b): assert current_task.retry_timeout == 50 assert current_task.at == timedelta(seconds=50) assert len(current_task.buffer) == 1 - assert current_task.buffer[b'message'] == u'Hello, World!' + assert current_task.buffer['message'] == 'Hello, World!' else: assert current_task.executor == TaskExecutor.SPOOLER assert current_task.retry_count == 2 assert current_task.retry_timeout == 20 storage(a, b) - current_task.buffer[b'message'] = u'Hello, World!' + current_task.buffer['message'] = 'Hello, World!' raise RetryTaskException(timeout=50) @@ -106,193 +91,150 @@ def spooler_and_task_introspection(a, b): @task(executor=TaskExecutor.SPOOLER) -def spooler_task_with_cwd_changed(): +def spooler_task_with_cwd_changed() -> None: storage() - current_dir = os.getcwd() assert current_dir != SPOOLER_DIR @task(executor=TaskExecutor.SPOOLER, working_dir=None) -def spooler_task_with_cwd_not_changed(): +def spooler_task_with_cwd_not_changed() -> None: storage() - current_dir = os.getcwd() assert current_dir == SPOOLER_DIR -def timer_task(signum): +def timer_task(signum: int) -> None: storage(signum) @task(executor=TaskExecutor.RUNTIME) -def runtime_task(*args, **kwargs): +def runtime_task(*args, **kwargs) -> bool: return True class TaskTest(TestCase): - def setUp(self): + def setUp(self) -> None: self.storage = storage self.storage.reset_mock() self.patcher = mock.patch('uwsgi_tasks.tasks.uwsgi') - def test_runtime_task_execution(self): - - def runtime_task(a, b, c=None): + def test_runtime_task_execution(self) -> None: + def rt_func(a: int, b: int, c: int | None = None) -> None: self.storage(a, b, c=c) - runtime_task = RuntimeTask(runtime_task) - - runtime_task(1, 2, c=3) + rt = RuntimeTask(rt_func) + rt(1, 2, c=3) self.storage.assert_called_once_with(1, 2, c=3) - runtime_task(4, '2') + rt(4, '2') self.storage.assert_called_with(4, '2', c=None) with self.assertRaises(TypeError): - runtime_task() + rt() - def test_task_is_executed_at_runtime_if_uwsgi_not_available(self): + def test_task_is_executed_at_runtime_if_uwsgi_not_available(self) -> None: mule_task(4, 5, '6', '7') self.storage.assert_called_once_with(4, 5, '6', '7') - def test_mule_task_execution_became_runtime(self): - # 'mule' not in uwsgi.opt, so it goes to runtime + def test_mule_task_execution_became_runtime(self) -> None: with self.patcher: mule_task(4, 7, '8') - self.storage.assert_called_with(4, 7, '8', '4') - def test_mule_task_execution(self): + def test_mule_task_execution(self) -> None: with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'mule': 1} - m_task = mule_task(6, 6, u'a', u'кириллический') - + m_task = mule_task(6, 6, 'a', 'кириллический') message = m_task.get_message_content() - uwsgi_mock.mule_msg.assert_called_with( - message, m_task.default_mule_id - ) + uwsgi_mock.mule_msg.assert_called_with(message, m_task.default_mule_id) manage_mule_request(message) + self.storage.assert_called_once_with(6, 6, 'a', 'кириллический') - self.storage.assert_called_once_with(6, 6, u'a', u'кириллический') - - def test_spooler_task_execution(self): + def test_spooler_task_execution(self) -> None: with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'spooler': '/tmp/spooler'} uwsgi_mock.SPOOL_OK = SPOOL_OK - s_task = spooler_task(0, '1') message = s_task.get_message_content() - self.assertEqual(message[b'priority'], b'10') uwsgi_mock.spool.assert_called_with(message) manage_spool_request(message) - self.storage.assert_called_once_with(0, '1') - def test_rb_timer_task_execution(self): + def test_rb_timer_task_execution(self) -> None: with self.patcher as uwsgi_mock: - t_task = TimerTask( - timer_task, - signal=10, - seconds=5, - iterations=3, - target='workers' - ) - + uwsgi_mock.signal_registered.return_value = False + t_task = TimerTask(timer_task, seconds=5, iterations=3, target='workers') + t_task.setup['signal'] = 10 t_task() - - uwsgi_mock.register_signal.assert_called_once_with( - 10, 'workers', t_task.signal_handler - ) + uwsgi_mock.register_signal.assert_called_once_with(10, 'workers', t_task.signal_handler) uwsgi_mock.add_rb_timer.assert_called_once_with(10, 5, 3) - t_task.signal_handler(10) self.storage.assert_called_once_with(10) - def test_spooler_retry_on_exception(self): + def test_spooler_retry_on_exception(self) -> None: with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'spooler': '/tmp/spooler'} uwsgi_mock.SPOOL_OK = SPOOL_OK - s_task = spooler_retry_task(666, 777) message = s_task.get_message_content() - - self.assertFalse(message.get(b'at')) - + self.assertNotIn(b'at', message) uwsgi_mock.spool.assert_called_with(message) - manage_spool_request(message) self.storage.assert_called_once_with(666, 777) self.assertEqual(2, uwsgi_mock.spool.call_count) - new_message = uwsgi_mock.spool.call_args_list[-1][0][0] - self.assertTrue(new_message.get(b'at')) - + self.assertIn(b'at', new_message) manage_spool_request(message) - self.assertEqual(2, self.storage.call_count) - def test_spooler_task_introspection(self): + def test_spooler_task_introspection(self) -> None: with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'spooler': '/tmp/spooler'} - s_task = spooler_and_task_introspection('a', 1) message = s_task.get_message_content() manage_spool_request(message) - new_message = uwsgi_mock.spool.call_args_list[-1][0][0] manage_spool_request(new_message) - self.storage.assert_called_with('a', 1) self.assertEqual(2, self.storage.call_count) - def test_spooler_task_large_args_in_body(self): + def test_spooler_task_large_args_in_body(self) -> None: large_arg = 'b' * 64 * 1024 with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'spooler': '/tmp/spooler'} s_task = spooler_task('a', large_arg) message = s_task.get_message_content() - - expected_args = s_task._encode_message({ - 'args': serialize(()), - 'kwargs': serialize({}), - }) - + expected_args = SpoolerTask._encode_message({'args': serialize(()), 'kwargs': serialize({})}) self.assertEqual(message[b'args'], expected_args[b'args']) self.assertEqual(message[b'kwargs'], expected_args[b'kwargs']) manage_spool_request(message) - self.storage.assert_called_once_with('a', large_arg) - def test_task_working_directory_changed(self): + def test_task_working_directory_changed(self) -> None: current_dir = os.getcwd() - with self.patcher as uwsgi_mock: uwsgi_mock.opt = {'spooler': SPOOLER_DIR} - s_task = spooler_task_with_cwd_changed() message = s_task.get_message_content() os.chdir(SPOOLER_DIR) manage_spool_request(message) self.assertTrue(self.storage.called) self.storage.reset_mock() - os.chdir(current_dir) - s_task = spooler_task_with_cwd_not_changed() message = s_task.get_message_content() os.chdir(SPOOLER_DIR) manage_spool_request(message) self.assertTrue(self.storage.called) self.storage.reset_mock() - os.chdir(current_dir) - def test_runtime_task_pickling(self): + def test_runtime_task_pickling(self) -> None: runtime_task('args', {'kw': 'args'}) with self.assertRaises(Exception): runtime_task(lambda x: x, {'kw': 'args'}) with self.assertRaises(Exception): - runtime_task('args', {'kw': lambda x: x}) + runtime_task('args', {'kw': lambda x: x}) \ No newline at end of file diff --git a/uwsgi_tasks/utils.py b/uwsgi_tasks/utils.py index 1c3ba02..b8ee5aa 100644 --- a/uwsgi_tasks/utils.py +++ b/uwsgi_tasks/utils.py @@ -1,86 +1,71 @@ -# -*- coding: utf-8 -*- -import six +from __future__ import annotations + from importlib import import_module +from typing import Any, Callable, Optional -def import_by_path(dotted_path): +def import_by_path(dotted_path: str) -> Any: """Import a dotted module path and return the attribute/class designated by the last name in the path. Raise ImportError if the import failed. - Adapted from Django 1.7 + Adapted from Django. """ - try: - if not dotted_path.count('.'): - dotted_path = '.'.join(['__main__', dotted_path]) + if '.' not in dotted_path: + dotted_path = f'__main__.{dotted_path}' module_path, class_name = dotted_path.rsplit('.', 1) - except ValueError: - msg = '"{}" doesn\'t look like a module path'.format(dotted_path) - raise ImportError(msg) + except ValueError as e: + raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e try: module = import_module(module_path) - except ImportError as ex: - raise ImportError('Failed to import "{}" - {}'.format(dotted_path, ex)) + except ImportError as e: + raise ImportError(f'Failed to import "{dotted_path}" - {e}') from e try: return getattr(module, class_name) - except AttributeError: - msg = 'Module "{}" does not define a "{}" attribute/class'.format( - dotted_path, class_name) - raise ImportError(msg) + except AttributeError as e: + raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class') from e -def get_function_path(function): - """Get received function path (as string), to import function later - with `import_string`. - """ - if isinstance(function, six.string_types): +def get_function_path(function: Callable[..., Any] | str) -> str: + """Get the import path for a function as a string.""" + if isinstance(function, str): return function - func_path = [] - module = getattr(function, '__module__', '__main__') if module: - func_path.append(module) + return f'{module}.{function.__name__}' - func_path.append(function.__name__) - return '.'.join(func_path) + return function.__name__ -def django_setup(settings_module=None): +def django_setup(settings_module: Optional[str] = None) -> None: """Initialize Django if required, must be run before performing - any task on spooler or mule. + any task on a spooler or mule. """ from django.conf import settings, ENVIRONMENT_VARIABLE + import os if settings.configured: return if settings_module: - import os os.environ[ENVIRONMENT_VARIABLE] = settings_module - try: - # django > 1.7 - from django import setup - except ImportError: - # django < 1.7 - def setup(): - settings._setup() - + from django import setup setup() class ProxyDict(dict): + """A dict that proxies access to a nested dictionary within a parent dict.""" - def __init__(self, dict_instance, key): - super(ProxyDict, self).__init__() - + def __init__(self, dict_instance: dict[str, Any], key: str): + super().__init__() self.key = key if self.key in dict_instance: self.update(dict_instance[self.key]) - dict_instance[self.key] = self + dict_instance[self.key] = self \ No newline at end of file