# Source code for pcs.agents.bluefors_tc.agent

# Bluefors TC agent

import argparse
import os
import threading
import time
from contextlib import contextmanager

import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from twisted.internet import reactor

from pcs.drivers.bluefors_tc import BFTC

class YieldingLock:
    """
    Copied from the SOCS Lakeshore372 Agent on 11/9/2023. It is kept here
    rather than imported so that a future SOCS release cannot change it
    underneath us unnoticed.

    Two locks are braided together: a caller must first take the "next"
    lock and, while holding it, take the "active" lock.  A thread that is
    already waiting therefore gets in ahead of a thread that has just
    released the active lock and immediately asks for it again.

    Intended for a Process that wants to hold the lock almost all of the
    time but must periodically let another thread reach a shared resource
    without sleeping for long; release_and_acquire() does that in one call.

    Args:
        default_timeout (float, optional): Timeout, in seconds, used by
            acquire() when the caller does not give one.  If this is also
            None (or 0), acquisition attempts are non-blocking.

    """

    def __init__(self, default_timeout=None):
        self._default_timeout = default_timeout
        self._active = threading.Lock()
        self._next = threading.Lock()
        # Label of whoever currently holds the lock (None when free).
        self.job = None

    def acquire(self, timeout=None, job=None):
        """Try to take the lock; return True on success, False otherwise.

        ``timeout`` falls back to the default timeout given at construction.
        A resulting timeout of None or 0 means "do not block".  The same
        arguments are used for both inner locks.  On success ``job`` is
        recorded on ``self.job``.

        """
        wait = self._default_timeout if timeout is None else timeout
        if wait is None or wait == 0.:
            lock_args = dict(blocking=False)
        else:
            lock_args = dict(blocking=True, timeout=wait)

        # Queue up on the outer lock first ...
        if not self._next.acquire(**lock_args):
            return False

        # ... then go for the real one, and let the next waiter queue up.
        got_active = self._active.acquire(**lock_args)
        if got_active:
            self.job = job
        self._next.release()
        return got_active

    def release(self):
        """Clear the job label and release the active lock."""
        self.job = None
        return self._active.release()

    def release_and_acquire(self, timeout=None):
        """Release the lock, then try to take it again under the same job.

        Returns the result of the re-acquire attempt; on False the lock is
        no longer held by the caller.

        """
        held_job = self.job
        self.release()
        return self.acquire(timeout=timeout, job=held_job)

    @contextmanager
    def acquire_timeout(self, timeout=None, job='unnamed'):
        """Context manager around acquire().

        Yields the boolean result of acquire().  The lock is released on
        exit only if it was actually obtained.

        """
        acquired = self.acquire(timeout=timeout, job=job)
        if not acquired:
            yield acquired
            return
        try:
            yield acquired
        finally:
            self.release()


class Bluefors_TC_Agent:
    """Agent to connect to a single Bluefors Temperature Controller device.

    Args:
        agent: OCS agent object (as returned by
            ``ocs_agent.init_site_agent``); supplies the logger and is used
            to register the data feed and to start the acq Process.
        name (str): Identifier stored on the agent. ``main`` passes the
            device serial number here.
        ip (str): Address handed to the ``BFTC`` driver when initializing.

    """

    def __init__(self, agent, name, ip):
        # self._acq_proc_lock is held for the duration of the acq Process.
        # Tasks that require acq to not be running, at all, should use
        # this lock.
        self._acq_proc_lock = TimeoutLock()

        # self._lock is held by the acq Process only when accessing
        # the hardware but released occasionally so that (short) Tasks
        # may run.  Use a YieldingLock to guarantee that a waiting
        # Task gets activated preferentially, even if the acq thread
        # immediately tries to reacquire.
        self._lock = YieldingLock(default_timeout=5)

        self.name = name
        self.ip = ip
        self.module = None       # BFTC driver instance, set by init_bftc
        self.thermometers = []   # channel names, filled in by init_bftc

        self.log = agent.log
        self.initialized = False
        self.take_data = False   # loop flag for the acq Process

        self.agent = agent
        # Registers temperature feeds
        agg_params = {
            'frame_length': 10 * 60  # [sec]
        }
        self.agent.register_feed('temperatures',
                                 record=True,
                                 agg_params=agg_params,
                                 buffer_time=1)

    @ocs_agent.param('auto_acquire', default=False, type=bool)
    @ocs_agent.param('acq_params', type=dict, default=None)
    @ocs_agent.param('force', default=False, type=bool)
    def init_bftc(self, session, params=None):
        """init_bftc(auto_acquire=False, acq_params=None, force=False)

        **Task** - Perform first time setup of the communication to the BFTC.

        Parameters:
            auto_acquire (bool, optional): Default is False. Starts data
                acquisition after initialization if True.
            acq_params (dict, optional): Params to pass to acq process if
                auto_acquire is True.
            force (bool, optional): Default is False. Re-run the
                initialization even if the agent is already initialized.

        """
        if params is None:
            params = {}

        if self.initialized and not params.get('force', False):
            self.log.info("BF TC already initialized. Returning...")
            return True, "Already initialized"

        with self._lock.acquire_timeout(job='init') as acquired1, \
                self._acq_proc_lock.acquire_timeout(timeout=0., job='init') \
                as acquired2:
            if not acquired1:
                self.log.warn(f"Could not start init because "
                              f"{self._lock.job} is already running")
                return False, "Could not acquire lock"
            if not acquired2:
                self.log.warn(f"Could not start init because "
                              f"{self._acq_proc_lock.job} is already running")
                return False, "Could not acquire lock"

            session.set_status('running')

            # A failed connection is treated as fatal: the reactor is
            # stopped, which shuts the agent down.
            try:
                self.module = BFTC(self.ip)
            except ConnectionError:
                self.log.error("Could not connect to the BF TC. Exiting.")
                reactor.callFromThread(reactor.stop)
                return False, 'BF TC initialization failed'
            except Exception as e:
                self.log.error(f"Unhandled exception encountered: {e}")
                reactor.callFromThread(reactor.stop)
                return False, 'BF TC initialization failed'

            print("Initialized BF TC module: {!s}".format(self.module))
            session.add_message("BF TC initilized with ID: %s" % self.module.id)

            self.thermometers = [channel.name for channel in self.module.channels]

            self.initialized = True

        # Start data acquisition if requested.  This happens after both
        # locks have been released so that acq is able to take them.
        if params.get('auto_acquire', False):
            self.agent.start('acq', params.get('acq_params', None))

        return True, 'BF TC initialized.'

    @ocs_agent.param('_')
    def acq(self, session, params=None):
        """acq()

        **Process** - Acquire data from the BF TC. Set to check for new data
        at 2 Hz - this needs to be adjusted if wait_time+meas_time in the
        BF TC is shorter than a second (it is much longer than 1 sec by
        default).

        Notes:
            The most recent data collected is stored in session data in the
            structure::

                >>> response.session['data']
                {"fields":
                    {"Channel_05": {"T": 293.644, "R": 33.752, "timestamp": 1601924482.722671},
                     "Channel_06": {"T": 0, "R": 1022.44, "timestamp": 1601924499.5258765},
                     "Channel_08": {"T": 0, "R": 1026.98, "timestamp": 1601924494.8172355},
                     "Channel_01": {"T": 293.41, "R": 108.093, "timestamp": 1601924450.9315426},
                     "Channel_02": {"T": 293.701, "R": 30.7398, "timestamp": 1601924466.6130798},
                    }
                }

        """
        # OCS pacemaker set to check for new data at 2 Hz
        pm = Pacemaker(2, quantize=True)

        with self._acq_proc_lock.acquire_timeout(timeout=0, job='acq') \
                as acq_acquired, \
                self._lock.acquire_timeout(job='acq') as acquired:
            if not acq_acquired:
                self.log.warn(f"Could not start Process because "
                              f"{self._acq_proc_lock.job} is already running")
                return False, "Could not acquire lock"
            # The sampling lock must really be ours: the loop below releases
            # it periodically, and the hardware is read while holding it.
            if not acquired:
                self.log.warn(f"Could not start Process because "
                              f"{self._lock.job} is holding the lock")
                return False, "Could not acquire lock"
            # Without a driver instance there is nothing to read from.
            if self.module is None:
                self.log.warn("Could not start Process because the BF TC "
                              "has not been initialized")
                return False, "BF TC not initialized"

            session.set_status('running')

            self.log.info("Starting data acquisition for {}".format(self.agent.agent_address))
            previous_timestamp = None
            last_release = time.time()

            session.data = {"fields": {}}

            self.take_data = True
            while self.take_data:
                pm.sleep()

                # Relinquish sampling lock occasionally.
                # NOTE(review): if the re-acquire below fails, the lock is no
                # longer held, yet the next iterations (until the one-second
                # window elapses again) read the hardware anyway - confirm
                # whether that is acceptable.
                if time.time() - last_release > 1.:
                    last_release = time.time()
                    if not self._lock.release_and_acquire(timeout=10):
                        self.log.warn(f"Failed to re-acquire sampling lock, "
                                      f"currently held by {self._lock.job}.")
                        continue

                current_measurement = self.module.get_latest_measurement()
                current_timestamp = current_measurement['timestamp']
                current_ch_num = current_measurement['channel_nr']
                channel_str = f'Channel_{current_ch_num:02}'

                # The BFTC reports a timestamp of the most recent measurement
                # when we query it - we only save the new data point when the
                # timestamp changes.
                if previous_timestamp != current_timestamp:
                    # Potential upgrade: implement checks of channel wait_time
                    # and meas_time to ensure we aren't missing data

                    # Record most recent data
                    # Temperature data returns None if no curve or out of range
                    # Will set to zero to match LS372 behavior for now
                    temp_reading = current_measurement['temperature']  # in K
                    if temp_reading is None:
                        temp_reading = 0.0
                    res_reading = current_measurement['resistance']  # in Ohms

                    # Update timestamp
                    previous_timestamp = current_timestamp

                    # Using time.time() to be consistent with other agents
                    current_time = time.time()
                    data = {
                        'timestamp': current_time,
                        'block_name': channel_str,
                        'data': {}
                    }

                    # For data feed
                    data['data'][channel_str + '_T'] = temp_reading
                    data['data'][channel_str + '_R'] = res_reading
                    session.app.publish_to_feed('temperatures', data)
                    self.log.debug("{data}", data=session.data)

                    # For session.data
                    field_dict = {channel_str: {"T": temp_reading,
                                                "R": res_reading,
                                                "timestamp": current_time}}
                    session.data['fields'].update(field_dict)

        return True, 'Acquisition exited cleanly.'

    def _stop_acq(self, session, params=None):
        """
        Stops acq process.

        Clears the ``take_data`` flag so the acq loop exits on its next
        pass.  Returns a failure tuple if acq is not currently running.
        """
        if self.take_data:
            session.set_status('stopping')
            self.take_data = False
            return True, 'requested to stop taking data.'
        else:
            return False, 'acq is not currently running'
def make_parser(parser=None):
    """Build the argument parser for the Agent. Allows sphinx to automatically
    build documentation based on this function.

    Args:
        parser (argparse.ArgumentParser, optional): Existing parser to add
            the agent options to. A new one is created if omitted.

    Returns:
        argparse.ArgumentParser: The parser with the 'Agent Options' group
        (``--ip-address``, ``--serial-number``, ``--mode``) added.

    """
    if parser is None:
        parser = argparse.ArgumentParser()

    # Add options specific to this agent.
    pgroup = parser.add_argument_group('Agent Options')
    pgroup.add_argument('--ip-address')
    pgroup.add_argument('--serial-number')
    pgroup.add_argument('--mode', type=str, default='acq',
                        choices=['idle', 'init', 'acq'],
                        help="Starting action for the Agent.")

    return parser


def main(args=None):
    """Entry point: set up logging, build the agent, register its
    operations and run it.

    Args:
        args (list, optional): Argument list forwarded to
            ``site_config.parse_args``.

    """
    # For logging
    txaio.use_twisted()
    txaio.make_logger()

    # Start logging
    txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))

    parser = make_parser()
    args = site_config.parse_args(agent_class='BFTCAgent',
                                  parser=parser,
                                  args=args)

    # Automatically acquire data if requested (default).  In 'idle' mode
    # init_params stays False, so no startup parameters are passed.
    init_params = False
    if args.mode == 'init':
        init_params = {'auto_acquire': False,
                       'acq_params': {}}
    elif args.mode == 'acq':
        init_params = {'auto_acquire': True,
                       'acq_params': {}}

    # Interpret options in the context of site_config.
    print('I am in charge of device with serial number: %s' % args.serial_number)

    agent, runner = ocs_agent.init_site_agent(args)

    bftc_agent = Bluefors_TC_Agent(agent, args.serial_number, args.ip_address)

    # The task callable is the agent's init_bftc method; the class defines
    # no attribute named "bftc".
    agent.register_task('init_bftc', bftc_agent.init_bftc,
                        startup=init_params)
    agent.register_process('acq', bftc_agent.acq, bftc_agent._stop_acq)
    # And many more to come...

    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()