序
本文主要研究一下artemis的SessionProducerCreditsMessage
SessionProducerCreditsMessage
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionProducerCreditsMessage.java
代码语言:javascript复制public class SessionProducerCreditsMessage extends PacketImpl {
private int credits;
private SimpleString address;
public SessionProducerCreditsMessage(final int credits, final SimpleString address) {
super(SESS_PRODUCER_CREDITS);
this.credits = credits;
this.address = address;
}
public SessionProducerCreditsMessage() {
super(SESS_PRODUCER_CREDITS);
}
public int getCredits() {
return credits;
}
public SimpleString getAddress() {
return address;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(credits);
buffer.writeSimpleString(address);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
credits = buffer.readInt();
address = buffer.readSimpleString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result ((address == null) ? 0 : address.hashCode());
result = prime * result credits;
return result;
}
@Override
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" address);
buff.append(", credits=" credits);
buff.append("]");
return buff.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (!(obj instanceof SessionProducerCreditsMessage))
return false;
SessionProducerCreditsMessage other = (SessionProducerCreditsMessage) obj;
if (address == null) {
if (other.address != null)
return false;
} else if (!address.equals(other.address))
return false;
if (credits != other.credits)
return false;
return true;
}
}
- SessionProducerCreditsMessage继承了PacketImpl,其type为SESS_PRODUCER_CREDITS
ClientSessionPacketHandler
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
代码语言:javascript复制 class ClientSessionPacketHandler implements ChannelHandler {
@Override
public void handlePacket(final Packet packet) {
byte type = packet.getType();
try {
switch (type) {
case DISCONNECT_CONSUMER: {
handleConsumerDisconnected((DisconnectConsumerMessage) packet);
break;
}
case SESS_RECEIVE_CONTINUATION: {
handleReceiveContinuation((SessionReceiveContinuationMessage) packet);
break;
}
case SESS_RECEIVE_MSG: {
handleReceivedMessagePacket((SessionReceiveMessage) packet);
break;
}
case SESS_RECEIVE_LARGE_MSG: {
handleReceiveLargeMessage((SessionReceiveLargeMessage) packet);
break;
}
case PacketImpl.SESS_PRODUCER_CREDITS: {
handleReceiveProducerCredits((SessionProducerCreditsMessage) packet);
break;
}
case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: {
handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet);
break;
}
case PacketImpl.DISCONNECT_CONSUMER_KILL: {
handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);
break;
}
case EXCEPTION: {
// We can only log these exceptions
// maybe we should cache it on SessionContext and throw an exception on any next calls
ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet;
ActiveMQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException());
break;
}
default: {
throw ActiveMQClientMessageBundle.BUNDLE.invalidPacket(type);
}
}
} catch (Exception e) {
throw ActiveMQClientMessageBundle.BUNDLE.failedToHandlePacket(e);
}
sessionChannel.confirm(packet);
}
}
- ClientSessionPacketHandler实现了ChannelHandler接口,其handlePacket根据不同的type做不同的处理,当type为SESS_PRODUCER_CREDITS时执行handleReceiveProducerCredits方法
ActiveMQSessionContext
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
代码语言:javascript复制public class ActiveMQSessionContext extends SessionContext {
//......
protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) {
handleReceiveProducerCredits(message.getAddress(), message.getCredits());
}
//......
}
- handleReceiveProducerCredits调用的是父类SessionContext的handleReceiveProducerCredits方法
SessionContext
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
代码语言:javascript复制public abstract class SessionContext {
protected ClientSessionInternal session;
protected SendAcknowledgementHandler sendAckHandler;
protected volatile RemotingConnection remotingConnection;
protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
//......
protected void handleReceiveProducerCredits(SimpleString address, int credits) {
ClientSessionInternal session = this.session;
if (session != null) {
session.handleReceiveProducerCredits(address, credits);
}
}
//......
}
- handleReceiveProducerCredits方法执行的是ClientSessionInternal的handleReceiveProducerCredits方法
ClientSessionImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
代码语言:javascript复制public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
//......
@Override
public void handleReceiveProducerCredits(final SimpleString address, final int credits) {
synchronized (producerCreditManager) {
producerCreditManager.receiveCredits(address, credits);
}
}
//......
}
- handleReceiveProducerCredits调用的是producerCreditManager.receiveCredits方法
小结
SessionProducerCreditsMessage继承了PacketImpl,其type为SESS_PRODUCER_CREDITS;ClientSessionPacketHandler实现了ChannelHandler接口,其handlePacket根据不同的type做不同的处理,当type为SESS_PRODUCER_CREDITS时执行handleReceiveProducerCredits方法;handleReceiveProducerCredits调用的是父类SessionContext的handleReceiveProducerCredits方法;SessionContext的handleReceiveProducerCredits方法执行的是ClientSessionInternal的handleReceiveProducerCredits方法;ClientSessionImpl的handleReceiveProducerCredits调用的是producerCreditManager.receiveCredits方法
doc
- SessionProducerCreditsMessage