2.1 创建 ROS 功能包
ROS(Robot Operating System)是一种开源的机器人软件框架,广泛用于机器人开发中。通过使用 ROS,开发者可以轻松创建和管理机器人应用程序。在本节中,我们将介绍如何创建一个 ROS 功能包并实现一些基本功能。
2.1.1 使用 ROS 主题
ROS 主题(Topic)是一种发布/订阅机制,允许节点之间进行通信。每个节点可以发布主题消息或订阅主题消息来获取数据。以下是如何使用 ROS 主题的步骤:
创建功能包
首先,我们需要创建一个新的 ROS 功能包。在终端中运行以下命令:
代码语言:javascript复制catkin_create_pkg de_ws my_robot rospy roscpp
此命令创建一个名为 my_robot
的功能包,并声明了对 std_msgs
、rospy
和 roscpp
的依赖。
创建发布者节点
接下来,我们在功能包中创建一个发布者节点。新建一个名为 talker.py
的文件,并添加以下内容:
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def talker():
pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
此代码定义了一个发布者节点 talker
,它每秒钟发布一条 "hello world" 消息到主题 chatter
。
2.1.2 创建 ROS 节点
ROS 节点是 ROS 系统中的基本执行单元。每个节点可以执行一个任务,如传感器数据处理、运动控制等。下面我们创建一个订阅者节点来接收 talker
节点发布的消息。
创建订阅者节点
新建一个名为 listener.py
的文件,并添加以下内容:
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def callback(data):
rospy.loginfo(rospy.get_caller_id() " I heard %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber('chatter', String, callback)
rospy.spin()
if __name__ == '__main__':
listener()
此代码定义了一个订阅者节点 listener
,它接收主题 chatter
上的消息并打印出来。
2.1.3 编译节点
在我们运行节点之前,需要编译功能包。确保在功能包的 CMakeLists.txt
和 package.xml
文件中正确配置了依赖项。然后在终端中运行以下命令:
cd ~/catkin_ws
catkin_make
编译完成后,可以运行节点:
代码语言:javascript复制roscore
rosrun my_robot talker.py
rosrun my_robot listener.py
此时,你应该可以看到 listener
节点打印出 talker
节点发布的消息。
添加自定义的 .msg 文件和 .srv 文件
在 ROS 中,自定义消息类型和服务类型是很常见的需求。我们可以定义自己的消息和服务文件来满足特定的应用需求。
创建自定义 .msg 文件
首先,在 my_robot
功能包的 msg
目录下创建一个新的消息文件,例如 CustomMessage.msg
:
string content
int32 number
然后,在 CMakeLists.txt
文件中添加以下内容:
add_message_files(
FILES
CustomMessage.msg
)
在 package.xml
文件中添加依赖:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
最后,重新编译功能包:
代码语言:javascript复制catkin_make
创建自定义 .srv 文件
类似地,我们可以在 srv
目录下创建一个新的服务文件,例如 CustomService.srv
:
string request
---
string response
然后,在 CMakeLists.txt
文件中添加以下内容:
add_service_files(
FILES
CustomService.srv
)
在 package.xml
文件中添加依赖:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
重新编译功能包:
代码语言:javascript复制catkin_make
2.3 使用 ROS 服务
ROS 服务是一种请求/响应机制,允许节点之间进行同步通信。
2.3.1 使用 ROS actionlib
actionlib 是 ROS 中用于处理长时间运行任务的库。它提供了一种客户端-服务器架构,允许客户端请求服务器执行某些任务,并在任务完成时收到通知。
创建动作服务器
在 my_robot
功能包中创建一个新的 Python 文件 action_server.py
:
#!/usr/bin/env python
import rospy
import actionlib
from my_robot.msg import CustomAction, CustomActionFeedback, CustomActionResult
class CustomActionServer(object):
_feedback = CustomActionFeedback()
_result = CustomActionResult()
def __init__(self):
self._as = actionlib.SimpleActionServer("custom_action", CustomAction, self.execute_cb, False)
self._as.start()
def execute_cb(self, goal):
rospy.loginfo('Executing goal: %s', goal)
success = True
for i in range(1, goal.order):
if self._as.is_preempt_requested():
rospy.loginfo('Goal preempted')
self._as.set_preempted()
success = False
break
self._feedback.sequence = i
self._as.publish_feedback(self._feedback)
rospy.sleep(1.0)
if success:
self._result.sequence = goal.order
self._as.set_succeeded(self._result)
if __name__ == '__main__':
rospy.init_node('custom_action_server')
server = CustomActionServer()
rospy.spin()
创建动作客户端
在 my_robot 功能包中创建一个新的 Python 文件 action_client.py:
#!/usr/bin/env python
import rospy
import actionlib
from my_robot.msg import CustomAction, CustomActionGoal
def feedback_cb(feedback):
rospy.loginfo('Feedback: %s', feedback)
if __name__ == '__main__':
rospy.init_node('custom_action_client')
client = actionlib.SimpleActionClient('custom_action', CustomAction)
client.wait_for_server()
goal = CustomActionGoal()
goal.order = 10
client.send_goal(goal, feedback_cb=feedback_cb)
client.wait_for_result()
rospy.loginfo('Result: %s', client.get_result())
2.3.2 编译 ROS 动作服务器和客户端
在编译功能包之前,确保在 CMakeLists.txt
和 package.xml
中添加了对 actionlib
和自定义消息的依赖。
然后在终端中运行以下命令:
代码语言:javascript复制catkin_make
启动动作服务器和客户端:
代码语言:javascript复制rosrun my_robot action_server.py
rosrun my_robot action_client.py
2.4 创建启动文件
启动文件用于同时启动多个 ROS 节点,简化了复杂系统的启动过程。
创建一个新的启动文件 my_robot.launch
:
<launch>
<node pkg="my_robot" type="talker.py" name="talker" output="screen"/>
<node pkg="my_robot" type="listener.py" name="listener" output="screen"/>
</launch>
运行启动文件:
代码语言:javascript复制roslaunch my_robot my_robot.launch
2.5 主题、服务和 actionlib 的应用
在实际应用中,主题、服务和 actionlib 可以结合使用,实现复杂的机器人行为。例如,一个机器人可以通过主题获取传感器数据,通过服务进行路径规划,通过 actionlib 执行长时间的导航任务。
以下是一个综合应用示例:
- 主题用于发布传感器数据。
- 服务用于路径规划。
- actionlib 用于执行导航任务。
2.6 总结
本文介绍了如何创建 ROS 功能包,并使用主题、服务和 actionlib 实现机器人功能。通过这些基础知识,您可以构建复杂的机器人应用程序。
2.7 问题
在学习和使用 ROS 的过程中,可能会遇到以下问题:
- 功能包无法编译:检查依赖是否正确添加。
- 节点无法通信:确保主题和服务名称一致。
- 动作服务器和客户端无法连接:检查 actionlib 配置是否正确。