From edd8d49b240c2f0237696e07755378c48a692001 Mon Sep 17 00:00:00 2001 From: Marcell Nagy Date: Tue, 29 Oct 2024 14:29:30 +0000 Subject: [PATCH] Add scheduler --- pyproject.toml | 1 + src/fastcs/backend.py | 75 ++++++++++++++++--------------------------- 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ff870e073..0fe075d0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ classifiers = [ description = "Control system agnostic framework for building Device support in Python that will work for both EPICS and Tango" dependencies = [ "aioserial", + "apscheduler", "numpy", "pydantic", "pvi~=0.10.0", diff --git a/src/fastcs/backend.py b/src/fastcs/backend.py index 80c8eab67..99b0b2907 100644 --- a/src/fastcs/backend.py +++ b/src/fastcs/backend.py @@ -1,9 +1,7 @@ import asyncio -from collections import defaultdict -from collections.abc import Callable -from concurrent.futures import Future from types import MethodType +from apscheduler.schedulers.asyncio import AsyncIOScheduler from softioc.asyncio_dispatcher import AsyncioDispatcher from .attributes import AttrR, AttrW, Sender, Updater @@ -21,7 +19,6 @@ def __init__( self._controller = controller self._initial_tasks = [controller.connect] - self._scan_tasks: list[Future] = [] asyncio.run_coroutine_threadsafe( self._controller.initialise(), self._loop @@ -30,6 +27,9 @@ def __init__( self._mapping = Mapping(self._controller) self._link_process_tasks() + self._scheduler = AsyncIOScheduler(event_loop=self._loop) + self._add_scan_tasks() + self._context = { "dispatcher": self._dispatcher, "controller": self._controller, @@ -41,9 +41,14 @@ def _link_process_tasks(self): _link_single_controller_put_tasks(single_mapping) _link_attribute_sender_class(single_mapping) + def _add_scan_tasks(self): + for single_mapping in self._mapping.get_controller_mappings(): + _add_scan_method_tasks(self._scheduler, single_mapping) + _add_attribute_updater_tasks(self._scheduler, single_mapping) + def run(self): self._run_initial_tasks() - self._start_scan_tasks() + self._scheduler.start() self._run() @@ -52,12 +57,6 @@ def _run_initial_tasks(self): future = asyncio.run_coroutine_threadsafe(task(), self._loop) future.result() - def _start_scan_tasks(self): - scan_tasks = _get_scan_tasks(self._mapping) - - for task in scan_tasks: - asyncio.run_coroutine_threadsafe(task(), self._loop) - def _run(self): raise NotImplementedError("Specific Backend must implement _run") @@ -98,36 +97,35 @@ async def callback(value): return callback -def _get_scan_tasks(mapping: Mapping) -> list[Callable]: - scan_dict: dict[float, list[Callable]] = defaultdict(list) - - for single_mapping in mapping.get_controller_mappings(): - _add_scan_method_tasks(scan_dict, single_mapping) - _add_attribute_updater_tasks(scan_dict, single_mapping) - - scan_tasks = _get_periodic_scan_tasks(scan_dict) - return scan_tasks - - -def _add_scan_method_tasks( - scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping -): +def _add_scan_method_tasks(scheduler: AsyncIOScheduler, single_mapping: SingleMapping): for method in single_mapping.scan_methods.values(): - scan_dict[method.period].append( - MethodType(method.fn, single_mapping.controller) + path = single_mapping.controller.path + scheduler.add_job( + MethodType(method.fn, single_mapping.controller), + "interval", + seconds=method.period, + name=f"{'_'.join(path)}_{method.fn.__name__}" + if path + else method.fn.__name__, ) def _add_attribute_updater_tasks( - scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping + scheduler: AsyncIOScheduler, single_mapping: SingleMapping ): - for attribute in single_mapping.attributes.values(): + for attr_name, attribute in single_mapping.attributes.items(): match attribute: case AttrR(updater=Updater(update_period=update_period)) as attribute: callback = _create_updater_callback( attribute, single_mapping.controller ) - scan_dict[update_period].append(callback) + path = single_mapping.controller.path + scheduler.add_job( + callback, + "interval", + seconds=update_period, + name=f"{'_'.join(path)}_{attr_name}" if path else attr_name, + ) def _create_updater_callback(attribute, controller): @@ -142,20 +140,3 @@ async def callback(): raise return callback - - -def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]: - periodic_scan_tasks: list[Callable] = [] - for period, methods in scan_dict.items(): - periodic_scan_tasks.append(_create_periodic_scan_task(period, methods)) - - return periodic_scan_tasks - - -def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable: - async def scan_task() -> None: - while True: - await asyncio.gather(*[method() for method in methods]) - await asyncio.sleep(period) - - return scan_task