流控机制是我们在使用RabbitMQ时最头疼的问题:一旦并发激增,消费者消费队列消息就会像滴水一样慢。
现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。
我们先给Order接口添加一个发送消息的方法。
代码语言:javascript复制public interface Order {
public void makeOrder(Order order);
public OrderSuccessResult getResult(Order order);
public void postOrder(Order order);
}
实现类实现该方法
代码语言:javascript复制@Data
@AllArgsConstructor
@NoArgsConstructor
@ServiceOrderVersion(value = 1)
@RequiredArgsConstructor
public class ServiceOrder extends AbstractOrder {
private Long id;
@NonNull
private String code;
@NonNull
private Store store;
@NonNull
private ProviderService service;
@NonNull
private Car car;
@NonNull
private Date serviceDate;
@NonNull
private String contact;
@NonNull
private String contactTel;
private AppUser user;
@NonNull
private String content;
private int status;
private Date createDate;
@Override
public void makeOrder(Order order) {
ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);
IdService idService = SpringBootUtil.getBean(IdService.class);
((ServiceOrder)order).setId(idService.genId());
((ServiceOrder)order).setCode(getCodeInfo(idService));
AppUser loginAppUser = AppUserUtil.getLoginAppUser();
AppUser user = new AppUser();
user.setId(loginAppUser.getId());
user.setUsername(loginAppUser.getUsername());
((ServiceOrder)order).setUser(user);
((ServiceOrder)order).setStatus(1);
((ServiceOrder)order).setCreateDate(new Date());
serviceOrderDao.save((ServiceOrder) order);
}
@Override
public OrderSuccessResult getResult(Order order) {
ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);
this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();
return this.orderSuccessResult.getResult(order);
}
@Override
public void postOrder(Order order) {
MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
CompletableFuture.runAsync(() ->
sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,
OwnerCarCenterMq.ROUTING_KEY_ORDER,
order)
);
}
private String getCodeInfo(IdService idService) {
String flow = String.valueOf(idService.genId());
flow = flow.substring(14,flow.length());
String pre = DateUtils.format(new Date(), DateUtils.pattern9);
return pre flow;
}
}
其中我们定义了这么一组队列名,交换机,和路由
/**
 * Base names of the RabbitMQ queue, exchange and routing key used for orders.
 */
public interface OwnerCarCenterMq {
    /**
     * Queue name.
     */
    String ORDER_QUEUE = "order";
    /**
     * Exchange name of the service system.
     */
    String MQ_EXCHANGE_ORDER = "order.topic.exchange";
    /**
     * Routing key for adding a service (order).
     */
    String ROUTING_KEY_ORDER = "post.order";
}
为了避免流控,我们定义了10个队列(order_1 ~ order_10),并全部绑定到同一个交换机上,每个队列使用各自的路由键(post.order_1 ~ post.order_10)。
代码语言:javascript复制@Configuration
public class RabbitmqConfig {
@Bean
public List<Queue> orderQueues() {
List<Queue> queues = new ArrayList<>();
for (int i = 1;i < 11;i ) {
Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE "_" i);
queues.add(queue);
}
return queues;
}
@Bean
public TopicExchange orderExchange() {
return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);
}
@Bean
public List<Binding> bindingOrders() {
List<Binding> bindings = new ArrayList<>();
for (int i = 1;i < 11;i ) {
Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
.with(OwnerCarCenterMq.ROUTING_KEY_ORDER "_" i);
bindings.add(binding);
}
return bindings;
}
}
重新封装消息发送者,每次发送时都从这10个路由键中随机选取一个,把消息分散到不同的队列中。
代码语言:javascript复制@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange,String routingKey,Object content) {
log.info("send content=" content);
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
ThreadLocalRandom random = ThreadLocalRandom.current();
this.rabbitTemplate.convertAndSend(exchange,routingKey "_" random.nextInt(1,11),serialize(content));
}
/**
* 确认后回调:
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send ack fail, cause = " cause);
} else {
log.info("send ack success");
}
}
/**
* 失败后return回调:
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("send fail return-message = " new String(message.getBody()) ", replyCode: " replyCode ", replyText: " replyText ", exchange: " exchange ", routingKey: " routingKey);
}
/**
* 对消息对象进行二进制序列化
* @param o
* @return
*/
private byte[] serialize(Object o) {
Kryo kryo = new Kryo();
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Output output = new Output(stream);
kryo.writeObject(output, o);
output.close();
return stream.toByteArray();
}
}
我们可以看到,在ServiceOrder里,我们是通过异步的方式来发送消息的。
Controller如下
代码语言:javascript复制@Slf4j
@RestController
public class OrderController {
private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();
private ThreadLocal<Order> orderService = new ThreadLocal<>();
@Autowired
private OrderBean orderBean;
@Transactional
@SuppressWarnings("unchecked")
@PostMapping("/makeeorder")
public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {
log.info(orderStr);
Order order = setOrderFactory(orderStr,type);
orderService.get().makeOrder(order);
orderService.get().postOrder(order);
return Result.success(orderService.get().getResult(order));
}
/**
* 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂
* @param orderStr
* @return
*/
private Order setOrderFactory(String orderStr,String type) {
Class<?> classType = orderBean.getOrderMap().get(type);
Object order = JSONObject.parseObject(orderStr, classType);
// if (orderStr.contains("service")) {
// order = JSON.parseObject(orderStr, ServiceOrder.class);
// }else if (orderStr.contains("product")) {
// order = JSON.parseObject(orderStr, ProductOrder.class);
// }
Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type "Factory");
this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));
// if (order instanceof ServiceOrder) {
// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));
// }else if (order instanceof ProductOrder) {
// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));
// }
orderService.set(orderFactory.get().getOrder());
return (Order) order;
}
}
最后,在我们的通知中心模块接收消息,同时监听这10个队列。
代码语言:javascript复制@Slf4j
@Component
@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE "_" 1,
OwnerCarCenterMq.ORDER_QUEUE "_" 2,
OwnerCarCenterMq.ORDER_QUEUE "_" 3,
OwnerCarCenterMq.ORDER_QUEUE "_" 4,
OwnerCarCenterMq.ORDER_QUEUE "_" 5,
OwnerCarCenterMq.ORDER_QUEUE "_" 6,
OwnerCarCenterMq.ORDER_QUEUE "_" 7,
OwnerCarCenterMq.ORDER_QUEUE "_" 8,
OwnerCarCenterMq.ORDER_QUEUE "_" 9,
OwnerCarCenterMq.ORDER_QUEUE "_" 10})
public class ServiceOrderConsummer {
@Getter
private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();
@RabbitHandler
public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {
try {
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
ServiceOrder order = unSerialize(data);
this.serviceOrders.add(order);
log.info(String.valueOf(order));
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
log.info("receiver fail");
}
}
/**
* 反序列化
* @param data
* @return
*/
private ServiceOrder unSerialize(byte[] data) {
Input input = null;
try {
Kryo kryo = new Kryo();
input = new Input(new ByteArrayInputStream(data));
return kryo.readObject(input,ServiceOrder.class);
}
finally {
input.close();
}
}
}