我们都知道事务的四大特性,但是那是针对的数据库的事务。但是Rabbitmq的事务到底是表达何种意思?根据一般概念的规律来说,mq的事务和数据库事务是类似的。我们可以将mq看做是数据库。
rabbitmq提供了与三个事务相关的命令:select、commit、rollback
其中select表示将当前模式设置为标准事务模式,commit表示提交当前事务,rollback表示事物回滚。也就是说select开启事务,通过commit操作之后publish的消息一定在消息队列中,当然如果发生rollback回滚,那么消息队列中的消息就会被撤销掉。AMQP事务大概过程如下图所示:
大概得代码如下:
代码语言:javascript复制@Slf4j
@Configuration
public class RabbitConfig {
/**
* 消息转化
* @return
*/
@Bean
public MessageConverter customMessageConvert() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue directOneQueue() {
Map map=new HashMap<>();
map.put("x-max-priority",10);
return new Queue("DDD",true,false,false,map);
}
@Bean
public Queue directTwoQueue() {
Map map=new HashMap<>();
return new Queue("EEE",true,false,false,map);
}
/**
* 定义一个rabbitmq消息发送器
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
//mq事务是通过事务管理器提交的,这块不能设置为手动提交
// connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setChannelTransacted(true);
//这块也和发送消息确认有关系
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
/**
* 配置启用rabbitmq事务
* @param connectionFactory
* @return
*/
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
发送消息测试:
代码语言:javascript复制 //通过id是否为0决定是否抛出异常
@GetMapping(value = "/test/{id}")
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void test(@PathVariable int id) throws Exception {
try {
Test t=new Test();
t.setName("tianjingle-ceshi");
byte[] body = JSON.toJSONBytes(t, SerializeConfig.globalInstance);
//设置消息相关属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
messageProperties.setPriority(10);
messageProperties.setCorrelationId("tianjingle");
messageProperties.setReplyTo("EEE");
Message message1 =new Message(body, messageProperties);
rabbitTemplate.convertAndSend("DDD",message1);
int z=1/id;
}catch (Exception e){
throw new Exception("12");
}
}
事务回滚的情况。
事务提交的情况
总结:通过上述实践,我们认为AMQP的事务是完全可靠的,但是事务的加入势必会让消息队列的性能上有所损耗,因为每个步骤都需要broker做出响应。