抽取工具类
代码语言:javascript
复制public class untils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.231.132");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
启动两个工作线程
代码语言:javascript
复制public class work03 {
public final static String QUEUE_NAME="hello3";
public static void main(String[] args) throws Exception {
System.out.println("c2应答短....");
Channel channel = untils.getChannel();
/**
* 消费者信息
* 1.消费哪个队列
* 2.消费成功以后是否要自动应答,true自动应答,false手动挡
* 3.消费者未成功消费的回调内容1
* 4.消费者取消的回调
*
*/
//声明 接收消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
System.out.println("开始休眠1s...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.消息标记
// 2.false 代表只应答接收到的哪个传递的信息,true为应答所有的消息包括传递过来的消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("接收到的消息" new String(delivery.getBody()));
};
//取消 消息的回调
CancelCallback cancelCallback= consumerTag -> {
System.out.println(consumerTag "消息消费者中断");
};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
代码语言:javascript
复制public class work03 {
public final static String QUEUE_NAME="hello4";
public static void main(String[] args) throws Exception {
System.out.println("c2应答长....");
Channel channel = untils.getChannel();
/**
* 消费者信息
* 1.消费哪个队列
* 2.消费成功以后是否要自动应答,true自动应答,false手动挡
* 3.消费者未成功消费的回调内容1
* 4.消费者取消的回调
*
*/
//声明 接收消息
DeliverCallback deliverCallback=(consumerTag, delivery)->{
System.out.println("开始休眠10s...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.消息标记
// 2.false 代表只应答接收到的哪个传递的信息,true为应答所有的消息包括传递过来的消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("接收到的消息" new String(delivery.getBody()));
};
//取消 消息的回调
CancelCallback cancelCallback= consumerTag -> {
System.out.println(consumerTag "消息消费者中断");
};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
生产者
代码语言:javascript
复制public class produce03 {
public static final String QUEUE_NAME="hello4";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = untils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制太中接受消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext())
{
String message=scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));;
System.out.println("发送消息完成" message);
}
}
}
结果