Hi all!
To solve the topic quiz in the ROS Basics in 5 Days (Python) I created a state machine, but maybe there is a more efficient & elegant solution I don’t know yet. Is there a standard procedure or even a template how to implement a state machine in ROS2?
Kind regards, Fadrí
topics_quiz_node.py
# import the ROS2 python libraries
import rclpy
from rclpy.node import Node
# import Quality of Service library, to set the correct profile and reliability to read sensor data.
from rclpy.qos import ReliabilityPolicy, QoSProfile
# import the Twist interface from the geometry_msgs package
# from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
# import the LaserScan module from sensor_msgs interface
from sensor_msgs.msg import LaserScan
# import the Twist interface from the geometry_msgs package
from geometry_msgs.msg import Twist
import math
import numpy as np
# enumerate states
move_along_wall_to_gap, move_to_gap_center, rotate_if_gap_left, move_through_gap = range(4)
class TopicsQuizNode(Node): # inheriting from class "Node"
def __init__(self):
# super() = constructor of parent class to initialize Node object with (param1 = node_name)
super().__init__('topics_quiz_node')
self.odom_msg = Odometry() # initialize messages
self.laser_msg = LaserScan()
self.state = move_along_wall_to_gap # initial state
self.yaw_ini = 0
self.tgt_angle = 0
# create subscriber object to Topic `/scan` using Module `LaserScan`
# with queue size of 10 messages
# send received info to laser_callback method
self.subscriber_laser = self.create_subscription(
LaserScan,
'/scan',
self.laser_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)) # is the most used to read LaserScan data and some sensor data.
# subscriber to Topic /odom in module Odometry, queue 10 msgs and send info to odom_callback
self.subscriber_odom = self.create_subscription(
Odometry,
'/odom',
self.odom_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)) # QoS to read LaserScan
# create publisher object to messages of Module `Twist` of Topic `/cmd_vel`
# with queue size of 10 messages
self.publisher_cmd_vel = self.create_publisher(Twist, 'cmd_vel', 10)
# create a timer object to trigger `motion callback` method every `timer_period` [secs]
# - timer_period = duration between 2 callbacks
# - motion callback = method to be called by timer every timer_period [secs]
self.timer_period = 0.3 # [secs]
self.timer = self.create_timer(self.timer_period, self.motion)
# create Twist message to pass on movement commands
self.cmd_msg = Twist()
def laser_callback(self, msg): # laser callback method
self.laser_msg = msg
# pass
def odom_callback(self, msg): # odom callback method
self.odom_msg = msg
# print(msg.pose)
# print(msg.twist)
def euler_from_quaternion(self, quaternion):
"""
Converts quaternion (w in last place) to euler roll, pitch, yaw
quaternion = [x, y, z, w]
Below should be replaced when porting for ROS2 Python tf_conversions is done.
"""
x = quaternion[0]
y = quaternion[1]
z = quaternion[2]
w = quaternion[3]
sinr_cosp = 2 * (w * x + y * z)
cosr_cosp = 1 - 2 * (x * x + y * y)
roll = np.arctan2(sinr_cosp, cosr_cosp)
sinp = 2 * (w * y - z * x)
pitch = np.arcsin(sinp)
siny_cosp = 2 * (w * z + x * y)
cosy_cosp = 1 - 2 * (y * y + z * z)
yaw = np.arctan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
def inf_angles(self,inf_angle_seed):
# find l_angle and r_angle circumscribing a cone of dist = inf
l_angle_diff, r_angle_diff = 0, 0
l_angle = inf_angle_seed - l_angle_diff
r_angle = inf_angle_seed + r_angle_diff
l_dist = self.laser_msg.ranges[l_angle]
r_dist = self.laser_msg.ranges[r_angle]
while math.isinf(l_dist):
l_angle_diff += 1
l_angle = inf_angle_seed - l_angle_diff
l_dist = self.laser_msg.ranges[l_angle]
while math.isinf(r_dist):
r_angle_diff += 1
r_angle = inf_angle_seed + r_angle_diff
r_dist = self.laser_msg.ranges[r_angle]
return l_angle, r_angle
# motion callback method (=state machine) triggered by timer
def motion(self):
# convert quaternions to euler angles
pose_q = [None] * 4 # initialize list of quaternion position input
pose_q[0]=self.odom_msg.pose.pose.orientation.x
pose_q[1]=self.odom_msg.pose.pose.orientation.y
pose_q[2]=self.odom_msg.pose.pose.orientation.z
pose_q[3]=self.odom_msg.pose.pose.orientation.w
yaw = self.euler_from_quaternion(pose_q)[2] # don't need roll and pitch
# [roll, pitch, yaw] = self.euler_from_quaternion(pose_q)
# self.get_logger().info('Publishing: "%s"' % str(yaw))
# state 1: move forward until next to gap middle, gap detected at dist_left = inf
if self.state == move_along_wall_to_gap:
self.cmd_msg.angular.z = 0.0 # dont rotate
self.cmd_msg.linear.x = 0.5 # move forward
dist_left = self.laser_msg.ranges[90]
if math.isinf(dist_left):
# if(dist_left==inf): stop and update state
# stop if gap detected
self.cmd_msg.linear.x = 0.0
self.state = move_to_gap_center
# state 2: move next to gap middle
elif self.state == move_to_gap_center:
# get angles of inf-cone around 90 deg (i.e. on left side)
# self.get_logger().info('l_angle: "%s"' % str(l_angle))
l_angle, r_angle = self.inf_angles(90)
# gap center if r_angle and l_angle have same difference from 90
if abs(r_angle-90) > abs(90-l_angle):
# move slowly until next to gap center
self.cmd_msg.linear.x = 0.1
l_angle, r_angle = self.inf_angles(90)
else:
# once gap middle reached, stop and trigger next state
self.cmd_msg.linear.x = 0.0
# Euler angles in rad
l_angle, r_angle = self.inf_angles(90)
self.tgt_angle = (float(l_angle) + float(r_angle))/2 *(math.pi/180) # [rad]
# self.tgt_angle = math.pi/2 # [rad]
self.yaw_ini = yaw
self.state = rotate_if_gap_left
# state 3: rotate towards gap middle
elif self.state == rotate_if_gap_left:
# rotate slowly until yaw_angle = tgt_angle
# self.get_logger().info('Publishing: "%s"' % str(yaw))
if (yaw - self.yaw_ini) < self.tgt_angle:
self.cmd_msg.angular.z = 0.3
else:
self.cmd_msg.angular.z = 0.0
self.state = move_through_gap
# state 4: move through gap and stop once we are through
elif self.state == move_through_gap:
dist_left_behind = self.laser_msg.ranges[90+70]
# move forward until wall visible left_behind
if math.isinf(dist_left_behind):
self.cmd_msg.linear.x = 0.5
else:
self.cmd_msg.linear.x = 0.0
# publish values to Topic `/cmd_vel`
self.publisher_cmd_vel.publish(self.cmd_msg)
# # print msg on console
# self.get_logger().info('Publishing: "%s"' % self.cmd_msg)
def main(args=None):
# initialize the ROS communication
rclpy.init(args=args)
# node constructor
topics_quiz_node = TopicsQuizNode()
# pause program execution, wait for request to kill node (ctrl+c)
rclpy.spin(topics_quiz_node)
# node destructor
topics_quiz_node.destroy_node()
# shutdown the ROS communication
rclpy.shutdown()
if __name__ == '__main__':
main()