# Source code for scanplans.move_and_do

"""Conduct the plans for the samples one by one."""
import typing as tp

import bluesky.plan_stubs as bps
from xpdacq.beamtime import Beamtime
from xpdacq.beamtime import xpd_configuration

from scanplans.mdgetters import get_from_sample
from scanplans.mdgetters import translate_to_plan, translate_to_sample


def move_and_do_many(
    bt: Beamtime,
    sps: tp.List[
        tp.Tuple[tp.Union[int, str], tp.Union[int, str, tp.Generator]]
    ],
    wait_times: tp.Union[float, tp.List[float]] = 0.,
    wait_at_first: bool = False,
    sample_x: str = "sample_x",
    sample_y: str = "sample_y",
    x_controller: str = "x_controller",
    y_controller: str = "y_controller",
) -> tp.List[tp.Generator]:
    """Move to the sample and conduct the bluesky plan on the sample one by one.

    Parameters
    ----------
    bt : Beamtime
        The beamtime object.

    sps : list
        A list of (sample index, plan index). The index is shown in the
        'bt.list()'. An empty list yields an empty list of plans.

    wait_times : float or list of float
        The wait time passed to each per-sample plan. A single number is
        used for every sample; a sequence gives one wait time per sample
        and is copied, so the caller's object is never modified. Samples
        and wait times are paired with `zip`, so the shorter of the two
        determines how many plans are built.

    wait_at_first : bool
        Whether to wait before the plan is conducted for the first sample.
        If False, the wait time of the first sample is forced to 0 no
        matter how `wait_times` is set.

    sample_x : str
        The key to the x position of the sample in the sample information.
        Default 'sample_x'.

    sample_y : str
        The key to the y position of the sample in the sample information.
        Default 'sample_y'.

    x_controller : str
        The key to the x position controller in
        `~xpdacq.beamtime.xpd_configuration`. Default 'x_controller'.

    y_controller : str
        The key to the y position controller in
        `~xpdacq.beamtime.xpd_configuration`. Default 'y_controller'.

    Returns
    -------
    plans : list
        A list of the bluesky plans. Each plan is a generator.
    """
    if isinstance(wait_times, (int, float)):
        wait_times = [wait_times] * len(sps)
    else:
        # Build a fresh list: this accepts tuples and other sequences and
        # guarantees the assignment below cannot touch the caller's data.
        wait_times = list(wait_times)
    # Guard against an empty list so that "no samples" does not raise.
    if not wait_at_first and wait_times:
        wait_times[0] = 0
    return [
        move_and_do_one(
            bt,
            s,
            p,
            wait_time=wt,
            sample_x=sample_x,
            sample_y=sample_y,
            x_controller=x_controller,
            y_controller=y_controller,
        )
        for (s, p), wt in zip(sps, wait_times)
    ]
def move_and_do_one(
    bt: Beamtime,
    sample_ind: tp.Union[int, str],
    plan_ind: tp.Union[int, str, tp.Generator],
    wait_time: float = 0.,
    sample_x: str = "sample_x",
    sample_y: str = "sample_y",
    x_controller: str = "x_controller",
    y_controller: str = "y_controller"
) -> tp.Generator:
    """Move to one sample, wait, and then conduct one plan on it.

    Parameters
    ----------
    bt : Beamtime
        The beamtime object handed to `translate_to_sample` and
        `translate_to_plan`.

    sample_ind : int or str
        The sample identifier resolved by `translate_to_sample`.

    plan_ind : int, str or generator
        The plan identifier resolved by `translate_to_plan`.

    wait_time : float
        The time passed to `bps.sleep` after the move and before the plan.

    sample_x : str
        The key used with `get_from_sample` to read the x position.

    sample_y : str
        The key used with `get_from_sample` to read the y position.

    x_controller : str
        The key of the x controller in `xpd_configuration`.

    y_controller : str
        The key of the y controller in `xpd_configuration`.

    Yields
    ------
    Whatever `bps.checkpoint`, `bps.mv`, `bps.sleep` and the resolved plan
    yield, in that order, with a checkpoint before each of the three stages.
    """
    # Resolve everything up front; nothing is yielded until all lookups
    # and float conversions have succeeded.
    target_sample = translate_to_sample(bt, sample_ind)
    sample_plan = translate_to_plan(bt, plan_ind, target_sample)
    x_motor = xpd_configuration[x_controller]
    y_motor = xpd_configuration[y_controller]
    x_pos = float(get_from_sample(target_sample, sample_x))
    y_pos = float(get_from_sample(target_sample, sample_y))

    # Stage 1: move both controllers to the sample position.
    yield from bps.checkpoint()
    move_template = "Start moving to sample {} at ({}, {})."
    print(move_template.format(sample_ind, x_pos, y_pos))
    yield from bps.mv(x_motor, x_pos, y_motor, y_pos)
    print("Finish. ")

    # Stage 2: wait for the requested time.
    yield from bps.checkpoint()
    sleep_template = "Start sleeping for {} s."
    print(sleep_template.format(wait_time))
    yield from bps.sleep(wait_time)
    print("Wake up.")

    # Stage 3: run the plan on the sample.
    yield from bps.checkpoint()
    plan_template = "Start plan {} for sample {}"
    print(plan_template.format(plan_ind, sample_ind))
    yield from sample_plan
    print("Finish.")