Source code for robot_skills.robot_part

# Maintainer: Janno Lunenburg <jannolunenburg@gmail.com>

# System
import os

import actionlib
# ROS
import rospy

from tf2_ros import Buffer

# Determine simulation mode
SIM_MODE = os.environ.get("ROBOT_REAL", "false").lower() != "true"


[docs]class RobotPart(object): """ Base class for robot parts """ def __init__(self, robot_name, tf_buffer): """ Constructor :param robot_name: string with robot name :param tf_buffer: tf buffer object """ self.robot_name = robot_name self.tf_buffer = tf_buffer # type: Buffer # Body parts self.parts = dict() self.functionalities = [] self.__ros_connections = {} self.__diagnostics_name = "" # This is set to False by subscribe_hardware_status, because then apparently there is a meaningful check # If no such check exists, then assume it's operational unless overridden in subclass self._operational = True
[docs] def close(self): """ Close the robot part """ pass
[docs] def add_part(self, partname, part): """ Add a component part to the robot part. This is added to the parts dict and set as an attribute :param partname: name of the bodypart :param part: bodypart object """ self.parts[partname] = part setattr(self, partname, part)
[docs] def load_param(self, param_name, default=None): """ Loads a parameter from the parameter server, namespaced by robot name :param param_name: parameter name :param default: default value for when parameter unavailable :return: loaded parameters """ if default is None: return rospy.get_param('/' + self.robot_name + '/' + param_name) else: return rospy.get_param('/' + self.robot_name + '/' + param_name, default)
[docs] def wait_for_connections(self, timeout, log_failing_connections=True): """ Waits for the connections until they are connected :param timeout: timeout in seconds :param log_failing_connections: (bool) whether to log errors if not connected. This is useful when checking multiple robot parts in a loop :return: bool indicating whether all connections are connected """ start = rospy.Time.now() t = rospy.Duration(timeout) r = rospy.Rate(20) # Loop until the timeout while (rospy.Time.now() - start) < t: # If shutdown: return immediately without any further checks or prints if rospy.is_shutdown(): return False # If everything is connected: return True if len(self.__ros_connections) == 0: return True # Check all connections new_connections = {} for name, connection in self.__ros_connections.items(): rospy.logdebug("Checking {}".format(name)) connected = False # Check actionlib connection if isinstance(connection, actionlib.SimpleActionClient): connected = connection.wait_for_server(rospy.Duration(0.01)) elif isinstance(connection, rospy.ServiceProxy): # Check service connection # Need to use try-except in case of service since this throws an exception if not connected. try: connection.wait_for_service(timeout=0.01) connected = True except Exception: connected = False elif isinstance(connection, rospy.Subscriber): connected = connection.get_num_connections() >= 1 else: rospy.logerr("Don't know what to do with a {}".format(type(connection))) # If connected, remove from the list if connected: rospy.logdebug("Connected to {}".format(name)) # self.__ros_connections = {name: connection # for name, connection in self.__ros_connections.items() if name != k} else: new_connections[name] = connection self.__ros_connections = new_connections r.sleep() if log_failing_connections: for name, connection in self.__ros_connections.items(): rospy.logerr("{} not connected timely".format(name)) return False
[docs] def create_simple_action_client(self, name, action_type): """ Creates a simple actionlib client and waits for the action server :param name: string with the name of the action in the correct namespace :param action_type: action type of this action :return: the action client """ ac = actionlib.SimpleActionClient(name, action_type) self._add_connection(name, ac) return ac
[docs] def create_service_client(self, name, srv_type): """ Creates a service client and waits for the server :param name: string with the name of the service in the correct namespace :param srv_type: service type :return: the service client """ srv = rospy.ServiceProxy(name, srv_type) self._add_connection(name, srv) return srv
[docs] def create_subscriber(self, name, *args, **kwargs): """ Creates a Subscriber and add to the connections to check :param name: string with the name topic to subscribe :param args: other args passed to rospy.Subscriber :param kwargs: other keyword args passed to rospy.Subscriber :return: the Subscriber """ sub = rospy.Subscriber(name, *args, **kwargs) self._add_connection(name, sub) return sub
[docs] def _add_connection(self, name, connection): """ Adds a connection to the internal dict with connections that is used when initializing the robot object. :param name: name of the connection :param connection: connection to add. This might be a ServiceProxy, ActionClient or Subscriber """ self.__ros_connections[name] = connection
@property def operational(self): """ Check whether this bodypart's hardware is operational If the associated hardware is not yet up, has an error etc, the bodypart is not operational. Note: in in simulation: operational is always true :rtype: bool True is part is ready to do work, False otherwise """ if SIM_MODE: return True else: return self._operational
[docs] def subscribe_hardware_status(self, name): """ Start to check if the bodypart is operational. To do so, subscribe to the hardware status/diagnostics :param name: check on the level of the diagnostic_msgs/DiagnosticStatus with this name """ self._operational = False self.__diagnostics_name = name
[docs] def unsubscribe_hardware_status(self): """ When process_hardware_status sees that self.__diagnostics_name evaluates to False, it will not process the diagnostics and ignore them """ self.__diagnostics_name = None
[docs] def process_hardware_status(self, diagnostic_dict): """ hardware_status callback to determine if the bodypart is operational :param diagnostic_dict: dictionary[str, diagnostic_msgs.msg.DiagnosticStatus] :return: no return """ if not self.__diagnostics_name: return diag = diagnostic_dict.get(self.__diagnostics_name, None) if not diag: # (Logwarn throttle: show this message once every day (24 * 3600 seconds) rospy.logwarn_throttle(86400.0, 'no diagnostic msg received for the %s' % self.__diagnostics_name) else: # TODO: diagnostic_msgs.DiagnosticStatus.OK instead of our own enumeration. # 0. Stale # 1. Idle # 2. Operational # 3. Homing # 4. Error if diag.level != 2: self._operational = False else: self._operational = True
[docs] def reset(self): """ Reset body part and all its components, this function should not be overwritten. :return: Succes :rtype: bool """ results = {} self.selfreset() for partname, part in self.parts.items(): rospy.logdebug("Resetting {}".format(partname)) if self.robot_name == 'hero' and partname == 'torso': # Todo: get rid of this custom thing. rospy.logwarn("Skipping reset of %s", partname) else: part.reset() return all(results.values())
[docs] def selfreset(self): """ Reset of the body part itself. This function may be overwritten. Returns: bool """ return True