11 import geometry_msgs.msg
14 from tf2_pykdl_ros
import VectorStamped
18 import tf2_geometry_msgs
23 from ed.entity
import Entity, Volume
25 import robot_smach_states.util.designators
as ds
26 from robot_smach_states.human_interaction
import Say
27 from robot_smach_states.world_model.world_model
import Inspect
28 from robot_smach_states.utility
import WriteDesignator
30 from robot_skills
import get_robot_from_argv
31 from robot_skills.robot
import Robot
32 from robot_skills.classification_result
import ClassificationResult
34 from robocup_knowledge
import load_knowledge
43 def __init__(self, robot, furniture_designator, entity_designator, max_number_items=2):
46 Drives to the designated furniture object, inspects this and selects the entity that will be pointed to
48 :param robot: (Robot) robot API object
49 :param furniture_designator: (EdEntityDesignator) designates the furniture object that was pointed to.
50 :param entity_designator: (EdEntityDesignator) writeable EdEntityDesignator
51 :param max_number_items: Max number of items in the entity designator
54 smach.StateMachine.__init__(self,
55 outcomes=[
"succeeded",
"failed"],
56 input_keys=[
"laser_dot"])
58 assert ds.is_writeable(entity_designator),
"Entity designator must be writeable for this purpose"
59 object_ids_des = ds.VariableDesignator([], resolve_type=[ClassificationResult])
60 inspect_area_des = ds.VariableDesignator(resolve_type=str)
61 nav_area_des = ds.VariableDesignator(resolve_type=str)
63 common_knowledge = load_knowledge(
'common')
65 @smach.cb_interface(outcomes=[
"done",
"failed"], input_keys=[
"laser_dot"])
66 def _select_inspect_area(userdata, robot, furniture_des, inspect_area_des):
67 def _ray_volume_check(ray_vs: VectorStamped, vol: Volume):
68 if vol.min_corner.z() <= ray_vs.vector.z() <= vol.max_corner.z():
71 return min(abs(ray_vs.vector.z()-vol.min_corner.z()), abs(ray_vs.vector.z()-vol.max_corner.z()))
73 entity_to_inspect = furniture_des.resolve()
74 if entity_to_inspect
is None:
75 inspect_area_des.write(
"on_top_of")
78 laser_dot = tf2_ros.convert(userdata[
"laser_dot"], VectorStamped)
79 laser_dot.header.stamp = rospy.Time()
80 laser_dot_entity_frame = robot.tf_buffer.transform(laser_dot, entity_to_inspect.uuid)
83 for vol_name, vol
in entity_to_inspect.volumes.items():
84 if vol_name
not in common_knowledge.get_inspect_areas(entity_to_inspect.uuid):
87 best_volumes.append((vol_name, _ray_volume_check(laser_dot_entity_frame, vol)))
90 rospy.loginfo(
"No volumes found from knowledge, using 'on_top_of'")
91 inspect_area_des.write(
"on_top_of")
94 best_volumes.sort(key=
lambda tup: tup[1], reverse=
False)
95 rospy.loginfo(f
"Volumes: {best_volumes}")
96 best_vol = best_volumes[0]
97 rospy.loginfo(f
"Using volume: {best_vol[0]}")
99 inspect_area_des.write(best_vol[0])
102 @smach.cb_interface(outcomes=[
"done",
"failed"])
103 def _select_nav_area(userdata, furniture_des, inspect_area_des, nav_area_des):
104 entity_to_inspect = furniture_des.resolve()
105 inspect_area = inspect_area_des.resolve()
106 if entity_to_inspect
is None or not inspect_area:
107 rospy.loginfo(
"Using backup nav_area: 'in_front_of'")
108 nav_area_des.write(
"in_front_of")
111 nav_area = common_knowledge.get_inspect_position(entity_to_inspect.uuid, inspect_area)
112 rospy.loginfo(f
"For {entity_to_inspect.uuid}:{inspect_area} selected '{nav_area}' to navigate")
113 nav_area_des.write(nav_area)
118 smach.StateMachine.add(
"SAY_GO",
119 Say(robot,
"Let's go to the {furniture_object}",
120 furniture_object=ds.AttrDesignator(furniture_designator,
"uuid",
122 transitions={
"spoken":
"CLEAR_FOUND_ENTITY_DESIGNATOR"})
124 smach.StateMachine.add(
'CLEAR_FOUND_ENTITY_DESIGNATOR',
125 WriteDesignator(object_ids_des.writeable, []),
126 transitions={
'written':
'SELECT_INSPECT_AREA'})
128 smach.StateMachine.add(
"SELECT_INSPECT_AREA", smach.CBState(_select_inspect_area,
130 furniture_designator,
131 inspect_area_des.writeable]),
132 transitions={
"done":
"SELECT_NAV_AREA",
133 "failed":
"SELECT_NAV_AREA"})
135 smach.StateMachine.add(
"SELECT_NAV_AREA", smach.CBState(_select_nav_area,
136 cb_args=[furniture_designator,
137 inspect_area_des.writeable,
138 nav_area_des.writeable]),
139 transitions={
"done":
"INSPECT_FURNITURE",
140 "failed":
"INSPECT_FURNITURE"})
142 smach.StateMachine.add(
"INSPECT_FURNITURE",
143 Inspect(robot=robot, entityDes=furniture_designator, searchArea=inspect_area_des,
144 objectIDsDes=object_ids_des, navigation_area=nav_area_des),
145 transitions={
"done":
"SELECT_ENTITY",
146 "failed":
"SAY_INSPECTION_FAILED"})
148 smach.StateMachine.add(
"SAY_INSPECTION_FAILED",
149 Say(robot,
"I am sorry but I was not able to reach the {furniture_object}",
150 furniture_object=ds.AttrDesignator(furniture_designator,
"uuid",
152 transitions={
"spoken":
"failed"})
154 @smach.cb_interface(outcomes=[
"succeeded",
"no_entities"],
155 input_keys=[
"laser_dot"])
156 def select_entity(userdata):
158 Selects the entity that the robot believes the operator has pointed to and that the robot will
161 Userdata contains key 'laser_dot' with value geometry_msgs.msg.PointStamped where the operator pointed
164 :param userdata: (dict)
165 :return: (str) outcome
167 assert userdata.laser_dot.header.frame_id.endswith(
"map"),
"Provide your laser dot in map frame"
170 entity_ids = [cr.uuid
for cr
in object_ids_des.resolve()]
171 rospy.loginfo(
"Segmented entities: {}".format(entity_ids))
174 all_entities = robot.ed.get_entities()
175 segmented_entities = [e
for e
in all_entities
if e.uuid
in entity_ids]
179 for entity
in segmented_entities:
185 size_x = max(shape.x_max - shape.x_min, 0.001)
186 size_y = max(shape.y_max - shape.y_min, 0.001)
188 if size_x > SIZE_LIMIT
or size_y > SIZE_LIMIT:
191 if not 1 / min(RATIO_LIMIT, 1000) <= size_x / size_y <= min(RATIO_LIMIT, 1000):
194 candidates.append(entity)
198 rospy.logwarn(
"No 'probable' entities left")
201 candidates_distance = []
203 closest_tuple = (
None,
None)
204 x_ref = userdata.laser_dot.point.x
205 y_ref = userdata.laser_dot.point.y
208 x_e = e.pose.frame.p.x()
209 y_e = e.pose.frame.p.y()
210 d_2d = math.hypot(x_ref - x_e, y_ref - y_e)
211 rospy.loginfo(
"Entity {} at {}, {}: distance = {}".format(e.uuid, x_e, y_e, d_2d))
212 candidates_distance.append((e, d_2d))
216 candidates_distance.sort(key=
lambda tup: tup[1], reverse=
False)
217 best_candidate = candidates_distance[0]
219 rospy.loginfo(
"Best entity: {} at {}".format(best_candidate[0].uuid, best_candidate[1]))
220 entity_designator.write(list(list(zip(*candidates_distance))[0][:max_number_items]))
224 smach.StateMachine.add(
"SELECT_ENTITY",
225 smach.CBState(select_entity),
226 transitions={
"succeeded":
"succeeded",
227 "no_entities":
"failed"})
230 if __name__ ==
"__main__":
232 rospy.init_node(
"test_furniture_inspection")
235 _robot = get_robot_from_argv(index=1)
238 furniture = ds.EdEntityDesignator(robot=_robot, uuid=
"kitchen_shelf")
239 entity_designator = ds.VariableDesignator(resolve_type=Entity)
241 ps = geometry_msgs.msg.PointStamped()
242 ps.header.frame_id =
"map"
246 user_data = smach.UserData()
247 user_data[
"laser_dot"] = ps
249 sm =
InspectFurniture(robot=_robot, furniture_designator=furniture, entity_designator=entity_designator.writeable)
250 sm.execute(user_data)