RocketMQ是一个分布式消息队列系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,被广泛应用于金融、电商、物流、游戏等领域。
本文将介绍如何入门使用RocketMQ。
一、安装RocketMQ
1.下载RocketMQ安装包
从官网下载最新版本的RocketMQ安装包,地址:http://rocketmq.apache.org/release_notes/release-notes-4.9.0/
2.解压安装包
将下载的安装包解压到指定目录下,如:/usr/local/rocketmq
3.配置环境变量
在/etc/profile文件中添加如下配置:
代码语言:txt复制export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=PATH:ROCKETMQ_HOME/bin
执行source /etc/profile命令使配置生效。
4.启动RocketMQ
进入RocketMQ安装目录下的bin目录,执行如下命令启动RocketMQ:
代码语言:txt复制sh mqnamesrv
sh mqbroker -n localhost:9876
二、使用RocketMQ
1.创建主题
在RocketMQ中,消息发送和接收需要指定主题(topic)。创建主题可以使用RocketMQ提供的命令行工具mqadmin,命令如下:
代码语言:txt复制sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t myTopic
2.发送消息
使用RocketMQ提供的Java API发送消息,代码如下:
代码语言:txt复制public class Producer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic、Tag和消息体
Message msg = new Message("myTopic" /* Topic /, "TagA" / Tag /, "Hello RocketMQ" .getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
// 关闭Producer实例
producer.shutdown();
}
}
3.消费消息
使用RocketMQ提供的Java API消费消息,代码如下:
代码语言:txt复制public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("myTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
以上代码中,消费者使用了DefaultMQPushConsumer类,它是RocketMQ提供的一个消息推送消费者,可以通过注册消息监听器来消费消息。
四、总结
本文介绍了如何安装和使用RocketMQ,包括创建主题、发送消息和消费消息。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,是一个非常优秀的消息队列系统。