CSE276A - ROS2
Introduction
ROS 2 (Robot Operating System 2) is a framework that runs on an existing OS. In a robot, there are separate programs that read camera data, controlling wheels, planning paths .etc. ROS 2 provides a standard way for all these independent programs to discover each other and exchange information.
Nodes
A node is a single program responsible for one specific task.
import rclpy # ROS Client Library for Python
from rclpy.node import Node
class SensorNode(Node):
def __init__(self):
# Initialize the node with the name 'sensor_node'
super().__init__('sensor_node')
self.get_logger().info('LIDAR sensor node has started.')Topics
Topics are channels where nodes can send (publish) or receive (subscribe) data. This communication is asynchronous. E.g., the sensor_node needs to continuously broadcast sensor data to anyone who cares.
-
Publisher
self.publisher_ = self.create_publisher(String, 'chatter', 10):Stringspecifies the message type,'chatter'is topic name,10is the the depth of the message queue.self.timer = self.create_timer(): register a timer event, the event is scheduled byrclpy.init().rclpy.init(): Initializes the rclpy library, first step, mandatory.rclpy.spin(node): ROS2 nodes are event-driven, this function will keep the node running and constantly listen for events. The program will stay at this line untilCtrl+Cis entered.node.destroy_node(): Gracefully cleans up the node’s internal resources.rclpy.shutdown(): Shuts down the rclpy Library.
import rclpy from rclpy.node import Node from std_msgs.msg import String class Talker(Node): def __init__(self): super().__init__('talker') self.publisher_ = self.create_publisher(String, 'chatter', 10) timer_period = 0.5 self.timer = self.create_timer(timer_period, self.timer_callback) # timer_callback is called every 0.5 seconds self.count = 0 def timer_callback(self): msg = String() msg.data = f'Hello ROS2: {self.count}' self.publisher_.publish(msg) self.get_logger().info(f'Publishing: "{msg.data}"') self.count += 1 def main(args=None): rclpy.init(args=args) node = Talker() # Creates an instance of the Talker node class rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() -
Subscriber
import rclpy from rclpy.node import Node from std_msgs.msg import String class Listener(Node): def __init__(self): super().__init__('listener') self.subscription = self.create_subscription( String, # message type 'chatter', # topic name self.listener_callback, # callback function 10 # queue size ) self.subscription def listener_callback(self, msg): self.get_logger().info(f'I heard: "{msg.data}"') def main(args=None): rclpy.init(args=args) node = Listener() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
Services
Services are used for two-way communication. This communication is synchronous.
-
Service Server
import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class AddTwoIntsServer(Node): def __init__(self): super().__init__('add_two_ints_server') # create a service self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback) def add_two_ints_callback(self, request, response): self.get_logger().info(f'Received request: a={request.a}, b={request.b}') response.sum = request.a + request.b self.get_logger().info(f'Returning response: sum={response.sum}') return response def main(args=None): rclpy.init(args=args) node = AddTwoIntsServer() rclpy.spin(node) # spin to waiting for requests node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() -
Client
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
self.client = self.create_client(AddTwoInts, 'add_two_ints')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
self.request = AddTwoInts.Request()
self.request.a = 5
self.request.b = 7
def send_request(self):
self.future = self.client.call_async(self.request)
rclpy.spin_until_future_complete(self, self.future) # wait for response
return self.future.result()
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsClient()
response = node.send_request()
node.get_logger().info(f'Result of addition: {response.sum}')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()CSE276A - ROS2
http://example.com/2025/10/24/CSE276A/CSE276A-ROS2/