# Source code for pychron.tx.protocols.laser

# ===============================================================================
# Copyright 2015 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
# ============= standard library imports ========================
# ============= local library imports  ==========================
from __future__ import absolute_import

from pychron.core.helpers.binpack import encode_blob
from pychron.core.helpers.strtools import to_bool
from pychron.lasers.laser_managers.ilaser_manager import ILaserManager
from pychron.tx.errors import EnableErrorCode
from pychron.tx.errors import InvalidArgumentsErrorCode
from pychron.tx.errors import (
    LogicBoardCommErrorCode,
    InvalidMotorErrorCode,
    InvalidSampleHolderErrorCode,
)
from pychron.tx.protocols.service import ServiceProtocol


class LaserProtocol(ServiceProtocol):
    """Service protocol that maps remote command names onto a laser manager.

    Each registered service name is bound to a private handler method. Every
    handler receives the command payload as ``data`` and returns either the
    manager's result, an error-code object, ``True`` or the string ``"OK"``.
    """

    def __init__(self, application, name, addr, logger):
        """Look up the named ILaserManager service and register the handlers.

        application: object providing ``get_service``.
        name: manager name used in the service query ``name=="<name>"``.
        addr: stored on the instance as ``_addr``.
        logger: forwarded to ``ServiceProtocol.__init__``.
        """
        ServiceProtocol.__init__(self, logger=logger)
        # self._application = application
        man = application.get_service(ILaserManager, 'name=="{}"'.format(name))
        self._manager = man
        self._addr = addr

        # (service name, handler method name). Jog commands are aliases of
        # the pattern handlers.
        services = (
            ("GetError", "_get_error"),
            # ('MachineVisionDegas', '_machine_vision_degas'),
            ("StartVideoRecording", "_start_video_recording"),
            ("StopVideoRecording", "_stop_video_recording"),
            ("ReadLaserPower", "_read_laser_power"),
            ("GetLaserStatus", "_get_laser_status"),
            ("Snapshot", "_snapshot"),
            ("PrepareLaser", "_prepare_laser"),
            ("LaserReady", "_laser_ready"),
            ("Enable", "_enable"),
            ("Disable", "_disable"),
            ("SetXY", "_set_x_y"),
            ("SetX", "_set_x"),
            ("SetY", "_set_y"),
            ("SetZ", "_set_z"),
            ("GetPosition", "_get_position"),
            ("GetAutoCorrecting", "_get_auto_correcting"),
            ("CancelAutoCorrecting", "_cancel_auto_correcting"),
            ("GetDriveMoving", "_get_drive_moving"),
            ("GetXMoving", "_get_x_moving"),
            ("GetYMoving", "_get_y_moving"),
            ("GetZMoving", "_get_z_moving"),
            ("StopDrive", "_stop_drive"),
            ("SetDriveHome", "_set_drive_home"),
            ("SetHomeX", "_set_home_x"),
            ("SetHomeY", "_set_home_y"),
            ("SetHomeZ", "_set_home_z"),
            ("GoToHole", "_go_to_hole"),
            ("DoPattern", "_do_pattern"),
            ("IsPatterning", "_is_patterning"),
            ("AbortPattern", "_abort_pattern"),
            ("DoJog", "_do_pattern"),
            ("IsJogging", "_is_patterning"),
            ("AbortJog", "_abort_pattern"),
            ("GetJogProcedures", "_get_pattern_names"),
            ("GetPatternNames", "_get_pattern_names"),
            ("SetBeamDiameter", "_set_beam_diameter"),
            ("GetBeamDiameter", "_get_beam_diameter"),
            ("SetZoom", "_set_zoom"),
            ("GetZoom", "_get_zoom"),
            ("SetMotorLock", "_set_motor_lock"),
            ("SetMotor", "_set_motor"),
            ("GetMotorMoving", "_get_motor_moving"),
            ("SetSampleHolder", "_set_sample_holder"),
            ("GetSampleHolder", "_get_sample_holder"),
            ("StartMeasureGrainPolygon", "_start_measure_grain_polygon"),
            ("StopMeasureGrainPolygon", "_stop_measure_grain_polygon"),
            ("GetGrainPolygonBlob", "_get_grain_polygon_blob"),
            ("AcquireGrainPolygonBlob", "_get_grain_polygon"),
            ("SetLaserPower", "_set_laser_power"),
            ("SetLaserOutput", "_set_laser_output"),
            ("GetAchievedOutput", "_get_achieved_output"),
            ("GetResponseBlob", "_get_response_blob"),
            ("GetOutputBlob", "_get_output_blob"),
            ("GetPyrometerTemperature", "_get_pyrometer_temperature"),
            ("GoToNamedPosition", "_go_to_named_position"),
            ("GoToPoint", "_go_to_point"),
            # ('TracePath', '_trace_path'),
            ("IsTracing", "_is_tracing"),
            ("StopTrace", "_stop_trace"),
            ("Prepare", "_prepare"),
            ("IsReady", "_is_ready"),
            ("SetLight", "_set_light"),
        )

        self._register_services(services)

    # ===============================================================================
    # Machine Vision
    # ===============================================================================
    # def _machine_vision_degas(self, data):
    #     if isinstance(data, dict):
    #         lumens, duration = data['lumens'], data['duration']
    #     else:
    #         lumens, duration = data
    #
    #     lumens, duration = float(lumens), float(duration)
    #     self._manager.do_machine_vision_degas(lumens, duration, new_thread=True)

    def _get_auto_correcting(self, data):
        """Return the stage manager's ``is_auto_correcting()`` result."""
        return self._manager.stage_manager.is_auto_correcting()

    def _cancel_auto_correcting(self, data):
        """Return the stage manager's ``cancel_auto_correcting()`` result."""
        return self._manager.stage_manager.cancel_auto_correcting()

    # ===============================================================================
    # Video
    # ===============================================================================
    def _start_video_recording(self, data):
        """Start a video recording.

        data: either a dict with a ``"name"`` key or the name itself.
        """
        if isinstance(data, dict):
            name = data["name"]
        else:
            name = data
        return self._manager.start_video_recording(name)

    def _stop_video_recording(self, data):
        """Stop the current video recording."""
        return self._manager.stop_video_recording()

    def _snapshot(self, data):
        """
        name: base name for file. saved in default directory
        pic_format: ".jpg" or ".png"; anything else falls back to ".jpg"

        data: dict with "name" and "pic_format" keys, or a (name, pic_format) pair

        returns: hex length of the local path, the local path, hex length of
        the second path returned by the stage manager, that path, then the
        image blob, concatenated into one string. Returns None when the stage
        manager has no "video" attribute.
        """
        if isinstance(data, dict):
            name, pic_format = data["name"], data["pic_format"]
        else:
            name, pic_format = data

        sm = self._manager.stage_manager
        if hasattr(sm, "video"):
            if pic_format not in (".jpg", ".png"):
                pic_format = ".jpg"

            lpath, upath, imageblob = sm.snapshot(
                name=name, return_blob=True, inform=False, pic_format=pic_format
            )

            s = "{:02X}{}{:02x}{}{}".format(
                len(lpath), lpath, len(upath), upath, imageblob
            )
            return s

    def _get_grain_polygon_blob(self, data):
        """Return the manager's ``get_grain_polygon_blob()`` result."""
        return self._manager.get_grain_polygon_blob()

    def _get_grain_polygon(self, data):
        """Return the manager's ``get_grain_polygon()`` result."""
        return self._manager.get_grain_polygon()

    def _stop_measure_grain_polygon(self, data):
        """Return the manager's ``stop_measure_grain_polygon()`` result."""
        return self._manager.stop_measure_grain_polygon()

    def _start_measure_grain_polygon(self, data):
        """Return the manager's ``start_measure_grain_polygon()`` result."""
        return self._manager.start_measure_grain_polygon()

    # ===============================================================================
    # Laser
    # ===============================================================================
    def _get_error(self, data):
        """Return the manager's current error, or "OK" when it is falsy."""
        return self._manager.get_error() or "OK"

    def _read_laser_power(self, data):
        """
        return watts
        """
        return self._manager.get_laser_watts()

    def _get_laser_status(self, data):
        """Placeholder handler; always returns None."""
        return

    def _prepare_laser(self, data):
        """Ask the manager to prepare the laser; always returns True."""
        self._manager.prepare_laser()
        return True

    def _laser_ready(self, data):
        """Return the manager's ``is_laser_ready()`` result."""
        return self._manager.is_laser_ready()

    def _enable(self, data):
        """Enable the laser.

        A ``None`` result from the manager is reported as
        LogicBoardCommErrorCode and a string result is wrapped in
        EnableErrorCode. Any other result is returned as is, or "OK" when it
        is falsy.
        """
        err = self._manager.enable_laser()
        if err is None:
            err = LogicBoardCommErrorCode()
        elif isinstance(err, str):
            err = EnableErrorCode(err)

        return err or "OK"

    def _disable(self, data):
        """Disable the laser; result handling mirrors ``_enable``."""
        err = self._manager.disable_laser()
        if err is None:
            err = LogicBoardCommErrorCode()
        elif isinstance(err, str):
            # NOTE(review): string errors reuse EnableErrorCode -- confirm
            # whether a dedicated disable error code should be used instead.
            err = EnableErrorCode(err)

        return err or "OK"

    def _set_laser_power(self, data):
        """Set the laser power.

        Returns InvalidArgumentsErrorCode when ``data`` cannot be converted
        to float, otherwise True.
        """
        try:
            p = float(data)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetLaserPower", data)

        self._manager.set_laser_power(p)
        return True

    def _set_laser_output(self, data):
        """Set the laser output.

        data: dict with "value" and "units" keys, or a (value, units) pair.

        Returns InvalidArgumentsErrorCode when the value cannot be converted
        to float, otherwise True.
        """
        if isinstance(data, dict):
            value, units = data["value"], data["units"]
        else:
            value, units = data

        try:
            p = float(value)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetLaserOutput", value)

        self._manager.set_laser_output(p, units=units)
        return True

    def _get_achieved_output(self, data):
        """Return the manager's ``get_achieved_output()`` result."""
        return self._manager.get_achieved_output()

    def _get_response_blob(self, data):
        """Return the manager's response blob passed through ``encode_blob``."""
        return encode_blob(self._manager.get_response_blob())

    def _get_output_blob(self, data):
        """Return the manager's output blob passed through ``encode_blob``."""
        return encode_blob(self._manager.get_output_blob())

    def _get_pyrometer_temperature(self, data):
        """Return the manager's ``get_pyrometer_temperature()`` result."""
        return self._manager.get_pyrometer_temperature()

    # ===============================================================================
    # Motors
    # ===============================================================================
    def _set_beam_diameter(self, data):
        """Set the beam diameter without blocking.

        Returns InvalidArgumentsErrorCode when ``data`` cannot be converted
        to float, otherwise the manager's result.
        """
        try:
            bd = float(data)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetBeamDiameter", data)
        return self._manager.set_beam_diameter(bd, block=False)

    def _get_beam_diameter(self, data):
        """Return the "beam" motor's data position, or "No Beam Motor"."""
        motor = self._manager.get_motor("beam")
        pos = "No Beam Motor"
        if motor:
            pos = motor.data_position
        return pos

    def _set_zoom(self, data):
        """Assign the manager's ``zoom`` attribute.

        Returns InvalidArgumentsErrorCode when ``data`` cannot be converted
        to float, otherwise True.
        """
        try:
            zoom = float(data)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetZoom", data)
        self._manager.zoom = zoom
        return True

    def _get_zoom(self, data):
        """Return the "zoom" motor's data position, or "No Zoom Motor"."""
        motor = self._manager.get_motor("zoom")
        pos = "No Zoom Motor"
        if motor:
            pos = motor.data_position
        return pos

    def _set_motor_lock(self, data):
        """Set a motor lock.

        data: dict with "name" and "value" keys, or a (name, value) pair.
        """
        if isinstance(data, dict):
            name = data["name"]
            value = data["value"]
        else:
            name, value = data

        return self._manager.set_motor_lock(name, value)

    def _set_motor(self, data):
        """Move a motor to a value without blocking.

        data: dict with "name" and "value" keys, or a (name, value) pair.

        Returns InvalidArgumentsErrorCode when the value cannot be converted
        to float, otherwise the manager's result.
        """
        if isinstance(data, dict):
            name = data["name"]
            value = data["value"]
        else:
            name, value = data

        try:
            value = float(value)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetMotor", value)

        return self._manager.set_motor(name, value, block=False)

    def _get_motor_moving(self, data):
        """Report whether the named motor is moving.

        data: dict with a "name" key, or the motor name itself.

        Returns InvalidMotorErrorCode when the manager has no such motor.
        """
        if isinstance(data, dict):
            name = data["name"]
        else:
            name = data

        motor = self._manager.get_motor(name)
        if motor is None:
            r = InvalidMotorErrorCode(name)
        else:
            r = motor.is_moving()
        return r

    # ===============================================================================
    # Positioning
    # ===============================================================================
    def _set_x_y(self, data):
        """Move the stage to an x, y position.

        data: dict with "x" and "y" keys, or an (x, y) pair.

        Returns InvalidArgumentsErrorCode when either coordinate cannot be
        converted to float, otherwise the stage manager's error or "OK".
        """
        if isinstance(data, dict):
            x, y = data["x"], data["y"]
        else:
            x, y = data

        try:
            x = float(x)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetXY", "x={}".format(x))

        try:
            y = float(y)
        except (ValueError, TypeError):
            return InvalidArgumentsErrorCode("SetXY", "y={}".format(y))

        # need to remember x,y so we can fool mass spec that we are at position
        self._manager.stage_manager.temp_position = x, y

        err = self._manager.stage_manager.set_xy(x, y)
        return err or "OK"

    def _set_x(self, data):
        """Move the x axis; see ``_set_axis``."""
        return self._set_axis("x", data)

    def _set_y(self, data):
        """Move the y axis; see ``_set_axis``."""
        return self._set_axis("y", data)

    def _set_z(self, data):
        """Move the z axis; see ``_set_axis``."""
        return self._set_axis("z", data)

    def _get_position(self, data):
        """Return "x,y,z" formatted to five decimal places.

        While a remembered ``temp_position`` exists and the stage is not
        moving, that position is reported for x and y instead of the
        calibrated position.
        """
        smanager = self._manager.stage_manager
        z = smanager.get_z()
        if smanager.temp_position is not None and not smanager.moving():
            x, y = smanager.temp_position
        else:
            x, y = smanager.get_calibrated_xy()

        pos = x, y, z
        return ",".join(["{:0.5f}".format(i) for i in pos])

    def _get_drive_moving(self, data):
        """Return the stage manager's ``moving()`` result."""
        return self._manager.stage_manager.moving()

    def _get_x_moving(self, data):
        """Return the stage manager's ``moving(axis="x")`` result."""
        return self._manager.stage_manager.moving(axis="x")

    def _get_y_moving(self, data):
        """Return the stage manager's ``moving(axis="y")`` result."""
        return self._manager.stage_manager.moving(axis="y")

    def _get_z_moving(self, data):
        """Return the stage manager's ``moving(axis="z")`` result."""
        return self._manager.stage_manager.moving(axis="z")

    def _stop_drive(self, data):
        """Stop the stage; always returns True."""
        self._manager.stage_manager.stop()
        return True

    def _set_drive_home(self, data):
        """Define the stage home; always returns True."""
        self._manager.stage_manager.define_home()
        return True

    def _set_home_x(self, data):
        """Define home for the x axis."""
        return self._set_home_(axis="x")

    def _set_home_y(self, data):
        """Define home for the y axis."""
        return self._set_home_(axis="y")

    def _set_home_z(self, data):
        """Define home for the z axis."""
        return self._set_home_(axis="z")

    def _set_sample_holder(self, name):
        """Select the stage map (sample holder) by name.

        Returns InvalidArgumentsErrorCode when ``name`` is None,
        InvalidSampleHolderErrorCode when the stage manager reports a falsy
        result, otherwise "OK".
        """
        if name is None:
            return InvalidArgumentsErrorCode("SetSampleHolder", name)

        err = self._manager.stage_manager._set_stage_map(name)
        if not err:
            return InvalidSampleHolderErrorCode(name)

        # Success previously fell through to an unbound local.
        return "OK"

    def _get_sample_holder(self, data):
        """Return the stage manager's ``stage_map`` attribute."""
        return self._manager.stage_manager.stage_map

    def _go_to_hole(self, data):
        """Move the stage to a hole.

        data: dict with "hole" and "autocenter" keys, or a
        (hole, autocenter) pair.

        Returns InvalidArgumentsErrorCode when a ValueError or TypeError is
        raised while converting or moving, otherwise the stage manager's
        error or "OK".
        """
        if isinstance(data, dict):
            hole, autocenter = data["hole"], data["autocenter"]
        else:
            hole, autocenter = data

        try:
            hole = int(hole)
            autocenter = to_bool(autocenter)

            err = self._manager.stage_manager.move_to_hole(
                str(hole), correct_position=autocenter
            )
        except (ValueError, TypeError):
            err = InvalidArgumentsErrorCode("GoToHole", (hole, autocenter))

        return err or "OK"

    def _go_to_named_position(self, data):
        """Return the manager's ``goto_named_position(data)`` result."""
        return self._manager.goto_named_position(data)

    def _go_to_point(self, data):
        """Return the manager's ``goto_point(data)`` result."""
        return self._manager.goto_point(data)

    # def _trace_path(self, manager, value, pathname, kind):
    #     return self._manager.trace_path(value, pathname, kind)

    def _is_tracing(self, data):
        """Return the manager's ``isTracing()`` result."""
        return self._manager.isTracing()

    def _stop_trace(self, data):
        """Return the manager's ``stop_trace()`` result."""
        return self._manager.stop_trace()

    # ===============================================================================
    # Patterning
    # ===============================================================================
    def _get_pattern_names(self, data):
        """Return the pattern names joined by commas, or "" when there are none."""
        ret = ""
        jogs = self._manager.get_pattern_names()
        if jogs:
            ret = ",".join(jogs)
        return ret

    def _do_pattern(self, data):
        """Execute a pattern.

        data: dict with "name" and "duration" keys, or a (name, duration) pair.

        Returns the manager's result, or "OK" when it is falsy.
        """
        if isinstance(data, dict):
            name = data["name"]
            duration = data["duration"]
        else:
            name, duration = data

        return self._manager.execute_pattern(name, duration) or "OK"

    def _is_patterning(self, data):
        """Return the manager's ``isPatterning()`` result."""
        return self._manager.isPatterning()

    def _abort_pattern(self, data):
        """Stop the pattern; return the manager's result or "OK" when falsy."""
        return self._manager.stop_pattern() or "OK"

    # ===============================================================================
    # Misc
    # ===============================================================================
    def _prepare(self, data):
        """Return the manager's ``prepare()`` result."""
        return self._manager.prepare()

    def _is_ready(self, data):
        """Return the manager's ``is_ready()`` result."""
        return self._manager.is_ready()

    def _set_light(self, data):
        """Pass ``data`` to the manager's ``set_light``; always returns True."""
        self._manager.set_light(data)
        return True

    # helpers
    def _set_axis(self, axis, value):
        """Move a single stage axis.

        Returns InvalidArgumentsErrorCode when ``value`` cannot be converted
        to float, otherwise the stage manager's error or "OK".
        """
        try:
            d = float(value)
        except (ValueError, TypeError) as err:
            return InvalidArgumentsErrorCode("Set{}".format(axis.upper()), err)

        err = self._manager.stage_manager.single_axis_move(axis, d)
        return err or "OK"

    def _set_home_(self, **kw):
        """Forward ``kw`` to the stage manager's ``define_home``.

        Returns the stage manager's error, or "OK" when it is falsy.
        """
        err = self._manager.stage_manager.define_home(**kw)
        return err or "OK"
# ============= EOF =============================================