Topic quiz state machine programming template

Hi all!
To solve the topic quiz in the ROS Basics in 5 Days (Python) I created a state machine, but maybe there is a more efficient & elegant solution I don’t know yet. Is there a standard procedure or even a template how to implement a state machine in ROS2?
Kind regards, Fadrí

topics_quiz_node.py

# import the ROS2 python libraries
import rclpy
from rclpy.node import Node
# import Quality of Service library, to set the correct profile and reliability to read sensor data.
from rclpy.qos import ReliabilityPolicy, QoSProfile

# import the Twist interface from the geometry_msgs package
# from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry

# import the LaserScan module from sensor_msgs interface
from sensor_msgs.msg import LaserScan

# import the Twist interface from the geometry_msgs package
from geometry_msgs.msg import Twist

import math 
import numpy as np

# enumerate states
move_along_wall_to_gap, move_to_gap_center, rotate_if_gap_left, move_through_gap = range(4)

class TopicsQuizNode(Node):  # inheriting from class "Node"

    def __init__(self):
        # super() = constructor of parent class to initialize Node object with (param1 = node_name)
        super().__init__('topics_quiz_node')  
        self.odom_msg = Odometry()  # initialize messages
        self.laser_msg = LaserScan()  
        self.state = move_along_wall_to_gap  # initial state
        self.yaw_ini = 0
        self.tgt_angle = 0

        # create subscriber object to Topic `/scan` using Module `LaserScan`
        # with queue size of 10 messages
        # send received info to laser_callback method
        self.subscriber_laser = self.create_subscription(
            LaserScan,
            '/scan',
            self.laser_callback,
            QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))  # is the most used to read LaserScan data and some sensor data.
            
        # subscriber to Topic /odom in module Odometry, queue 10 msgs and send info to odom_callback
        self.subscriber_odom = self.create_subscription(
            Odometry,
            '/odom',  
            self.odom_callback,
            QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))  # QoS to read LaserScan

        # create publisher object to messages of Module `Twist` of Topic `/cmd_vel`
        # with queue size of 10 messages
        self.publisher_cmd_vel = self.create_publisher(Twist, 'cmd_vel', 10)

        # create a timer object to trigger `motion callback` method every `timer_period` [secs]
        # - timer_period = duration between 2 callbacks
        # - motion callback = method to be called by timer every timer_period [secs]
        self.timer_period = 0.3  # [secs]
        self.timer = self.create_timer(self.timer_period, self.motion)

        # create Twist message to pass on movement commands
        self.cmd_msg = Twist()

    def laser_callback(self, msg):  # laser callback method
        self.laser_msg = msg
        # pass

    def odom_callback(self, msg):  # odom callback method
        self.odom_msg = msg
        # print(msg.pose)
        # print(msg.twist)

    def euler_from_quaternion(self, quaternion):
            """
            Converts quaternion (w in last place) to euler roll, pitch, yaw
            quaternion = [x, y, z, w]
            Below should be replaced when porting for ROS2 Python tf_conversions is done.
            """
            x = quaternion[0]
            y = quaternion[1]
            z = quaternion[2]
            w = quaternion[3]

            sinr_cosp = 2 * (w * x + y * z)
            cosr_cosp = 1 - 2 * (x * x + y * y)
            roll = np.arctan2(sinr_cosp, cosr_cosp)

            sinp = 2 * (w * y - z * x)
            pitch = np.arcsin(sinp)

            siny_cosp = 2 * (w * z + x * y)
            cosy_cosp = 1 - 2 * (y * y + z * z)
            yaw = np.arctan2(siny_cosp, cosy_cosp)

            return roll, pitch, yaw


    def inf_angles(self,inf_angle_seed):
        # find l_angle and r_angle circumscribing a cone of dist = inf
        l_angle_diff, r_angle_diff = 0, 0
        l_angle = inf_angle_seed - l_angle_diff
        r_angle = inf_angle_seed + r_angle_diff
        l_dist = self.laser_msg.ranges[l_angle]  
        r_dist = self.laser_msg.ranges[r_angle]  

        while math.isinf(l_dist):
            l_angle_diff += 1 
            l_angle = inf_angle_seed - l_angle_diff
            l_dist = self.laser_msg.ranges[l_angle]
        
        while math.isinf(r_dist):
            r_angle_diff += 1 
            r_angle = inf_angle_seed + r_angle_diff
            r_dist = self.laser_msg.ranges[r_angle]
        
        return l_angle, r_angle


    # motion callback method (=state machine) triggered by timer 
    def motion(self):

        # convert quaternions to euler angles
        pose_q = [None] * 4  # initialize list of quaternion position input
        pose_q[0]=self.odom_msg.pose.pose.orientation.x
        pose_q[1]=self.odom_msg.pose.pose.orientation.y
        pose_q[2]=self.odom_msg.pose.pose.orientation.z
        pose_q[3]=self.odom_msg.pose.pose.orientation.w
        yaw = self.euler_from_quaternion(pose_q)[2]  # don't need roll and pitch
        # [roll, pitch, yaw] = self.euler_from_quaternion(pose_q)
        # self.get_logger().info('Publishing: "%s"' % str(yaw))

        # state 1: move forward until next to gap middle, gap detected at dist_left = inf
        if self.state == move_along_wall_to_gap:
            self.cmd_msg.angular.z = 0.0  # dont rotate
            self.cmd_msg.linear.x = 0.5  # move forward

            dist_left = self.laser_msg.ranges[90]
            if math.isinf(dist_left):  
                # if(dist_left==inf): stop and update state

                # stop if gap detected
                self.cmd_msg.linear.x = 0.0
                self.state = move_to_gap_center
            
        # state 2: move next to gap middle
        elif self.state == move_to_gap_center:

            # get angles of inf-cone around 90 deg (i.e. on left side)
            # self.get_logger().info('l_angle: "%s"' % str(l_angle))
            l_angle, r_angle = self.inf_angles(90)  

            # gap center if r_angle and l_angle have same difference from 90
            if abs(r_angle-90) > abs(90-l_angle):  
                # move slowly until next to gap center
                self.cmd_msg.linear.x = 0.1
                l_angle, r_angle = self.inf_angles(90)
            else:
                # once gap middle reached, stop and trigger next state
                self.cmd_msg.linear.x = 0.0
                # Euler angles in rad
                l_angle, r_angle = self.inf_angles(90)
                self.tgt_angle = (float(l_angle) + float(r_angle))/2 *(math.pi/180)   # [rad]
                # self.tgt_angle = math.pi/2   # [rad]
                self.yaw_ini = yaw  
                self.state = rotate_if_gap_left

        # state 3: rotate towards gap middle
        elif self.state == rotate_if_gap_left:

            # rotate slowly until yaw_angle = tgt_angle
            # self.get_logger().info('Publishing: "%s"' % str(yaw))
            if (yaw - self.yaw_ini) < self.tgt_angle:
                self.cmd_msg.angular.z = 0.3
            else:
                self.cmd_msg.angular.z = 0.0
                self.state = move_through_gap
        
        # state 4: move through gap and stop once we are through
        elif self.state == move_through_gap:
            dist_left_behind = self.laser_msg.ranges[90+70]

            # move forward until wall visible left_behind
            if math.isinf(dist_left_behind):  
                self.cmd_msg.linear.x = 0.5
            else:
                self.cmd_msg.linear.x = 0.0


        # publish values to Topic `/cmd_vel` 
        self.publisher_cmd_vel.publish(self.cmd_msg)

        # # print msg on console
        # self.get_logger().info('Publishing: "%s"' % self.cmd_msg)

def main(args=None):
    # initialize the ROS communication
    rclpy.init(args=args)
    # node constructor
    topics_quiz_node = TopicsQuizNode()
    # pause program execution, wait for request to kill node (ctrl+c)
    rclpy.spin(topics_quiz_node)
    # node destructor
    topics_quiz_node.destroy_node()
    # shutdown the ROS communication
    rclpy.shutdown()

if __name__ == '__main__':
    main()

There is a package called SMACH for python in ROS1. Its a state machine execution framework used extensively for composing robot behaviours. Its been ported to ROS2 by DeepX-inc which goes by the package name “executive_smach”.

There is yet another library called “YASMIN” by ulerobotics group.
GitHub - uleroboticsgroup/yasmin: YASMIN (Yet Another State MachINe).
You can check these links out for structuring your State machine in a more modular way.
Hope it helps!!

1 Like

This topic was automatically closed after 7 days. New replies are no longer allowed.