RocketMQ入门

2023-03-22 22:15:38 浏览数 (2)

RocketMQ是一个分布式消息队列系统,它最初由阿里巴巴开发并开源。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,被广泛应用于金融、电商、物流、游戏等领域。

本文将介绍如何入门使用RocketMQ。

一、安装RocketMQ

1.下载RocketMQ安装包

从官网下载最新版本的RocketMQ安装包,地址:http://rocketmq.apache.org/release_notes/release-notes-4.9.0/

2.解压安装包

将下载的安装包解压到指定目录下,如:/usr/local/rocketmq

3.配置环境变量

在/etc/profile文件中添加如下配置:

代码语言:txt复制
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=PATH:ROCKETMQ_HOME/bin

执行source /etc/profile命令使配置生效。

4.启动RocketMQ

进入RocketMQ安装目录下的bin目录,执行如下命令启动RocketMQ:

代码语言:txt复制
sh mqnamesrv

sh mqbroker -n localhost:9876

二、使用RocketMQ

1.创建主题

在RocketMQ中,消息发送和接收需要指定主题(topic)。创建主题可以使用RocketMQ提供的命令行工具mqadmin,命令如下:

代码语言:txt复制
sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t myTopic

2.发送消息

使用RocketMQ提供的Java API发送消息,代码如下:

代码语言:txt复制
public class Producer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic、Tag和消息体
Message msg = new Message("myTopic" /* Topic /, "TagA" / Tag /, "Hello RocketMQ" .getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
// 关闭Producer实例
producer.shutdown();
}
}

3.消费消息

使用RocketMQ提供的Java API消费消息,代码如下:

代码语言:txt复制
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("myTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

以上代码中,消费者使用了DefaultMQPushConsumer类,它是RocketMQ提供的一个消息推送消费者,可以通过注册消息监听器来消费消息。

四、总结

本文介绍了如何安装和使用RocketMQ,包括创建主题、发送消息和消费消息。RocketMQ具有高可靠性、高吞吐量、高扩展性和良好的可维护性等特点,是一个非常优秀的消息队列系统。

0 人点赞