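Analysis results
First, here is the overall flow diagram that this post arrives at after the analysis.
[Figure: ActiveMQ flow]
Creating the connection
// 1. Create the connection factory; the broker IP and port must be specified
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
// 2. Use the connection factory to create a connection
Connection connection = connectionFactory.createConnection();
// 3. Start the connection
connection.start();
// 4. Use the connection to create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
When creating a Session, the first argument says whether transactions are enabled and the second is the mode the session uses to acknowledge consumed messages. Now for how the source handles it. A producer ID object is composed of the current session ID plus an ID produced by the session's internal sequence generator (an auto-incrementing long).
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {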
this.checkClosedOrFailed();
this.ensureConnectionInfoSent();
if (!transacted) {
if (acknowledgeMode == 0) {
throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
}
if (acknowledgeMode < 0 || acknowledgeMode > 4) {
throw new JMSException("...(the exception message is long, omitted here)");
}
}
return new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync());
}
protected ProducerId getNextProducerId() {
return new ProducerId(this.info.getSessionId(), this.producerIdGenerator.getNextSequenceId());
}
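This ends in the ActiveMQSession constructor: new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync()). In other words, if transactions are enabled the session is created with acknowledge mode 0; otherwise the ack argument that was passed in is used, and each mode selects a different acknowledgement strategy:
AUTO_ACKNOWLEDGE = 1: automatic acknowledgement
CLIENT_ACKNOWLEDGE = 2: manual acknowledgement by the client
DUPS_OK_ACKNOWLEDGE = 3: automatic batch acknowledgement
SESSION_TRANSACTED = 0: commit the transaction and acknowledge
INDIVIDUAL_ACKNOWLEDGE = 4: acknowledge a single message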
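The selection rule above fits in a few lines. The sketch below is a stand-alone model rather than ActiveMQ code: the class `AckModeSketch` and the method `effectiveAckMode` are hypothetical names, and the constants simply mirror the values listed above.

```java
// Hypothetical stand-alone model of the ack-mode selection in createSession.
final class AckModeSketch {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    // Returns the mode the session ends up with, or throws for invalid input.
    static int effectiveAckMode(boolean transacted, int requested) {
        if (transacted) {
            return SESSION_TRANSACTED; // the requested mode is ignored
        }
        if (requested == SESSION_TRANSACTED) {
            throw new IllegalArgumentException("SESSION_TRANSACTED needs a transacted session");
        }
        if (requested < SESSION_TRANSACTED || requested > INDIVIDUAL_ACKNOWLEDGE) {
            throw new IllegalArgumentException("unknown acknowledge mode: " + requested);
        }
        return requested;
    }
}
```

So a transacted session always ends up with mode 0 regardless of what the caller asked for, while a non-transacted session keeps the requested mode as long as it is one of 1 to 4.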
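Digging through the session turned up nothing, so I decided to start from the connection and the connectionFactory instead. The factory does not set up anything send-related either, so on to the connection source. Connection connection = connectionFactory.createConnection(); ends up creating a Transport object and calling its start method (now we are getting somewhere). Onward.
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {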
if (this.brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
} else {
ActiveMQConnection connection = null;
try {
Transport transport = this.createTransport();
connection = this.createActiveMQConnection(transport, this.factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
this.configureConnection(connection);
transport.start();
if (this.clientID != null) {
connection.setDefaultClientID(this.clientID);
}
return connection;
} catch (JMSException var8) {
try {
connection.close();
} catch (Throwable var6) {
}
throw var8;
} catch (Exception var9) {
try {
connection.close();
} catch (Throwable var7) {
}
throw JMSExceptionSupport.create("Could not connect to broker URL: " + this.brokerURL + ". Reason: " + var9, var9);
}
}
}
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = this.brokerURL;
String scheme = this.brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + this.brokerURL + "]");
} else {
if (scheme.equals("auto")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
}
} catch (Exception var3) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + var3, var3);
}
}
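The broker URL that was passed in is first resolved into a new URL, then TransportFactory.connect(connectBrokerUL) is called. Inside, a WireFormat object is derived from the URL (it is the protocol used to parse the raw bytes on the stream; there is a wire format for MQTT, one for AMQP, and so on), and then a Transport is created from the wire format and the broker URL.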
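The scheme resolution step can be tried out in isolation. The following is a minimal sketch under my own assumptions: `SchemeRewriteSketch` and `rewrite` are hypothetical names, and unlike the original (which calls String.replace on the whole URL) it swaps only the leading scheme.

```java
import java.net.URI;

// Hypothetical helper that mirrors the "auto" scheme resolution in createTransport.
final class SchemeRewriteSketch {
    static URI rewrite(URI brokerUrl) {
        String scheme = brokerUrl.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("no scheme in " + brokerUrl);
        }
        String target;
        switch (scheme) {
            case "auto":         target = "tcp";     break;
            case "auto+ssl":     target = "ssl";     break;
            case "auto+nio":     target = "nio";     break;
            case "auto+nio+ssl": target = "nio+ssl"; break;
            default:
                return brokerUrl; // already a concrete transport scheme
        }
        // Swap only the leading scheme; host, port and query string stay untouched.
        return URI.create(target + brokerUrl.toString().substring(scheme.length()));
    }
}
```

A URL such as tcp://192.168.156.44:61616 passes through unchanged, which is the path taken by the example at the top of this post.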
Transport wrapping
public static Transport connect(URI location) throws Exception {
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap(URISupport.parseParameters(location));
if (!options.containsKey("wireFormat.host")) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = this.createWireFormat(options);
Transport transport = this.createTransport(location, wf);
Transport rc = this.configure(transport, wf, options);
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
} else {
return rc;
}
} catch (URISyntaxException var6) {
throw IOExceptionSupport.create(var6);
}
}
The source uses the decorator pattern. First, a Transport is created from the WireFormat
//TcpTransportFactory.class
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
String path = location.getPath();
if (path != null && path.length() > 0) {
int localPortIndex = path.indexOf(58);
try {
Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
} catch (Exception var7) {
LOG.warn("path isn't a valid local location for TcpTransport to use", var7.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Failure detail", var7);
}
}
}
SocketFactory socketFactory = this.createSocketFactory();
return this.createTcpTransport(wf, socketFactory, location, localLocation);
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpTransport(wf, socketFactory, location, localLocation);
}
In the end a TcpTransport is built, and its constructor creates a socket (caught you). The SocketFactory passed in here was created in TcpTransportFactory and is a DefaultSocketFactory
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this.connectionTimeout = 30000;
this.socketBufferSize = 65536;
this.ioBufferSize = 8192;
this.closeAsync = true;
this.buffOut = null;
this.trafficClass = 0;
this.trafficClassSet = false;
this.diffServChosen = false;
this.typeOfServiceChosen = false;
this.trace = false;
this.logWriterName = TransportLoggerSupport.defaultLogWriterName;
this.dynamicManagement = false;
this.startLogging = true;
this.jmxPort = 1099;
this.useLocalHost = false;
this.stoppedLatch = new AtomicReference();
this.soLinger = -2147483648;
this.wireFormat = wireFormat;
this.socketFactory = socketFactory;
try {
this.socket = socketFactory.createSocket();
} catch (SocketException var6) {
this.socket = null;
}
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
this.initBuffer = null;
this.setDaemon(false);
}
In DefaultSocketFactory, the create method simply instantiates a Socket
DefaultSocketFactory() {
}
public Socket createSocket() {
return new Socket();
}
Back in TransportFactory's doConnect method, continuing downward
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap(URISupport.parseParameters(location));
if (!options.containsKey("wireFormat.host")) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = this.createWireFormat(options);
Transport transport = this.createTransport(location, wf);
Transport rc = this.configure(transport, wf, options);
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
} else {
return rc;
}
} catch (URISyntaxException var6) {
throw IOExceptionSupport.create(var6);
}
}
Execution continues with this.configure(transport, wf, options);
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = this.compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
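So the freshly created TcpTransport gets wrapped three times. The first layer adds a filter that enforces a timeout on TcpTransport's writes (as the code shows, only when the soWriteTimeout option is present)
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {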
if (options.containsKey("soWriteTimeout")) {
transport = new WriteTimeoutFilter((Transport)transport);
String soWriteTimeout = (String)options.remove("soWriteTimeout");
if (soWriteTimeout != null) {
((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
}
}
IntrospectionSupport.setProperties(transport, options);
return (Transport)transport;
}
Now the WriteTimeoutFilter source. It has a static member whose initialisation creates a TimeoutThread; that class is a Thread subclass and starts itself in its constructor
public class WriteTimeoutFilter extends TransportFilter {
protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue();
protected static WriteTimeoutFilter.TimeoutThread timeoutThread = new WriteTimeoutFilter.TimeoutThread();
public WriteTimeoutFilter(Transport next) {
super(next);
}
protected static class TimeoutThread extends Thread {
static AtomicInteger instance = new AtomicInteger(0);
boolean run = true;
public TimeoutThread() {
this.setName("WriteTimeoutFilter-Timeout-" + instance.incrementAndGet());
this.setDaemon(true);
this.setPriority(1);
this.start();
}
public void run() {
while(this.run) {
boolean error = false;
try {
if (!interrupted()) {
Iterator filters = WriteTimeoutFilter.writers.iterator();
label48:
while(true) {
WriteTimeoutFilter filter;
do {
if (!this.run || !filters.hasNext()) {
break label48;
}
filter = (WriteTimeoutFilter)filters.next();
} while(filter.getWriteTimeout() <= 0L);
long writeStart = filter.getWriter().getWriteTimestamp();
long delta = filter.getWriter().isWriting() && writeStart > 0L ? System.currentTimeMillis() - writeStart : -1L;
if (delta > filter.getWriteTimeout()) {
WriteTimeoutFilter.deRegisterWrite(filter, true, (IOException)null);
}
}
}
try {
Thread.sleep(WriteTimeoutFilter.getSleep());
error = false;
} catch (InterruptedException var8) {
}
} catch (Throwable var9) {
if (!error) {
WriteTimeoutFilter.LOG.error("WriteTimeout thread unable validate existing sockets.", var9);
error = true;
}
}
}
}
}
}
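As shown, WriteTimeoutFilter keeps a static ConcurrentLinkedQueue of registered WriteTimeoutFilter objects. The TimeoutThread it starts loops over them and checks whether each filter's output stream has timed out. It does so through next.narrow(TimeStampStream.class), that is, the narrow method in TcpTransport, which returns a TimeStampStream from which the start time of the current write can be read
protected TimeStampStream getWriter() {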
return (TimeStampStream)this.next.narrow(TimeStampStream.class);
}
//TcpTransport.class
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(this.socket);
} else {
return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
}
}
If a write has timed out, deRegisterWrite is called; it removes the WriteTimeoutFilter from the internal collection and closes its socket
protected Socket getSocket() {
return (Socket)this.next.narrow(Socket.class);
}
protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
boolean result = writers.remove(filter);
if (result && fail) {
String message = "Forced write timeout for:" + filter.getNext().getRemoteAddress();
LOG.warn(message);
Socket sock = filter.getSocket();
if (sock == null) {
LOG.error("Destination socket is null, unable to close socket.(" + message + ")");
} else {
try {
sock.close();
} catch (IOException var7) {
}
}
}
return result;
}
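To make the timeout scan easier to reason about, here is a deterministic, thread-free sketch of the same check. Everything in it is hypothetical (`WriteWatchdogSketch`, `TrackedWrite`, `sweep`); the real filter runs this logic in a background thread, reads the timestamp from a TimeStampStream, and closes the socket where the sketch merely records a name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the scan performed by the timeout thread.
final class WriteWatchdogSketch {
    // One registered write: when it began and how long it is allowed to take.
    static final class TrackedWrite {
        final String name;
        final long writeStartMillis; // <= 0 means "not currently writing"
        final long timeoutMillis;    // <= 0 means "no timeout configured"

        TrackedWrite(String name, long writeStartMillis, long timeoutMillis) {
            this.name = name;
            this.writeStartMillis = writeStartMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Queue<TrackedWrite> writers = new ConcurrentLinkedQueue<>();

    void register(TrackedWrite w) {
        writers.add(w);
    }

    int size() {
        return writers.size();
    }

    // Removes and returns the name of every write that has exceeded its timeout.
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (TrackedWrite w : writers) { // weakly consistent iterator, safe to remove
            if (w.timeoutMillis <= 0 || w.writeStartMillis <= 0) {
                continue;
            }
            long delta = nowMillis - w.writeStartMillis;
            if (delta > w.timeoutMillis && writers.remove(w)) {
                expired.add(w.name); // the real filter closes the socket at this point
            }
        }
        return expired;
    }
}
```

Passing the current time in as a parameter is a choice made for this sketch only, so that the behaviour can be checked without sleeping.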
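The socket is likewise obtained through TcpTransport's narrow method, which mostly performs a type check (via Class.cast) and, if all is well, returns the real socket object (the one created in the constructor at the start)
public <T> T narrow(Class<T> target) {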
if (target == Socket.class) {
return target.cast(this.socket);
} else {
return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
}
}
@HotSpotIntrinsicCandidate
public T cast(Object obj) {
if (obj != null && !this.isInstance(obj)) {
throw new ClassCastException(this.cannotCastMsg(obj));
} else {
return obj;
}
}
Next, the constructor calls super(next), so here is the code of its parent class, TransportFilter
public class TransportFilter implements TransportListener, Transport {
protected final Transport next;
protected TransportListener transportListener;
public TransportFilter(Transport next) {
this.next = next;
}
public TransportListener getTransportListener() {
return this.transportListener;
}
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
this.next.setTransportListener((TransportListener)null);
} else {
this.next.setTransportListener(this);
}
}
public void start() throws Exception {
if (this.next == null) {
throw new IOException("The next channel has not been set.");
} else if (this.transportListener == null) {
throw new IOException("The command listener has not been set.");
} else {
this.next.start();
}
}
public void onCommand(Object command) {
this.transportListener.onCommand(command);
}
public void oneway(Object command) throws IOException {
this.next.oneway(command);
}
.....
}
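TransportFilter is really a parent class that supplies boilerplate. It implements the Transport interface and holds another Transport as its next (which fits the Filter name: as in a filter chain, anything called Filter has a next). Its implementations of the interface methods simply call the corresponding method on next, so the real behaviour depends on which concrete TransportFilter subclass is in play. Let's therefore note the order of the Transport chain now and look at which method runs first later. Unsurprisingly, the two remaining wrappers, MutexTransport and ResponseCorrelator, are also TransportFilter subclasses. (Only TcpTransport is the exception and does not extend TransportFilter: the transport layer is the last link, so it needs no next.)
public class MutexTransport extends TransportFilter {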
private final ReentrantLock writeLock = new ReentrantLock();
private boolean syncOnCommand;
public MutexTransport(Transport next) {
super(next);
this.syncOnCommand = false;
}
}
public class ResponseCorrelator extends TransportFilter {
private final Map<Integer, FutureResponse> requestMap;
private IntSequenceGenerator sequenceGenerator;
private IOException error;
public ResponseCorrelator(Transport next) {
this(next, new IntSequenceGenerator());
}
}
The fully wrapped Transport looks like the figure below.
[Figure: Transport wrapping order]
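The wrapping order is easier to see in a toy version of the chain. This is a minimal sketch, not ActiveMQ code: `MiniTransport`, `MiniTcpTransport`, `MiniFilter` and `MiniChain` are hypothetical stand-ins, and each layer only records its name before delegating.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the Transport decorator chain.
interface MiniTransport {
    void oneway(String command, List<String> trace);
}

// Innermost layer: stands in for TcpTransport and has no "next".
final class MiniTcpTransport implements MiniTransport {
    public void oneway(String command, List<String> trace) {
        trace.add("tcp:" + command);
    }
}

// Base decorator: stands in for TransportFilter and delegates to next.
class MiniFilter implements MiniTransport {
    protected final MiniTransport next;
    private final String name;

    MiniFilter(String name, MiniTransport next) {
        this.name = name;
        this.next = next;
    }

    public void oneway(String command, List<String> trace) {
        trace.add(name);             // a real filter would do its own work here
        next.oneway(command, trace); // then hand the command to the layer below
    }
}

final class MiniChain {
    // Wraps in the same order as configure(): timeout filter, mutex, correlator.
    static MiniTransport build() {
        MiniTransport t = new MiniTcpTransport();
        t = new MiniFilter("writeTimeout", t);
        t = new MiniFilter("mutex", t);
        t = new MiniFilter("correlator", t);
        return t;
    }

    static List<String> send(String command) {
        List<String> trace = new ArrayList<>();
        build().oneway(command, trace);
        return trace;
    }
}
```

The layer that is wrapped last is the one the caller talks to first, so an outbound command passes through correlator, mutex, writeTimeout and finally the TCP layer.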
transport.start
Back where the connection was first created, the next thing executed is the transport's start method
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (this.brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
} else {
ActiveMQConnection connection = null;
Transport transport = this.createTransport();
connection = this.createActiveMQConnection(transport, this.factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
this.configureConnection(connection);
transport.start();
if (this.clientID != null) {
connection.setDefaultClientID(this.clientID);
}
return connection;
}
}
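There is a big pitfall here. The connection is first created through createActiveMQConnection and holds the transport. Then look at what the connection actually does: in its constructor it binds the connection's own this as the transport's transportListener (a circular reference that had me hunting for the transportListener for a long time). On top of that, setTransportListener on a TransportFilter subclass is not a plain setter: it also binds the filter itself as the TransportListener of its next (this code is genuinely hard to follow)
//ActiveMQConnectionFactory.class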
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
ActiveMQConnection connection = new ActiveMQConnection(transport, this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats);
return connection;
}
//ActiveMQConnection.class
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
this.maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
this.rejectedTaskHandler = null;
this.trustedPackages = new ArrayList();
this.trustAllPackages = false;
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
this.executor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
return thread;
}
});
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(this.info.getConnectionId(), -1L);
this.transport.setTransportListener(this);
this.stats = new JMSConnectionStatsImpl(this.sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
//TransportFilter.class
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
this.next.setTransportListener((TransportListener)null);
} else {
this.next.setTransportListener(this);
}
}
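The listener wiring is the mirror image of the wrapping, and a small model makes that visible. This is a sketch under my own naming (`MiniListener`, `MiniLayer`, `ListenerChainSketch` are hypothetical); for simplicity the innermost "tcp" layer is modelled with the same class as the filters.

```java
// Hypothetical miniature of how setTransportListener wires the inbound chain.
interface MiniListener {
    String onCommand(String command);
}

class MiniLayer implements MiniListener {
    final String name;
    final MiniLayer next;  // outbound direction (toward the socket)
    MiniListener listener; // inbound direction (toward the connection)

    MiniLayer(String name, MiniLayer next) {
        this.name = name;
        this.next = next;
    }

    // Same shape as TransportFilter.setTransportListener: remember the caller,
    // then register this layer as the listener of the layer below.
    void setListener(MiniListener l) {
        this.listener = l;
        if (next != null) {
            next.setListener(l == null ? null : this);
        }
    }

    public String onCommand(String command) {
        return listener.onCommand(command + " via " + name);
    }
}

final class ListenerChainSketch {
    static String deliver(String command) {
        MiniLayer tcp = new MiniLayer("tcp", null);
        MiniLayer timeout = new MiniLayer("writeTimeout", tcp);
        MiniLayer mutex = new MiniLayer("mutex", timeout);
        MiniLayer correlator = new MiniLayer("correlator", mutex);
        // The connection registers itself on the outermost layer only.
        correlator.setListener(c -> "connection got: " + c);
        // An inbound command enters at the innermost layer.
        return tcp.onCommand(command);
    }
}
```

One call on the outermost layer is enough to wire the whole inbound path, which is why the connection never touches the inner layers directly.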
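So the structure at this point is as follows
[Figure: Transport structure]
Following the wrapping order, the transport here is the ResponseCorrelator. That class does not define start, which means this TransportFilter needs nothing extra on start: the parent's start method runs and calls next.start() directly
//TransportFilter.class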
public void start() throws Exception {
if (this.next == null) {
throw new IOException("The next channel has not been set.");
} else if (this.transportListener == null) {
throw new IOException("The command listener has not been set.");
} else {
this.next.start();
}
}
That brings us to MutexTransport, which has no start either, so the parent TransportFilter's start runs again and moves on to next, namely WriteTimeoutFilter
public void start() throws Exception {
super.start();
}
public void stop() throws Exception {
super.stop();
}
Huh? Nothing happens here either; it goes straight to the parent TransportFilter's methods. I don't know why these two methods weren't simply omitted instead of being overridden only to call super (showing off?). That brings us to TcpTransport. TcpTransport extends TransportThreadSupport, which extends TransportSupport, which in turn extends ServiceSupport. Of these classes only ServiceSupport implements start, so into its source we go
public void start() throws Exception {
if (this.started.compareAndSet(false, true)) {
boolean success = false;
this.stopped.set(false);
try {
this.preStart();
this.doStart();
success = true;
} finally {
this.started.set(success);
}
Iterator var2 = this.serviceListeners.iterator();
while(var2.hasNext()) {
ServiceListener l = (ServiceListener)var2.next();
l.started(this);
}
}
}
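The compareAndSet guard in this start method is worth isolating, because it is what makes repeated start calls harmless. Below is a minimal sketch with hypothetical names (`StartOnceSketch`, `doStartCalls`, `failNextStart`); listeners and preStart are left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of the start() guard shown above.
class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    int doStartCalls = 0;          // counts how often the start-up logic ran
    boolean failNextStart = false; // lets a caller simulate a failing doStart

    void start() {
        // Only the caller that flips false -> true runs the start-up logic.
        if (started.compareAndSet(false, true)) {
            boolean success = false;
            try {
                doStart();
                success = true;
            } finally {
                // A failed doStart() resets the flag so start() can be retried.
                started.set(success);
            }
        }
    }

    protected void doStart() {
        doStartCalls++;
        if (failNextStart) {
            failNextStart = false;
            throw new IllegalStateException("simulated start failure");
        }
    }

    boolean isStarted() {
        return started.get();
    }
}
```

Calling start twice runs doStart once, and a start that throws leaves the object in the "not started" state.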
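This start method first calls preStart (an empty implementation) and then doStart. Here that is TcpTransport's doStart, which connects to the remote broker address and port (handed from the ConnectionFactory through the TransportFactory into TcpTransport at the start) and initialises the socket and streams
//TcpTransport.class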
protected void doStart() throws Exception {
this.connect();
this.stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
protected void connect() throws Exception {
if (this.socket == null && this.socketFactory == null) {
throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
} else {
InetSocketAddress localAddress = null;
InetSocketAddress remoteAddress = null;
if (this.localLocation != null) {
localAddress = new InetSocketAddress(InetAddress.getByName(this.localLocation.getHost()), this.localLocation.getPort());
}
if (this.remoteLocation != null) {
String host = this.resolveHostName(this.remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, this.remoteLocation.getPort());
}
this.trafficClassSet = this.setTrafficClass(this.socket);
if (this.socket != null) {
if (localAddress != null) {
this.socket.bind(localAddress);
}
if (remoteAddress != null) {
if (this.connectionTimeout >= 0) {
this.socket.connect(remoteAddress, this.connectionTimeout);
} else {
this.socket.connect(remoteAddress);
}
}
} else if (localAddress != null) {
this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
} else {
this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
this.initialiseSocket(this.socket);
this.initializeStreams();
}
}
Next super.doStart is called, which is the method in TransportThreadSupport. It starts a thread, passing this as the Runnable
protected void doStart() throws Exception {
this.runner = new Thread((ThreadGroup)null, this, "ActiveMQ Transport: " + this.toString(), this.stackSize);
this.runner.setDaemon(this.daemon);
this.runner.start();
}
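Because the current object is a TcpTransport, the thread runs TcpTransport's run method, which calls doRun in a loop. doRun reads incoming commands according to the wire format (wireFormat) passed in at the start and calls doConsume to handle the command received from the server
public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");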
this.runnerThread = Thread.currentThread();
try {
while(!this.isStopped()) {
this.doRun();
}
} catch (IOException var7) {
((CountDownLatch)this.stoppedLatch.get()).countDown();
this.onException(var7);
} catch (Throwable var8) {
((CountDownLatch)this.stoppedLatch.get()).countDown();
IOException ioe = new IOException("Unexpected error occurred: " + var8);
ioe.initCause(var8);
this.onException(ioe);
} finally {
((CountDownLatch)this.stoppedLatch.get()).countDown();
}
}
protected void doRun() throws IOException {
try {
Object command = this.readCommand();
this.doConsume(command);
} catch (SocketTimeoutException var2) {
} catch (InterruptedIOException var3) {
}
}
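The read loop can be imitated without a socket. In this sketch a BlockingQueue stands in for the inbound stream and a list stands in for the listener; `ReaderLoopSketch` and `demo` are hypothetical names, and the short poll timeout is my own choice so the loop can notice the stop flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the run()/doRun() loop; the queue stands in for the socket.
final class ReaderLoopSketch implements Runnable {
    private final BlockingQueue<String> inbound = new LinkedBlockingQueue<>();
    private final List<String> consumed = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false;

    public void run() {
        while (!stopped) {
            try {
                // "readCommand": wait briefly for the next inbound command.
                String command = inbound.poll(50, TimeUnit.MILLISECONDS);
                if (command != null) {
                    consumed.add(command); // "doConsume": hand it to the listener
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Starts the reader, feeds it the commands, waits until all are consumed.
    static List<String> demo(String... commands) {
        ReaderLoopSketch loop = new ReaderLoopSketch();
        Thread runner = new Thread(loop, "sketch-reader");
        runner.setDaemon(true);
        runner.start();
        for (String c : commands) {
            loop.inbound.add(c);
        }
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (loop.consumed.size() < commands.length
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            loop.stopped = true;
            runner.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(loop.consumed);
    }
}
```

The point to take away is that inbound commands are always handled on this dedicated thread, not on the thread that called send.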
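So at this point a thread is running that keeps reading commands from the server and handling them. Since these are messages from the server to the client, let's first go through the client's sending logic and come back to this afterwards. Next up is
// 6. Use the session to create a producer
MessageProducer producer = session.createProducer(queue);
// 7. Use the session to create a message
TextMessage textMessage = session.createTextMessage("hello!test-queue");
// 8. Send the message
producer.send(textMessage);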
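Sending a message
The first two calls just create objects, so there is not much to say about them; mainly they hold on to the session. Now the source for sending a message
//ActiveMQMessageProducerSupport.class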
private MemoryUsage producerWindow;
public void send(Message message) throws JMSException {
this.send(this.getDestination(), message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
this.checkClosed();
if (destination == null) {
if (this.info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
} else {
throw new InvalidDestinationException("Don't understand null destinations");
}
} else {
ActiveMQDestination dest;
if (destination.equals(this.info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else {
if (this.info.getDestination() != null) {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
dest = ActiveMQDestination.transform(destination);
}
if (dest == null) {
throw new JMSException("No destination specified");
} else {
if (this.transformer != null) {
Message transformedMessage = this.transformer.producerTransform(this.session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (this.producerWindow != null) {
try {
this.producerWindow.waitForSpace();
} catch (InterruptedException var10) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
this.stats.onMessage();
}
}
}
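First checkClosed verifies that nothing has been closed. The first real step then converts whatever Destination you used (queue or topic) into the generic ActiveMQDestination. If a specific transformer has been configured, the message also goes through producerTransform; none is set here, so that is skipped. The core code comes next
if (this.producerWindow != null) {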
try {
this.producerWindow.waitForSpace();
} catch (InterruptedException var10) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
this.stats.onMessage();
producerWindow is a MemoryUsage object. It mainly records how much memory the backlog of messages currently occupies, for memory control. Here is part of its code
public class MemoryUsage extends Usage<MemoryUsage> {
private long usage;
public void waitForSpace() throws InterruptedException {
if (this.parent != null) {
((MemoryUsage)this.parent).waitForSpace();
}
this.usageLock.readLock().lock();
try {
if (this.percentUsage >= 100 && this.isStarted()) {
this.usageLock.readLock().unlock();
this.usageLock.writeLock().lock();
try {
while(this.percentUsage >= 100 && this.isStarted()) {
this.waitForSpaceCondition.await();
}
} finally {
this.usageLock.writeLock().unlock();
this.usageLock.readLock().lock();
}
}
if (this.percentUsage >= 100 && !this.isStarted()) {
throw new InterruptedException("waitForSpace stopped during wait.");
}
} finally {
this.usageLock.readLock().unlock();
}
}
}
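The waiting logic in waitForSpace can be modelled with a single lock and a condition. This is a simplified sketch, not the library's implementation: `SpaceGateSketch` is a hypothetical class, it uses one ReentrantLock instead of a read/write lock pair, and the timeout parameter is added here only so the example always terminates.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplified model of a producer window that blocks when full.
final class SpaceGateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition spaceAvailable = lock.newCondition();
    private final long limit; // bytes allowed
    private long usage;       // bytes currently in use

    SpaceGateSketch(long limit) {
        this.limit = limit;
    }

    int percentUsage() {
        lock.lock();
        try {
            return (int) (usage * 100 / limit);
        } finally {
            lock.unlock();
        }
    }

    void increaseUsage(long bytes) {
        lock.lock();
        try {
            usage += bytes;
        } finally {
            lock.unlock();
        }
    }

    void decreaseUsage(long bytes) {
        lock.lock();
        try {
            usage -= bytes;
            spaceAvailable.signalAll(); // wake producers waiting for room
        } finally {
            lock.unlock();
        }
    }

    // Waits while usage is at or above 100%; returns false if the timeout expires.
    boolean waitForSpace(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (usage * 100 / limit >= 100) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = spaceAvailable.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

The while loop around the wait matters: a woken producer re-checks the usage before proceeding, exactly as the original loop re-checks percentUsage.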
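MemoryUsage keeps a long field holding the number of bytes in use. waitForSpace takes its own read lock and checks whether current usage has reached 100% of the limit. If so, it switches locks: it releases the read lock, acquires the write lock, and waits until usage drops below 100% (space is available). The comparison is >= because usage will not land exactly on 100%: at 99%, one more message larger than 1% of the limit pushes it past. In short, waitForSpace waits for free memory so that messages do not pile up too far. Then comes the actual send: this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
//ActiveMQSession.class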
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
this.checkClosed();
if (destination.isTemporary() && this.connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
} else {
synchronized(this.sendMutex) {
this.doStartTransaction();
TransactionId txid = this.transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0L) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
message.setJMSRedelivered(false);
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, this.connection);
msg.setDestination(destination);
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
message.setJMSDestination(destination);
}
msg.setBrokerPath((BrokerId[])null);
msg.setTransactionId(txid);
if (this.connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(this.connection);
msg.onSend(); // makes the message read-only
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(this.getSessionId() + " sending message: " + msg);
}
if (onComplete != null || sendTimeout > 0 || msg.isResponseRequired() || this.connection.isAlwaysSyncSend() || msg.isPersistent() && !this.connection.isUseAsyncSend() && txid == null) {
if (sendTimeout > 0 && onComplete == null) {
this.connection.syncSendPacket(msg, sendTimeout);
} else {
this.connection.syncSendPacket(msg, onComplete);
}
} else {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
int size = msg.getSize();
producerWindow.increaseUsage((long)size);
}
}
}
}
}
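The long condition that picks between the two send paths reads more easily as a named predicate. This is only a restatement of the if expression above with hypothetical names (`SendModeSketch`, `useSyncSend`); note that && binds tighter than ||, so the last three terms form one group.

```java
// Hypothetical restatement of the sync-vs-async decision in ActiveMQSession.send.
final class SendModeSketch {
    static boolean useSyncSend(boolean hasCallback, int sendTimeout,
                               boolean responseRequired, boolean alwaysSyncSend,
                               boolean persistent, boolean useAsyncSend,
                               boolean inTransaction) {
        return hasCallback
                || sendTimeout > 0
                || responseRequired
                || alwaysSyncSend
                // Persistent messages go out synchronously unless async send is
                // switched on or the send is part of a transaction.
                || (persistent && !useAsyncSend && !inTransaction);
    }
}
```

Read this way, a persistent message outside a transaction is sent synchronously unless async send has been enabled on the connection, while a non-persistent one with no other flags set takes the asynchronous path.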
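In ActiveMQSession.send every send takes a lock, the session's sendMutex object, so a session handles messages synchronously. Depending on configuration it then chooses syncSendPacket (synchronous send) or asyncSendPacket (asynchronous send). With asynchronous send and a limit on the backlog size, it also updates the memory already used in producerWindow (the size of messages sent but not yet completed). Let's look at asynchronous send first
//ActiveMQConnection.class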
public void asyncSendPacket(Command command) throws JMSException {
if (this.isClosed()) {
throw new ConnectionClosedException();
} else {
this.doAsyncSendPacket(command);
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException var3) {
throw JMSExceptionSupport.create(var3);
}
}
As covered earlier, the outermost transport is the ResponseCorrelator, so look at its oneway method
public void oneway(Object o) throws IOException {
Command command = (Command)o;
command.setCommandId(this.sequenceGenerator.getNextSequenceId());
command.setResponseRequired(false);
this.next.oneway(command);
}
It marks the command as not requiring a response and moves on to the next oneway, MutexTransport. MutexTransport mainly wraps the call in its own writeLock (a ReentrantLock), which serialises writes within the same connection
public void oneway(Object command) throws IOException {
this.writeLock.lock();
try {
this.next.oneway(command);
} finally {
this.writeLock.unlock();
}
}
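Next comes WriteTimeoutFilter. As covered earlier, it maintains an internal collection and starts a thread that periodically checks whether any WriteTimeoutFilter in it has timed out. Here the current filter is registered in that collection for the duration of the write, and then the next oneway is called (super's implementation)
public void oneway(Object command) throws IOException {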
try {
registerWrite(this);
super.oneway(command);
} catch (IOException var6) {
throw var6;
} finally {
deRegisterWrite(this, false, (IOException)null);
}
}
That brings us to TcpTransport, which performs the actual data transfer
public void oneway(Object command) throws IOException {
this.checkStarted();
this.wireFormat.marshal(command, this.dataOut);
this.dataOut.flush();
}
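Because no response is required, the call returns right here once the command has been written and flushed, without waiting for the broker. Now for synchronous sends, which go through the request method rather than oneway
public Response syncSendPacket(Command command, int timeout) throws JMSException {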
if (this.isClosed()) {
throw new ConnectionClosedException();
} else {
try {
Response response = (Response)((Response)(timeout > 0 ? this.transport.request(command, timeout) : this.transport.request(command)));
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
}
if (this.isClosed() || this.closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(er.getException());
} catch (Throwable var8) {
LOG.error("Caught an exception trying to create a JMSException for " + er.getException(), var8);
}
if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo) {
try {
this.forceCloseOnSecurityException(er.getException());
} catch (Throwable var7) {
}
}
if (jmsEx != null) {
throw jmsEx;
}
}
return response;
} catch (IOException var9) {
throw JMSExceptionSupport.create(var9);
}
}
}
Into ResponseCorrelator's request method, then. It turns out it also sends through asyncRequest: the future is put into an internal map, and the Future pattern is used to wait for the result
public Object request(Object command, int timeout) throws IOException {
FutureResponse response = this.asyncRequest(command, (ResponseCallback)null);
return response.getResult(timeout);
}
public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
Command command = (Command)o;
command.setCommandId(this.sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback, this);
IOException priorError = null;
synchronized(this.requestMap) {
priorError = this.error;
if (priorError == null) {
this.requestMap.put(new Integer(command.getCommandId()), future);
}
}
if (priorError != null) {
future.set(new ExceptionResponse(priorError));
throw priorError;
} else {
this.next.oneway(command);
return future;
}
}
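The request/response correlation shown above can be reproduced with the JDK's CompletableFuture. This is a sketch with hypothetical names (`CorrelatorSketch`, `asyncRequest`, `onResponse`, `lastSentId`); it swaps ActiveMQ's FutureResponse for CompletableFuture and leaves out the actual write to the next transport.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of request/response correlation by command id.
final class CorrelatorSketch {
    private final AtomicInteger sequence = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    int lastSentId; // exposed only so the example can answer its own request

    // Outbound: assign an id, remember the future, then hand off to the wire.
    CompletableFuture<String> asyncRequest(String command) {
        int id = sequence.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        lastSentId = id; // the real code calls next.oneway(command) at this point
        return future;
    }

    // Inbound: called from the reader thread when a response arrives.
    boolean onResponse(int correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unexpected response, nobody is waiting for it
        }
        future.complete(payload); // releases whoever is blocked on the future
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A synchronous request is then just asyncRequest followed by a blocking wait on the returned future, which is the shape of the request method above.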
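Handling the response
For how the Future receives its result we have to go back to the onCommand method mentioned earlier. As described, once start has been called the outer Transports do nothing until TcpTransport's start method, which establishes the connection and starts a thread that keeps calling doRun. doRun calls readCommand to read the command sent by the server and then calls doConsume, which is implemented in the parent class TransportSupport and hands the command to the transportListener
//TransportSupport.class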
public void doConsume(Object command) {
if (command != null) {
if (this.transportListener != null) {
this.transportListener.onCommand(command);
} else {
LOG.error("No transportListener available to process inbound command: " + command);
}
}
}
At this point TcpTransport's transportListener is the WriteTimeoutFilter. WriteTimeoutFilter does not override onCommand, so TransportFilter's implementation runs and calls onCommand on its own transportListener
//TransportFilter.class
public void onCommand(Object command) {
this.transportListener.onCommand(command);
}
That leads to MutexTransport's onCommand method
public void onCommand(Object command) {
if (this.syncOnCommand) {
this.writeLock.lock();
try {
this.transportListener.onCommand(command);
} finally {
this.writeLock.unlock();
}
} else {
this.transportListener.onCommand(command);
}
}
It first checks whether synchronous handling of inbound commands is configured; if so it takes the write lock first. Either way it ends up in onCommand of MutexTransport's transportListener, which is the outermost Transport, ResponseCorrelator
//ResponseCorrelator.class
//key : commandId
private final Map<Integer, FutureResponse> requestMap;
public void onCommand(Object o) {
Command command = null;
if (o instanceof Command) {
command = (Command)o;
if (command.isResponse()) {
Response response = (Response)command;
FutureResponse future = null;
synchronized(this.requestMap) {
future = (FutureResponse)this.requestMap.remove(response.getCorrelationId());
}
if (future != null) {
future.set(response);
} else if (this.debug) {
LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
}
} else {
this.getTransportListener().onCommand(command);
}
} else {
throw new ClassCastException("Object cannot be converted to a Command, Object: " + o);
}
}
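ResponseCorrelator keeps a map of every request awaiting a response. When a message from the server reaches onCommand and is a response, it looks up the future for that command by correlation ID, sets the result on the Future and releases whoever is waiting on it. Commands that are not responses are passed on to ActiveMQConnection's onCommand method
public void onCommand(Object o) {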
final Command command = (Command)o;
if (!this.closed.get() && command != null) {
try {
command.visit(new CommandVisitorAdapter() {
public Response processMessageDispatch(MessageDispatch md) throws Exception {
ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
Message msg = md.getMessage();
if (msg != null) {
msg = msg.copy();
msg.setReadOnlyBody(true);
msg.setReadOnlyProperties(true);
msg.setRedeliveryCounter(md.getRedeliveryCounter());
msg.setConnection(ActiveMQConnection.this);
msg.setMemoryUsage((MemoryUsage)null);
md.setMessage(msg);
}
dispatcher.dispatch(md);
} else {
ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
}
return null;
}
public Response processProducerAck(ProducerAck pa) throws Exception {
if (pa != null && pa.getProducerId() != null) {
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)ActiveMQConnection.this.producers.get(pa.getProducerId());
if (producer != null) {
producer.onProducerAck(pa);
}
}
return null;
}
public Response processBrokerInfo(BrokerInfo info) throws Exception {
ActiveMQConnection.this.brokerInfo = info;
ActiveMQConnection.this.brokerInfoReceived.countDown();
ActiveMQConnection.this.optimizeAcknowledge = ActiveMQConnection.this.optimizeAcknowledge & !ActiveMQConnection.this.brokerInfo.isFaultTolerantConfiguration();
ActiveMQConnection.this.getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
return null;
}
public Response processConnectionError(final ConnectionError error) throws Exception {
ActiveMQConnection.this.executor.execute(new Runnable() {
public void run() {
ActiveMQConnection.this.onAsyncException(error.getException());
}
});
return null;
}
public Response processControlCommand(ControlCommand commandx) throws Exception {
return null;
}
public Response processConnectionControl(ConnectionControl control) throws Exception {
ActiveMQConnection.this.onConnectionControl((ConnectionControl)command);
return null;
}
public Response processConsumerControl(ConsumerControl control) throws Exception {
ActiveMQConnection.this.onConsumerControl((ConsumerControl)command);
return null;
}
public Response processWireFormat(WireFormatInfo info) throws Exception {
ActiveMQConnection.this.onWireFormatInfo((WireFormatInfo)command);
return null;
}
});
} catch (Exception var5) {
this.onClientInternalException(var5);
}
}
Iterator iter = this.transportListeners.iterator();
while(iter.hasNext()) {
TransportListener listener = (TransportListener)iter.next();
listener.onCommand(command);
}
}
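The dispatch style used in this onCommand can be shown with two toy command types. All names here are hypothetical (`MiniCommand`, `MiniVisitor`, `MiniVisitorAdapter`, `MiniProducerAck`, `MiniBrokerInfo`, `VisitorSketch`); the sketch only illustrates an adapter with no-op defaults combined with double dispatch through visit.

```java
// Hypothetical miniature of command dispatch through a visitor with an adapter.
interface MiniVisitor {
    String processProducerAck(MiniProducerAck ack);
    String processBrokerInfo(MiniBrokerInfo info);
}

interface MiniCommand {
    String visit(MiniVisitor visitor);
}

final class MiniProducerAck implements MiniCommand {
    final int size;

    MiniProducerAck(int size) {
        this.size = size;
    }

    public String visit(MiniVisitor visitor) {
        return visitor.processProducerAck(this); // each command picks its own method
    }
}

final class MiniBrokerInfo implements MiniCommand {
    final String brokerName;

    MiniBrokerInfo(String brokerName) {
        this.brokerName = brokerName;
    }

    public String visit(MiniVisitor visitor) {
        return visitor.processBrokerInfo(this);
    }
}

// Adapter with no-op defaults, so callers override only what they care about.
class MiniVisitorAdapter implements MiniVisitor {
    public String processProducerAck(MiniProducerAck ack) {
        return null;
    }

    public String processBrokerInfo(MiniBrokerInfo info) {
        return null;
    }
}

final class VisitorSketch {
    static String handle(MiniCommand command) {
        return command.visit(new MiniVisitorAdapter() {
            @Override
            public String processProducerAck(MiniProducerAck ack) {
                return "ack " + ack.size;
            }
        });
    }
}
```

The caller never inspects the command's type: the command itself chooses the process method, and anything the anonymous subclass does not override falls through to the adapter's no-op.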
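ActiveMQConnection.onCommand uses the adapter pattern here: it passes in a CommandVisitor built from CommandVisitorAdapter, and the visit method dispatches to a different process method depending on the concrete Command class. (A Command is the object deserialised from the data that came back over TCP, and it can be any of several kinds of message, including broker connection info, topic deletion, ack handling and so on.) The loop at the end runs onCommand on any additional TransportListeners that have been registered (an extension point). So the complete flow for sending a message in ActiveMQ is:
[Figure: ActiveMQ flow]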