amq简单使用_rabbitmq发送消息

2022-08-01 08:31:41 浏览数 (1)

大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说amq简单使用_rabbitmq发送消息,希望能够帮助大家进步!!!

queue sender

代码语言:javascript复制
package org.arrow.amq.test;
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnection;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.apache.activemq.jms.pool.PooledProducer;
import org.apache.activemq.jms.pool.PooledSession;

import java.util.Random;


public class Sender { 
   

    public static void main(String[] args) throws JMSException, InterruptedException {
        final String URL = "failover:(tcp://192.168.2.44:61616,tcp://192.168.2.48:61616,tcp://192.168.2.49:61616)";


        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, URL);

        PooledConnectionFactory fact2 = new PooledConnectionFactory();
        fact2.setConnectionFactory(factory);


        Connection connection = fact2.createConnection();

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


        Destination desc = session.createQueue("TestQueue");
        MessageProducer producer = session.createProducer(desc);


        Queue replyTo = session.createQueue("TestReplyToQueue");

        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

       while(true){
           int flag = new Random().nextInt(1024);
           ObjectMessage msg = session.createObjectMessage("hello world..."   flag);
            // 使用不同的属性,消费时可以只消费指定的消息
            msg.setStringProperty("group", (flag % 2)   "" );
            // 可以在此放一个replyTo, 告诉消费者回复到哪个消息队列
            // 消费者可以在onMessage时取出来,并手动生成一个生成者发送数据到该队列
            msg.setJMSReplyTo(replyTo);
            producer.send(msg);
           Thread.sleep(1000);
        }
// session.commit();
// System.out.println("sent...");
// session.close();
// connection.close();
    }

}

只听到从架构师办公室传来架构君的声音:

小桃无主自开花,烟草茫茫带晚鸦。有谁来对上联或下联?

receiver

代码语言:javascript复制
package org.arrow.amq.test;
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver { 
   

    static int i = 0;

    public static void main(String[] args) throws JMSException {
        final String URL = "failover:(tcp://192.168.2.44:61616,tcp://192.168.2.49:61616,tcp://192.168.2.49:61616)";
        // 连接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, URL);
        // 获取连接
        final Connection connection = factory.createConnection();
        connection.start();
        // 生成session, 参数1true,开启事务,必须commit确认消费,false时,自动应答
        // true   Session.AUTO_ACKNOWLEDGE : 不commit时不会确认消费
        // false   Session.AUTO_ACKNOWLEDGE : 接受到即确认消费
        // false   Session.CLIENT_ACKNOWLEDGE: 不自动确认,需要手动确认 message.acknowledge()确认消费
        // false   Session.DUPS_OK_ACKNOWLEDGE 不需要确认,也会自动消费

        // AUTO_ACKNOWLEDGE:自动确认模式。
        // DUPS_OK_ACKNOWLEDGE:允许确认模式的副本。接收应用程序来处理在会话对象的方法调用返回的消息后会收到一条确认消息,并允许重复确认。
        // CLIENT_ACKNOWLEDGE 客户端手动确认
        // 重要: true时将忽略b值,自动被设为SESSION_TRANSACTED

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 操作目标队列或主题 queue("TestQueue") 或 session.createTopic("TestTopic")
        Destination destination = session.createQueue("TestQueue");
        // 生成消费者,同一队列中两个消息者消费不同属性的消息
        MessageConsumer consumer1 = session.createConsumer(destination, "group='1'");
        MessageConsumer consumer2 = session.createConsumer(destination, "group='0'");
        // 方式1,使用consumer.receive()
// ObjectMessage message = (ObjectMessage)consumer.receive();
// if (message != null) { 
   
// String messageString = (String)message.getObject();
// System.out.println("Receive1 : "   messageString);
// }
        // 方式2:注册一个listener
        consumer1.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                Object object = null;
                try {
                    object = ((ObjectMessage) message).getObject();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                System.out.println(i       "Receive2-1 : "   (String)object);
            }
        });

        consumer2.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                Object object = null;
                try {
                    object = ((ObjectMessage) message).getObject();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                System.out.println(i      "Receive2-2 : "   (String)object);
            }
        });


// session.close();
// connection.close();
    }
}

今天文章到此就结束了,感谢您的阅读,Java架构师必看祝您升职加薪,年年好运。

0 人点赞