CSE276A - ROS2

Introduction

ROS 2 (Robot Operating System 2) is a framework that runs on an existing OS. In a robot, there are separate programs that read camera data, controlling wheels, planning paths .etc. ROS 2 provides a standard way for all these independent programs to discover each other and exchange information.

Nodes

A node is a single program responsible for one specific task.

import rclpy # ROS Client Library for Python
from rclpy.node import Node

class SensorNode(Node):
    def __init__(self):
        # Initialize the node with the name 'sensor_node'
        super().__init__('sensor_node')
        self.get_logger().info('LIDAR sensor node has started.')

Topics

Topics are channels where nodes can send (publish) or receive (subscribe) data. This communication is asynchronous. E.g., the sensor_node needs to continuously broadcast sensor data to anyone who cares.

  • Publisher

    • self.publisher_ = self.create_publisher(String, 'chatter', 10): String specifies the message type, 'chatter' is topic name, 10 is the the depth of the message queue.
    • self.timer = self.create_timer(): register a timer event, the event is scheduled by rclpy.init().
    • rclpy.init(): Initializes the rclpy library, first step, mandatory.
    • rclpy.spin(node): ROS2 nodes are event-driven, this function will keep the node running and constantly listen for events. The program will stay at this line until Ctrl+C is entered.
    • node.destroy_node(): Gracefully cleans up the node’s internal resources.
    • rclpy.shutdown(): Shuts down the rclpy Library.
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    class Talker(Node):
        def __init__(self):
            super().__init__('talker')
            self.publisher_ = self.create_publisher(String, 'chatter', 10)
            timer_period = 0.5
            self.timer = self.create_timer(timer_period, self.timer_callback) # timer_callback is called every 0.5 seconds
            self.count = 0
    
        def timer_callback(self):
            msg = String()
            msg.data = f'Hello ROS2: {self.count}'
            self.publisher_.publish(msg)
            self.get_logger().info(f'Publishing: "{msg.data}"')
            self.count += 1
    
    def main(args=None):
        rclpy.init(args=args)
        node = Talker() # Creates an instance of the Talker node class
        rclpy.spin(node) 
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
  • Subscriber

    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    class Listener(Node):
        def __init__(self):
            super().__init__('listener')
            self.subscription = self.create_subscription(
                String,          # message type
                'chatter',       # topic name
                self.listener_callback,  # callback function
                10               # queue size
            )
            self.subscription
    
        def listener_callback(self, msg):
            self.get_logger().info(f'I heard: "{msg.data}"')
    
    def main(args=None):
        rclpy.init(args=args)
        node = Listener()
        rclpy.spin(node)
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()

Services

Services are used for two-way communication. This communication is synchronous.

  • Service Server

    import rclpy
    from rclpy.node import Node
    from example_interfaces.srv import AddTwoInts
    
    class AddTwoIntsServer(Node):
        def __init__(self):
            super().__init__('add_two_ints_server')
            # create a service
            self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
    
        def add_two_ints_callback(self, request, response):
            self.get_logger().info(f'Received request: a={request.a}, b={request.b}')
            response.sum = request.a + request.b
            self.get_logger().info(f'Returning response: sum={response.sum}')
            return response
    
    def main(args=None):
        rclpy.init(args=args)
        node = AddTwoIntsServer()
        rclpy.spin(node)  # spin to waiting for requests
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
  • Client

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class AddTwoIntsClient(Node):
    def __init__(self):
        super().__init__('add_two_ints_client')
        self.client = self.create_client(AddTwoInts, 'add_two_ints')

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

        self.request = AddTwoInts.Request()
        self.request.a = 5
        self.request.b = 7

    def send_request(self):
        self.future = self.client.call_async(self.request)
        rclpy.spin_until_future_complete(self, self.future) # wait for response
        return self.future.result()

def main(args=None):
    rclpy.init(args=args)
    node = AddTwoIntsClient()
    response = node.send_request()
    node.get_logger().info(f'Result of addition: {response.sum}')
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

CSE276A - ROS2
http://example.com/2025/10/24/CSE276A/CSE276A-ROS2/
Author
Songlin Zhao
Posted on
October 24, 2025
Licensed under