test_tools
src/test_tools/odometer.py
Go to the documentation of this file.
1 #! /usr/bin/python
2 
3 from __future__ import print_function
4 
5 import csv
6 import os
7 import socket
8 import time
9 from fnmatch import fnmatch
10 from math import pi, pow, sqrt
11 
12 import rospy
13 from nav_msgs.msg import Odometry
14 from tf_conversions import transformations
15 
16 DEFAULT_PATH = "~/odometer"
17 DEFAULT_FILENAME = 'odometer'
18 EXT = '.csv'
19 ROUND_LEVEL = 5
20 
21 
22 class Odometer:
23  """
24  Odometer logs odometry. The odometry is measured in a callback function and sampled to a data storage in sample().
25  The data is written to a csv file, which is appended with a date. This is done is write(). In shutdown(), sample() and
26  write() are called to prevent data lost. This function is called on rospy shutdown. You can activate periodic
27  writing by calling active_write() in a loop, which the maximum length of your data storage as argument.
28  """
29  def __init__(self, path=DEFAULT_PATH, filename=DEFAULT_FILENAME):
30  """
31  Constructor
32  In the constructor old data is retrieved, if possible. Otherwise it starts from zero.
33 
34  :param path: Path to store the data. Path is expanded by hostname(lowercase). Can be relative or in home folder
35  :type path: str
36  :param filename: Filename of data file. Filenames are appended with date and extension.
37  :type filename: str
38  """
39  if fnmatch(path, "~*"): # if path is in home folder
40  path = os.path.expanduser(path)
41  path = os.path.abspath(path) # returns abs path, also when path is already abs.
42 
43  hostname = socket.gethostname()
44  date = time.strftime("%Y_%m_%d")
45 
46  hostfolderpath = os.path.join(os.path.expanduser(path), hostname.lower())
47  self.newfilepath = os.path.join(hostfolderpath, filename + "_" + date + EXT)
48  lastfilepath = ""
49 
50  self.file_has_header = False
51 
52  self.total_time = 0
53  self.total_distance = 0
54  self.total_rotation = 0
55 
56  self.data = [] # list of dicts with data
57  self.last_pose = None
58  self.last_time = rospy.Time.now().secs
59 
60  # Check if there exist a previous file to read from
61  if os.path.exists(hostfolderpath):
62  if os.path.exists(self.newfilepath):
63  lastfilepath = self.newfilepath
64  rospy.logdebug("Last data file is today's file")
65  else:
66  files = [item for item in sorted(os.listdir(hostfolderpath), reverse=True) if
67  os.path.isfile(os.path.join(hostfolderpath, item))]
68  if files:
69  for file in files:
70  if fnmatch(file, filename+"_*"+EXT):
71  filepath = os.path.join(hostfolderpath, file)
72  lastfilepath = filepath
73  rospy.logdebug("Found last file: {}".format(lastfilepath))
74  break
75 
76  if not lastfilepath:
77  rospy.logdebug("Not found a correct data file in the folder: {}".format(hostfolderpath))
78  else:
79  rospy.logdebug("No data files there yet in: {}".format(hostfolderpath))
80  else:
81  os.makedirs(hostfolderpath)
82  rospy.logdebug("No folder for hostname: '{}' found".format(hostname))
83 
84  # look for data in last data file, if found
85  if not lastfilepath:
86  rospy.logdebug("No previous data file found. Starting from zero")
87  else:
88  rospy.logdebug("Reading from last data file: {}".format(lastfilepath))
89  with open(lastfilepath, "r") as f:
90  reader = csv.reader(f)
91  # in case an empty file is there. The first line will stay empty. Therefore the header needs to be found
92  found_header = False
93  for header in reader:
94  if header == ['timestamp', 'distance', 'rotation', 'time']:
95  found_header = True
96  break
97 
98  if found_header:
99  # Iterate over all lines and get the last valid line. So it continues after an invalid line.
100  last_row = None
101  while True:
102  try:
103  last_row = next(reader)
104  except csv.Error:
105  pass
106  except StopIteration:
107  break
108  if last_row:
109  last_row = dict(zip(header, last_row))
110  try:
111  self.total_distance = float(last_row['distance'])
112  self.total_rotation = float(last_row['rotation'])
113  self.total_time = int(float(last_row['time']))
114  if lastfilepath == self.newfilepath:
115  self.file_has_header = True
116  rospy.logdebug("Loaded data from file: {}".format(lastfilepath))
117  except Exception as e:
118  rospy.logerr(e)
119  rospy.signal_shutdown("Unable to read last data. Last data is corrupt")
120  else:
121  rospy.logerr("No header found in file: {}".format(lastfilepath))
122  rospy.signal_shutdown("Shutdown, because last data file has no header")
123 
124  rospy.loginfo("Logging odometry to file: {}".format(self.newfilepath))
125 
126  rospy.Subscriber("odom", Odometry, self.callback)
127  rospy.on_shutdown(lambda: self.shutdown())
128 
129  def sample(self):
130  """
131  Current measurements are stored in self.data
132 
133  :return: no return
134  """
135  new_time = rospy.Time.now().secs
136  time_delta = new_time - self.last_time
137  self.total_time += time_delta
138  self.last_time = new_time
139 
140  timestamp = time.strftime("%Y_%m_%d_%H_%M_%S")
141  dist = round(self.total_distance, ROUND_LEVEL)
142  rot = round(self.total_rotation, ROUND_LEVEL)
143  t = self.total_time
144  self.data.append({'timestamp': timestamp, 'distance': dist, 'rotation': rot, 'time': t})
145 
146  def write(self):
147  """
148  Writing all data in self.data to the data file and closing it again. This should prevent file corruption.
149 
150  :return: no return
151  """
152  # Create today's file if not already there
153  if os.path.exists(self.newfilepath):
154  rospy.logdebug("Today's file already exists")
155  else:
156  rospy.logdebug("First time writing in today's file")
157 
158  with open(self.newfilepath, "a", 1) as new_file: # 1=line-buffered
159  try:
160  writer = csv.DictWriter(new_file, fieldnames=['timestamp', 'distance', 'rotation', 'time'])
161  if not self.file_has_header:
162  rospy.logdebug("Printing header of csv file")
163  writer.writeheader()
164  self.file_has_header = True
165  if self.data:
166  rospy.logdebug("Writing data to csv file")
167  writer.writerows(self.data)
168  self.data = []
169 
170  except Exception as e:
171  rospy.logerr(e)
172 
173  def callback(self, msg):
174  """
175  Measuring the displacement based on the new position
176 
177  :param msg: Odometry msg with position and rotation information
178  :type msg: nag_msgs.msg.Odometry
179  :return: no return
180  """
181  if self.last_pose:
182  new_pose = msg.pose.pose
183  pos = new_pose.position
184  pos_old = self.last_pose.position
185  distance_delta = sqrt(pow(pos.x-pos_old.x, 2) + pow(pos.y-pos_old.y, 2))
186  if distance_delta < 0.5: # If delta is too big, it is incorrect. Not doing anything with this data
187  self.total_distance += distance_delta
188 
189  new_orientation = new_pose.orientation
190  old_orientation = self.last_pose.orientation
191  new_rotation = transformations.euler_from_quaternion([new_orientation.x,
192  new_orientation.y,
193  new_orientation.z,
194  new_orientation.w])
195  old_rotation = transformations.euler_from_quaternion([old_orientation.x,
196  old_orientation.y,
197  old_orientation.z,
198  old_orientation.w])
199  rotation_delta = new_rotation[2] - old_rotation[2]
200  if rotation_delta >= pi:
201  rotation_delta -= 2*pi
202  elif rotation_delta <= -pi:
203  rotation_delta += 2*pi
204  self.total_rotation += abs(rotation_delta)
205  else:
206  rospy.logerr("Distance delta too big (%f m), ignoring this step" % distance_delta)
207 
208  self.last_pose = msg.pose.pose
209 
210  def activate_write(self, length):
211  """
212  If self.data contains more than X samples, all the data is written to the file
213 
214  :param length:
215  :type length: int
216  :return: no return
217  """
218  if len(self.data) >= length:
219  self.write()
220 
221  def shutdown(self):
222  """
223  To prevent data loss between last sample and shutdown, this function is called to sample and write all the data.
224 
225  :return: no return
226  """
227  self.sample()
228  self.write()
test_tools.odometer.Odometer.shutdown
def shutdown(self)
Definition: src/test_tools/odometer.py:221
test_tools.odometer.Odometer.total_distance
total_distance
Definition: src/test_tools/odometer.py:53
test_tools.odometer.Odometer.callback
def callback(self, msg)
Definition: src/test_tools/odometer.py:173
test_tools.odometer.Odometer.newfilepath
newfilepath
Definition: src/test_tools/odometer.py:47
test_tools.odometer.Odometer.file_has_header
file_has_header
Definition: src/test_tools/odometer.py:50
test_tools.odometer.Odometer.activate_write
def activate_write(self, length)
Definition: src/test_tools/odometer.py:210
test_tools.odometer.Odometer.total_rotation
total_rotation
Definition: src/test_tools/odometer.py:54
test_tools.odometer.Odometer.sample
def sample(self)
Definition: src/test_tools/odometer.py:129
test_tools.odometer.Odometer.total_time
total_time
Definition: src/test_tools/odometer.py:52
test_tools.odometer.Odometer.last_pose
last_pose
Definition: src/test_tools/odometer.py:57
test_tools.odometer.Odometer.last_time
last_time
Definition: src/test_tools/odometer.py:58
test_tools.odometer.Odometer.write
def write(self)
Definition: src/test_tools/odometer.py:146
test_tools.odometer.Odometer.data
data
Definition: src/test_tools/odometer.py:56
test_tools.odometer.Odometer
Definition: src/test_tools/odometer.py:22
test_tools.odometer.Odometer.__init__
def __init__(self, path=DEFAULT_PATH, filename=DEFAULT_FILENAME)
Definition: src/test_tools/odometer.py:29