# Source code for pcs.agents.lakeshore325.agent

import argparse
import threading
from contextlib import contextmanager

from ocs import ocs_agent, site_config
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from twisted.internet import reactor

from pcs.drivers.lakeshore325 import LS325

class LS325_Agent:
    """Agent to connect to a single Lakeshore 325 temperature controller.

    Args:
        agent: OCS agent instance. Its ``log`` attribute is used for logging
            and the instance is kept as ``self.agent``.
        name: Name stored for this device.
        port: Value handed to the ``LS325`` driver when the device is opened.

    Attributes:
        module: ``LS325`` driver instance, or ``None`` until ``init_ls325``
            has connected successfully.
        initialized (bool): True once ``init_ls325`` has completed.
    """

    def __init__(self, agent, name, port):
        self.name = name
        self.port = port
        self.module = None
        self.thermometers = []
        self.log = agent.log
        self.initialized = False
        self.take_data = False
        self.agent = agent

        # TODO: temperature feed registration is currently disabled. It was
        # previously switched off by wrapping it in a bare string literal;
        # it is kept here as a comment so the intent stays visible.
        # agg_params = {
        #     'frame_length': 10 * 60  # [sec]
        # }
        # self.agent.register_feed('temperatures',
        #                          record=True,
        #                          agg_params=agg_params,
        #                          buffer_time=1)

    def init_ls325(self, session, params=None):
        """init_ls325(force=False)

        **Task** - Perform first time setup of the communication to the
        LS325.

        Parameters:
            force (bool, optional): Default is False. Re-run the
                initialization even if the device is already initialized.

        Returns:
            tuple: ``(ok, message)``. If the driver cannot be created the
            twisted reactor is asked to stop and ``ok`` is False.
        """
        if params is None:
            params = {}

        if self.initialized and not params.get('force', False):
            self.log.info("LS325 already initialized. Returning...")
            return True, "Already initialized"

        session.set_status('running')

        try:
            self.module = LS325(self.port)
        except ConnectionError:
            self.log.error("Could not connect to the LS325. Exiting.")
            # This task may run outside the reactor thread, so the stop
            # request is scheduled through callFromThread.
            reactor.callFromThread(reactor.stop)
            return False, 'LS325 initialization failed'
        except Exception as e:
            self.log.error(f"Unhandled exception encountered: {e}")
            reactor.callFromThread(reactor.stop)
            return False, 'LS325 initialization failed'

        print("Initialized LS325 module: {!s}".format(self.module))
        session.add_message("LS325 initilized with ID: %s" % self.module.id)

        # self.thermometers = [channel.name for channel in self.module.channels]

        self.initialized = True

        return True, 'LS325 initialized.'

    @ocs_agent.param('value', type=str)
    def set_heater_units(self, session, params=None):
        """set_heater_units(value)

        **Task** - Set the units of heater 1.

        Parameters:
            value (str): Passed unchanged to ``heater1.set_units`` of the
                driver.

        Returns:
            tuple: ``(ok, message)``. ``ok`` is False if the driver has not
            been created yet.
        """
        # self.module stays None until init_ls325 succeeds; fail the task
        # cleanly instead of raising AttributeError on None.
        if self.module is None:
            return False, 'LS325 not initialized; run init_ls325 first'

        self.module.heater1.set_units(params['value'])
        return True, 'Heater 1 units set'

    @ocs_agent.param('_')
    def get_heater_units(self, session, params=None):
        """get_heater_units()

        **Task** - Query the units of heater 1.

        Returns:
            tuple: ``(ok, message)``. On success ``message`` is whatever
            ``heater1.get_units`` of the driver returned; ``ok`` is False if
            the driver has not been created yet.
        """
        # Same guard as set_heater_units: no driver before init_ls325.
        if self.module is None:
            return False, 'LS325 not initialized; run init_ls325 first'

        resp = self.module.heater1.get_units()
        return True, resp
def make_parser(parser=None):
    """Build the argument parser for the Agent.

    Allows sphinx to automatically build documentation based on this
    function.

    Args:
        parser (argparse.ArgumentParser, optional): Existing parser to add
            the agent options to. A new one is created if None.

    Returns:
        argparse.ArgumentParser: The parser with an 'Agent Options' group
        containing ``--port`` and ``--serial_number``.
    """
    if parser is None:
        parser = argparse.ArgumentParser()

    # Add options specific to this agent.
    pgroup = parser.add_argument_group('Agent Options')
    pgroup.add_argument('--port',
                        help="Port passed to the LS325 driver.")
    pgroup.add_argument('--serial_number',
                        help="Serial number of the device; also used as the "
                             "name given to the agent object.")

    return parser


def main(args=None):
    """Start logging, build the LS325 agent, register its tasks and run it.

    Args:
        args (list, optional): Command line arguments handed to
            ``site_config.parse_args``.
    """
    # ``os`` and ``txaio`` are used below but are not among the module's
    # top-level imports; import them here so this entry point does not fail
    # with NameError on its first statement.
    import os

    import txaio

    # For logging
    txaio.use_twisted()
    txaio.make_logger()

    # Start logging
    txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))

    parser = make_parser()
    args = site_config.parse_args(agent_class='LS325Agent',
                                  parser=parser,
                                  args=args)

    # Interpret options in the context of site_config.
    print('I am in charge of device with serial number: %s' % args.serial_number)

    agent, runner = ocs_agent.init_site_agent(args)

    ls325_agent = LS325_Agent(agent, args.serial_number, args.port)

    agent.register_task('init_ls325', ls325_agent.init_ls325)
    agent.register_task('get_heater_units', ls325_agent.get_heater_units)
    agent.register_task('set_heater_units', ls325_agent.set_heater_units)
    # And many more to come...

    runner.run(agent, auto_reconnect=True)


if __name__ == '__main__':
    main()