Preface
This article takes a look at Artemis's CriticalAnalyzerPolicy.
CriticalAnalyzerPolicy
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
public enum CriticalAnalyzerPolicy {
   HALT, SHUTDOWN, LOG;

   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new CriticalAnalyzerPolicyConverter(), CriticalAnalyzerPolicy.class);
   }

   static class CriticalAnalyzerPolicyConverter implements Converter {

      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(CriticalAnalyzerPolicy.valueOf(value.toString()));
      }
   }
}
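- CriticalAnalyzerPolicy defines three enum values: HALT, SHUTDOWN, and LOG. Its static initializer also registers a CriticalAnalyzerPolicyConverter with BeanSupport, which parses the policy from a string through CriticalAnalyzerPolicy.valueOf.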
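Because the converter delegates to valueOf, the lookup is case-sensitive and throws IllegalArgumentException for an unknown name. The following is a minimal sketch of that behavior, assuming a stand-in enum named Policy that only mirrors the three constants; it is not the Artemis class, and parseLenient is a hypothetical helper added purely for contrast.

```java
import java.util.Locale;

// Stand-in sketch: Policy mirrors the three constants of CriticalAnalyzerPolicy,
// it is not the Artemis class itself.
class PolicyParseSketch {

    enum Policy { HALT, SHUTDOWN, LOG }

    // Same approach as the converter shown above: Enum.valueOf on the string form.
    static Policy parseStrict(Object value) {
        return Policy.valueOf(value.toString());
    }

    // Hypothetical lenient variant (not part of Artemis): normalizes before the lookup.
    static Policy parseLenient(String value) {
        return Policy.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parseStrict("SHUTDOWN"));
        try {
            parseStrict("shutdown"); // wrong case, valueOf rejects it
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(parseLenient(" log "));
    }
}
```

The strict form is what the converter code above does; whether other parts of the configuration pipeline normalize the value first is not covered by this sketch.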
initializeCriticalAnalyzer
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
public class ActiveMQServerImpl implements ActiveMQServer {

   //......

   private void initializeCriticalAnalyzer() throws Exception {

      // Some tests will stop/start at a crazy frequency
      CriticalAnalyzer analyzer = this.getCriticalAnalyzer();
      if (analyzer == null) {
         if (configuration.isCriticalAnalyzer()) {
            // this will have its own ScheduledPool
            analyzer = new CriticalAnalyzerImpl();
         } else {
            analyzer = EmptyCriticalAnalyzer.getInstance();
         }

         this.analyzer = analyzer;
      }

      /* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
      analyzer.clear();

      analyzer.setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);

      if (configuration.isCriticalAnalyzer()) {
         analyzer.start();
      }

      CriticalAction criticalAction = null;
      final CriticalAnalyzerPolicy criticalAnalyzerPolicy = configuration.getCriticalAnalyzerPolicy();
      switch (criticalAnalyzerPolicy) {
         case HALT:
            criticalAction = criticalComponent -> {

               ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent);

               threadDump();
               sendCriticalNotification(criticalComponent);

               Runtime.getRuntime().halt(70); // Linux systems will have /usr/include/sysexits.h showing 70 as internal software error

            };
            break;
         case SHUTDOWN:
            criticalAction = criticalComponent -> {

               ActiveMQServerLogger.LOGGER.criticalSystemShutdown(criticalComponent);

               threadDump();

               // on the case of a critical failure, -1 cannot simply means forever.
               // in case graceful is -1, we will set it to 30 seconds
               sendCriticalNotification(criticalComponent);

               // you can't stop from the check thread,
               // nor can use an executor
               Thread stopThread = new Thread() {
                  @Override
                  public void run() {
                     try {
                        ActiveMQServerImpl.this.stop();
                     } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                     }
                  }
               };
               stopThread.start();
            };
            break;
         case LOG:
            criticalAction = criticalComponent -> {
               ActiveMQServerLogger.LOGGER.criticalSystemLog(criticalComponent);
               threadDump();
               sendCriticalNotification(criticalComponent);
            };
            break;
      }

      analyzer.addAction(criticalAction);
   }

   //......
}
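- The initializeCriticalAnalyzer method first fetches the CriticalAnalyzer; if it is null, it creates one: a CriticalAnalyzerImpl when configuration.isCriticalAnalyzer() is true, otherwise EmptyCriticalAnalyzer.getInstance(). It then calls clear, sets the check time and the timeout, and starts the analyzer if it is enabled. Finally it builds a criticalAction for the configured criticalAnalyzerPolicy and adds it to the analyzer. Every policy logs through ActiveMQServerLogger.LOGGER (criticalSystemHalt, criticalSystemShutdown, or criticalSystemLog respectively) and then calls threadDump() and sendCriticalNotification(criticalComponent). In addition, HALT calls Runtime.getRuntime().halt(70), SHUTDOWN calls ActiveMQServerImpl.this.stop() on a new thread, and LOG does nothing further.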
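To make the differences between the three branches easier to compare, here is a minimal sketch that returns the ordered steps for each policy as strings instead of performing them. The class PolicyActionSketch, its Policy enum, and the step labels are hypothetical stand-ins; nothing here calls into Artemis, and no JVM is halted.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch of the switch in initializeCriticalAnalyzer: steps are recorded, not executed.
class PolicyActionSketch {

    enum Policy { HALT, SHUTDOWN, LOG }

    static List<String> stepsFor(Policy policy) {
        List<String> steps = new ArrayList<>();

        // Each policy logs through its own logger method.
        switch (policy) {
            case HALT:
                steps.add("log criticalSystemHalt");
                break;
            case SHUTDOWN:
                steps.add("log criticalSystemShutdown");
                break;
            case LOG:
                steps.add("log criticalSystemLog");
                break;
        }

        // Shared by all three policies.
        steps.add("threadDump");
        steps.add("sendCriticalNotification");

        // Policy-specific final step; LOG has none.
        if (policy == Policy.HALT) {
            steps.add("Runtime.halt(70)");
        } else if (policy == Policy.SHUTDOWN) {
            steps.add("stop server on a new thread");
        }
        return steps;
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            System.out.println(policy + " -> " + stepsFor(policy));
        }
    }
}
```

SHUTDOWN uses a separate thread because, as the source comment notes, the server cannot be stopped from the analyzer's own check thread.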
CriticalAnalyzer
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
public interface CriticalAnalyzer extends ActiveMQComponent {

   default void clear() {
   }

   default int getNumberOfComponents() {
      return 0;
   }

   boolean isMeasuring();

   void add(CriticalComponent component);

   void remove(CriticalComponent component);

   CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);

   long getCheckTimeNanoSeconds();

   CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);

   long getTimeout(TimeUnit unit);

   long getTimeoutNanoSeconds();

   CriticalAnalyzer addAction(CriticalAction action);

   void check();
}
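- The CriticalAnalyzer interface extends ActiveMQComponent and defines methods such as setCheckTime, setTimeout, addAction, and check; the setters return CriticalAnalyzer so that calls can be chained.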
EmptyCriticalAnalyzer
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
public class EmptyCriticalAnalyzer implements CriticalAnalyzer {

   private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();

   public static EmptyCriticalAnalyzer getInstance() {
      return instance;
   }

   private EmptyCriticalAnalyzer() {
   }

   @Override
   public void add(CriticalComponent component) {
   }

   @Override
   public void remove(CriticalComponent component) {
   }

   @Override
   public boolean isMeasuring() {
      return false;
   }

   @Override
   public void start() throws Exception {
   }

   @Override
   public void stop() throws Exception {
   }

   @Override
   public long getTimeoutNanoSeconds() {
      return 0;
   }

   @Override
   public boolean isStarted() {
      return false;
   }

   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      return this;
   }

   @Override
   public long getCheckTimeNanoSeconds() {
      return 0;
   }

   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      return this;
   }

   @Override
   public long getTimeout(TimeUnit unit) {
      return 0;
   }

   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      return this;
   }

   @Override
   public void check() {
   }
}
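- EmptyCriticalAnalyzer implements the CriticalAnalyzer interface as a singleton whose methods are all no-ops: the setters return this, the getters return 0, and isMeasuring and isStarted return false.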
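This is the null-object pattern: callers always receive a usable analyzer and never need a null check. The sketch below illustrates the idea with a trimmed-down interface; Analyzer, EmptyAnalyzer, CountingAnalyzer, and select are hypothetical names invented for the example and are not Artemis types.

```java
// Null-object sketch in the style of EmptyCriticalAnalyzer; all names are stand-ins.
class NoOpAnalyzerSketch {

    // Hypothetical, trimmed-down stand-in for the CriticalAnalyzer interface.
    interface Analyzer {
        boolean isMeasuring();

        void check();
    }

    // No-op singleton: every method does nothing or returns a neutral value.
    static final class EmptyAnalyzer implements Analyzer {
        private static final EmptyAnalyzer INSTANCE = new EmptyAnalyzer();

        private EmptyAnalyzer() {
        }

        static EmptyAnalyzer getInstance() {
            return INSTANCE;
        }

        @Override
        public boolean isMeasuring() {
            return false;
        }

        @Override
        public void check() {
        }
    }

    // Counts how often check() runs so the difference is observable.
    static final class CountingAnalyzer implements Analyzer {
        int checks;

        @Override
        public boolean isMeasuring() {
            return true;
        }

        @Override
        public void check() {
            checks++;
        }
    }

    // Mirrors the selection in initializeCriticalAnalyzer: the result is never null.
    static Analyzer select(boolean enabled) {
        return enabled ? new CountingAnalyzer() : EmptyAnalyzer.getInstance();
    }

    public static void main(String[] args) {
        for (boolean enabled : new boolean[] {true, false}) {
            Analyzer analyzer = select(enabled);
            analyzer.check(); // safe in both cases, no null check required
            System.out.println("enabled=" + enabled + " measuring=" + analyzer.isMeasuring());
        }
    }
}
```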
CriticalAnalyzerImpl
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
public class CriticalAnalyzerImpl implements CriticalAnalyzer {

   private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);

   private volatile long timeoutNanoSeconds;

   // one minute by default.. the server will change it for sure
   private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60);

   private final ActiveMQScheduledComponent scheduledComponent;

   private final AtomicBoolean running = new AtomicBoolean(false);

   public CriticalAnalyzerImpl() {
      // this will make the scheduled component to start its own pool

      /* Important: The scheduled component should have its own thread pool...
       *  otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any
       *  issues and won't be able to shutdown the server or halt the VM
       */
      this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) {
         @Override
         public void run() {
            logger.trace("Checking critical analyzer");
            check();
         }
      };
   }

   @Override
   public void clear() {
      actions.clear();
      components.clear();
   }

   private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();

   private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();

   @Override
   public int getNumberOfComponents() {
      return components.size();
   }

   @Override
   public boolean isMeasuring() {
      return true;
   }

   @Override
   public void add(CriticalComponent component) {
      components.add(component);
   }

   @Override
   public void remove(CriticalComponent component) {
      components.remove(component);
   }

   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      this.checkTimeNanoSeconds = unit.toNanos(timeout);
      this.scheduledComponent.setPeriod(timeout, unit);
      return this;
   }

   @Override
   public long getCheckTimeNanoSeconds() {
      if (checkTimeNanoSeconds == 0) {
         checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
      }
      return checkTimeNanoSeconds;
   }

   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      if (checkTimeNanoSeconds <= 0) {
         this.setCheckTime(timeout / 2, unit);
      }
      this.timeoutNanoSeconds = unit.toNanos(timeout);
      return this;
   }

   @Override
   public long getTimeout(TimeUnit unit) {
      if (timeoutNanoSeconds == 0) {
         timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
      }
      return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
   }

   @Override
   public long getTimeoutNanoSeconds() {
      return timeoutNanoSeconds;
   }

   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      this.actions.add(action);
      return this;
   }

   @Override
   public void check() {
      boolean retry = true;
      while (retry) {
         try {
            for (CriticalComponent component : components) {

               if (component.isExpired(timeoutNanoSeconds)) {
                  fireAction(component);
                  // no need to keep running if there's already a component failed
                  return;
               }
            }
            retry = false; // got to the end of the list, no need to retry
         } catch (ConcurrentModificationException dontCare) {
            // lets retry on the loop
         }
      }
   }

   private void fireAction(CriticalComponent component) {
      for (CriticalAction action : actions) {
         try {
            action.run(component);
         } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
         }
      }

      actions.clear();
   }

   @Override
   public void start() {
      scheduledComponent.start();
   }

   @Override
   public void stop() {
      scheduledComponent.stop();
   }

   @Override
   public boolean isStarted() {
      return scheduledComponent.isStarted();
   }
}
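- The CriticalAnalyzerImpl constructor creates an ActiveMQScheduledComponent, with its own thread pool, that periodically runs check. clear empties both actions and components; setCheckTime updates checkTimeNanoSeconds and calls scheduledComponent.setPeriod. check iterates over components and tests component.isExpired(timeoutNanoSeconds); on the first expired component it calls fireAction(component) and returns. fireAction calls action.run(component) for each action, logging any Throwable as a warning, and then clears actions, so the registered actions run at most once unless they are added again.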
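The one-shot behavior of check and fireAction can be illustrated with a minimal, self-contained sketch. It assumes a hypothetical Component interface that models only the expiry test and uses java.util.function.Consumer in place of CriticalAction; scheduling and the ConcurrentModificationException retry loop are left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified stand-in for the check/fireAction logic; not the Artemis implementation.
class OneShotCheckSketch {

    // Hypothetical stand-in for CriticalComponent: only the expiry test is modelled.
    interface Component {
        boolean isExpired(long timeoutNanos);
    }

    private final List<Component> components = new CopyOnWriteArrayList<>();
    private final List<Consumer<Component>> actions = new CopyOnWriteArrayList<>();
    private final long timeoutNanos;

    OneShotCheckSketch(long timeoutNanos) {
        this.timeoutNanos = timeoutNanos;
    }

    void add(Component component) {
        components.add(component);
    }

    void addAction(Consumer<Component> action) {
        actions.add(action);
    }

    // Stops at the first expired component, as the original check does.
    void check() {
        for (Component component : components) {
            if (component.isExpired(timeoutNanos)) {
                fireAction(component);
                return;
            }
        }
    }

    // Runs every action, isolating failures, then clears the list so the actions fire only once.
    private void fireAction(Component component) {
        for (Consumer<Component> action : actions) {
            try {
                action.accept(component);
            } catch (Throwable e) {
                System.err.println("action failed: " + e);
            }
        }
        actions.clear();
    }

    // Two checks against an expired component; returns how many times the action ran.
    static int demo() {
        int[] fired = {0};
        OneShotCheckSketch analyzer = new OneShotCheckSketch(1_000L);
        analyzer.add(timeout -> false); // healthy component
        analyzer.add(timeout -> true);  // expired component
        analyzer.addAction(component -> fired[0]++);
        analyzer.check();
        analyzer.check(); // actions were cleared by the first check
        return fired[0];
    }

    public static void main(String[] args) {
        System.out.println("actions fired: " + demo());
    }
}
```

In the server, initializeCriticalAnalyzer calls clear and then addAction again on each start, which is how the action gets re-armed after a restart.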
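Summary
CriticalAnalyzerPolicy defines three enum values: HALT, SHUTDOWN, and LOG. The initializeCriticalAnalyzer method of ActiveMQServerImpl first fetches the CriticalAnalyzer; if it is null, it creates one: a CriticalAnalyzerImpl when configuration.isCriticalAnalyzer() is true, otherwise EmptyCriticalAnalyzer.getInstance(). It then calls clear, sets the check time and the timeout, starts the analyzer if it is enabled, and finally builds a criticalAction for the configured criticalAnalyzerPolicy and adds it to the analyzer. Every policy logs through ActiveMQServerLogger.LOGGER (criticalSystemHalt, criticalSystemShutdown, or criticalSystemLog respectively) and then calls threadDump() and sendCriticalNotification(criticalComponent). In addition, HALT calls Runtime.getRuntime().halt(70), SHUTDOWN calls ActiveMQServerImpl.this.stop() on a new thread, and LOG does nothing further.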
doc
- ActiveMQServerImpl