from __future__ import absolute_import
import rospy
from geometry_msgs.msg import WrenchStamped
from numpy import array as np_array
from numpy.linalg import norm as np_norm
# TU/e Robotics
from robot_skills.robot_part import RobotPart
from robot_skills.util.exceptions import TimeOutException
class ForceSensor(RobotPart):
def __init__(self, robot_name, tf_buffer, topic, force_norm_threshold=2.5):
"""
Class for conveniently using a force sensor
:param robot_name: Which robot is this part of?
:param tf_buffer: tf2_ros.Buffer for use in RobotPart
:param topic: Topic to use for WrenchStamped measurement
:param force_norm_threshold: Edge up if norm exceeds this value [N] (defaults to 2.5N)
"""
super(ForceSensor, self).__init__(robot_name=robot_name, tf_buffer=tf_buffer)
self._topic = topic
self._calibrated_msg = None
self._edge_up = False
self._force_norm_threshold = force_norm_threshold
[docs] def _wrench_callback(self, msg):
# type: (WrenchStamped) -> None
"""
Process a WrenchStamped-message to determine if the norm(force) goes over the limit
:param msg: WrenchStamped to see if it goes over self.force_norm_threshold
:return: None but sets self.edge_up
"""
def _detect_edge_up(calibrated_msg, msg):
"""
Determine if the difference between calibrated_msg and msg is larger than self._force_norm_threshold
:param calibrated_msg: Reference message
:param msg: Comparision message, to be compared with the reference message
:return: bool if the difference is larger than the threshold.
"""
def _norm(v):
return np_norm(np_array([v.x, v.y, v.z]))
calibrated_force_norm = _norm(calibrated_msg.wrench.force)
force_norm = _norm(msg.wrench.force)
rospy.logdebug("Force norm: %.2f", force_norm - calibrated_force_norm)
return abs(force_norm - calibrated_force_norm) > self._force_norm_threshold
if self._calibrated_msg is None:
self._calibrated_msg = msg
else:
self._edge_up = _detect_edge_up(self._calibrated_msg, msg)
[docs] def wait_for_edge_up(self, timeout=10.0):
"""
Returns if an edge up event is detected. Will raise a TimeOutException if no edge up is detected within timeout.
:param timeout: Edge up wait patience
"""
self._edge_up = False
self._calibrated_msg = None
subscriber = rospy.Subscriber(self._topic, WrenchStamped, self._wrench_callback, queue_size=1)
end_time = rospy.Time.now() + rospy.Duration.from_sec(timeout)
while not rospy.is_shutdown() and not self._edge_up:
if rospy.Time.now() > end_time:
raise TimeOutException("Force sensor edge up detection timeout of {} exceeded".format(timeout))
rospy.Rate(10).sleep()
subscriber.unregister()
rospy.loginfo("Edge up detected!")