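How do you connect to an ActiveMQ broker from Android? Most articles online use the Paho MQTT client provided by Eclipse. If all you need is publish/subscribe messaging, the Paho MQTT client is enough.
But the MQTT protocol only supports publish/subscribe, so if you need the producer/consumer (queue) model, MQTT will not do.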
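The difference between the two models can be illustrated with a toy in-memory sketch. This is not ActiveMQ or MQTT code; the class and method names (MessagingModelSketch, publishToTopic, sendToQueue) are made up for illustration, and the round-robin hand-out is only a stand-in for however a real broker picks a consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy in-memory model of the two messaging styles; hypothetical, not ActiveMQ code. */
final class MessagingModelSketch {

    /** Publish/subscribe: every subscriber receives its own copy of each message. */
    static List<List<String>> publishToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            inboxes.add(new ArrayList<>(messages));
        }
        return inboxes;
    }

    /**
     * Producer/consumer: each message is taken from the queue by exactly one consumer.
     * Assumes consumerCount is at least 1.
     */
    static List<List<String>> sendToQueue(List<String> messages, int consumerCount) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        int next = 0;
        while (!queue.isEmpty()) {
            // Round-robin stands in for "whichever consumer is free"; a real broker decides this.
            inboxes.get(next).add(queue.poll());
            next = (next + 1) % consumerCount;
        }
        return inboxes;
    }
}
```

With three messages and two receivers, publishToTopic gives both receivers all three messages, while sendToQueue splits the three messages between them so that no message is delivered twice.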
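I ran into exactly this requirement: on Android I needed to receive data from a message queue as a consumer. So I tried the activemq-client library provided by ActiveMQ to connect to the broker: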
// https://mvnrepository.com/artifact/org.apache.activemq/activemq-client
implementation 'org.apache.activemq:activemq-client:5.14.5'
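However, the build failed at compile time:
error: cannot access Referenceable
  class file for javax.naming.Referenceable not found
The class javax.naming.Referenceable cannot be found. In fact, none of the classes under the javax.naming package can be found. That package is JNDI (Java Naming and Directory Interface), which is built into the JDK. Android's runtime (the Dalvik VM) is not a full JVM and does not ship the complete Java SE class library, so JNDI is missing. The problem therefore becomes: where to find a JNDI library that works on Android.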
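A JNDI artifact can be found by searching the Maven Central repository, but no jar is available for it:
https://mvnrepository.com/artifact/javax.naming/jndi/1.2.1
So getting the JNDI library from Maven was not an option. Not ready to give up, I searched with Google and found jndi-1.2.1.jar on the following two sites. I verified that both copies work on Android: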
http://www.java2s.com/Code/JarDownload/jndi/jndi-1.2.1.jar.zip
http://treebase.sourceforge.net/maven2/javax/naming/jndi/1.2.1/jndi-1.2.1.jar
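Using it is simple: copy jndi-1.2.1.jar into the app/libs folder. If your app/build.gradle already contains implementation fileTree(dir: 'libs', include: ['*.jar']), jndi-1.2.1.jar is picked up automatically. If that line is missing, add implementation files('libs/jndi-1.2.1.jar') by hand.
With the JNDI library in place, the build no longer fails and the tests pass.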
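To double-check at runtime that the jar really put javax.naming on the classpath, a small reflection probe along these lines could be used. This is only a sketch: the class name ClasspathCheckSketch and the method isClassAvailable are hypothetical helpers, not part of activemq-client or the original tests.

```java
/** Hypothetical helper: reports whether a class can be loaded by name. */
final class ClasspathCheckSketch {

    static boolean isClassAvailable(String className) {
        try {
            // initialize=false: only locate and load the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class is absent or it could not be linked
            return false;
        }
    }
}
```

Calling ClasspathCheckSketch.isClassAvailable("javax.naming.Referenceable") from an instrumented test would then be expected to return true once jndi-1.2.1.jar is packaged with the app.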
JUnit test that publishes messages
ActivemqPublisherTest.java
package gu.simplemq.activemq;

import java.util.Date;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

/**
 * @author guyadong
 *
 */
public class ActivemqPublisherTest {
    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    // Host address of the ActiveMQ broker
    private static final String OPENWIRE_HOST = "192.168.10.226";

    private static ActiveMQConnectionFactory createFactory(){
        Properties props = new Properties();
        props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setProperties(props);
        return factory;
    }

    @Test
    public void test1() throws InterruptedException, JMSException {
        ActiveMQConnectionFactory factory = createFactory();
        Connection connection = null;
        Session session = null;
        MessageProducer p1 = null;
        MessageProducer p2 = null;
        try {
            connection = factory.createConnection();
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            p1 = session.createProducer(session.createTopic("chat1"));
            p2 = session.createProducer(session.createTopic("chat2"));
            // Publish one message to each topic every 2 seconds, 100 times
            for(int i=0; i<100; i++){
                Date date = new Date();
                String str = "OPENWIRE " + date.toString();
                TextMessage message = session.createTextMessage(str);
                p1.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                p2.send(message, DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                System.out.println(date.getTime() + " : " + date.toString());
                Thread.sleep(2000);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally{
            // Release JMS resources in reverse order of creation
            if(null != p2){
                p2.close();
            }
            if(null != p1){
                p1.close();
            }
            if(null != session){
                session.close();
            }
            if(null != connection){
                connection.close();
            }
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
JUnit test that subscribes to messages
ActivemqSubscriberTest.java
package gu.simplemq.activemq;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * @author guyadong
 *
 */
public class ActivemqSubscriberTest {
    // Host address of the ActiveMQ broker
    private static final String OPENWIRE_HOST = "192.168.10.226";

    private static ActiveMQConnectionFactory createFactory(){
        Properties props = new Properties();
        props.setProperty("brokerURL","tcp://" + OPENWIRE_HOST + ":61616");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setProperties(props);
        return factory;
    }

    private static Connection conn;
    private static Session session;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        ActiveMQConnectionFactory factory = createFactory();
        conn = factory.createConnection();
        conn.setExceptionListener(new MyExceptionListener());
        conn.start();
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        session.close();
        conn.close();
    }

    // Subscribe the listener to the named topic
    public void sub(Session session,String topic,MessageListener listener) throws JMSException {
        Topic activeMQTopic = session.createTopic(topic);
        MessageConsumer consumer = session.createConsumer(activeMQTopic);
        consumer.setMessageListener(listener);
    }

    // Keep the test alive until 'quit' is read from standard input
    private static void waitquit(){
        System.out.println("PRESS 'quit' OR 'CTRL-C' to exit");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try{
            while(!"quit".equalsIgnoreCase(reader.readLine())){
            }
            System.exit(0);
        } catch (IOException e) {
            // Ignored: a read failure simply ends the wait
        }
    }

    @Test
    public void test1() {
        try {
            sub(session,"chat1", new LogListener());
            sub(session,"chat2", new LogListener());
            sub(session,"chat3", new LogListener());
            waitquit();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    private static class LogListener implements MessageListener{
        // Extract the body of a TextMessage or BytesMessage as a String
        private String textOf(Message message) throws JMSException{
            if(message instanceof TextMessage){
                return ((TextMessage) message).getText();
            }
            if(message instanceof BytesMessage){
                BytesMessage bytesMessage = (BytesMessage)message;
                byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(buf);
                // Decoded with the platform default charset
                return new String(buf);
            }
            throw new IllegalArgumentException(String.format("INVALID message type,%s,%s required",
                    TextMessage.class.getName(),
                    BytesMessage.class.getName()));
        }

        @Override
        public void onMessage(Message message) {
            try {
                System.out.printf("dest %s:%s%n", message.getJMSDestination(), textOf(message));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
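One detail in the listener worth noting: turning the bytes of a BytesMessage into a String without naming a charset relies on the platform default. If the sender is known to encode text as UTF-8 (an assumption here, the original code does not say), the decoding step could be made explicit as in the sketch below. PayloadDecodeSketch and decodeUtf8 are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing charset-explicit decoding of a message payload. */
final class PayloadDecodeSketch {

    /** Decodes the payload as UTF-8, independent of the platform default charset. */
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

In the listener this would replace the charset-less String constructor, so that the same bytes decode to the same text on a desktop JVM and on an Android device.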
The complete test code is available in the Gitee repository:
ActivemqPublisherTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqPublisherTest.java
ActivemqSubscriberTest.java https://gitee.com/l0km/simplemq/blob/dev/simplemq-android-test/app/src/androidTest/java/gu/simplemq/activemq/ActivemqSubscriberTest.java