生产组:用于消息的发送。 消费组:用于消息的订阅处理。 生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。
生产者使用
- 创建生产者对象 DefaultMQProducer
- 设置NamesrvAddr
- 启动生产者服务
- 创建消息并发送
代码实现如下: 同步发送:
代码语言:javascript复制// 创建DefaultMQProducer消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
//设置NameServer节点地址,多个节点间用分号分割
producer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
//与NameServer建立长连接
producer.start();
for(int i = 0 ; i <5; i ) {
// 1. 创建消息
Message message = new Message("test_quick_topic", // 主题
"TagA", // 标签
"key" i, // 用户自定义的key ,唯一的标识
("Hello RocketMQ" i).getBytes()); // 消息内容实体(byte[])
// 2.1 同步发送消息
// if(i == 1) {
// message.setDelayTimeLevel(3);
// }
// 发送消息,获取发送结果
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
// SendResult sr = producer.send(message);
// SendStatus status = sr.getSendStatus();
// System.err.println(status);
//System.err.println("消息发出: " sr);
}
// 消息发送完毕关闭连接
producer.shutdown();
异步发送:
代码语言:javascript复制 // 2.2 异步发送消息
producer.send(message, new SendCallback() {
//rabbitmq急速入门的实战: 可靠性消息投递
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msgId: " sendResult.getMsgId() ", status: " sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
System.err.println("------发送失败");
}
});
执行生产者发送消息,可以看到控制台输出如下:
在对应的控制台可以查看到对应的消息主题
在消息页签可以通过topic查询到消息,也可以通过message_key和message_id查询。
消费者使用
- 创建消费者对象 DefaultMQPushConsumer
- 设置NamesrvAddr及其消费位置ConsumeFromWhere
- 设置订阅主题subscribe
- 注册监听并消费registerMessageListener
具体代码实现如下:
代码语言:javascript复制 // 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
// 设置NameServer节点
consumer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
// 设置消费位置,从哪个点开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/*订阅主题,
consumer.subscribe包含两个参数:
topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
subExpression: 子表达式用于筛选tags。
同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
例如:设置为*,则代表接收所有tags数据。
例如:设置为2022S1,则Broker中只有tags=2022S1的消息会被接收,而2022S2就会被排除在外。
*/
consumer.subscribe("test_quick_topic", "*");
// 创建监听,当有新的消息监听程序会及时捕捉并加以处理。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
// if(keys.equals("key1")) {
// System.err.println("消息消费失败..");
// int a = 1/0;
// }
String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("topic: " topic ",tags: " tags ", keys: " keys ",body: " msgBody);
} catch (Exception e) {
e.printStackTrace();
// int recousumeTimes = me.getReconsumeTimes();
// System.err.println("recousumeTimes: " recousumeTimes);
// if(recousumeTimes == 3) {
// // 记录日志....
// // 做补偿处理
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者,与Broker建立长连接,开始监听。
consumer.start();
System.err.println("consumer start...");
上述注释代码模拟了消息出现异常的情况,如果连续三次消费失败,则记录日志做补偿处理,并返回成功。
本文内容到此结束了, 如有收获欢迎点赞