Preface
This article takes a look at the AsyncAppender provided by reactor-logback.
AsyncAppender
reactor-logback/src/main/java/reactor/logback/AsyncAppender.java
public class AsyncAppender extends ContextAwareBase
        implements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>,
                   CoreSubscriber<ILoggingEvent> {

    private final AppenderAttachableImpl<ILoggingEvent> aai =
            new AppenderAttachableImpl<ILoggingEvent>();
    private final FilterAttachableImpl<ILoggingEvent> fai =
            new FilterAttachableImpl<ILoggingEvent>();
    private final AtomicReference<Appender<ILoggingEvent>> delegate =
            new AtomicReference<Appender<ILoggingEvent>>();

    private String name;
    private WorkQueueProcessor<ILoggingEvent> processor;
    private int backlog = 1024 * 1024;
    private boolean includeCallerData = false;
    private boolean started = false;

    //......
}
AsyncAppender extends ContextAwareBase and implements the Appender, AppenderAttachable, and CoreSubscriber interfaces.
CoreSubscriber
reactor/core/CoreSubscriber.java
public interface CoreSubscriber<T> extends Subscriber<T> {

    /**
     * Request a {@link Context} from dependent components which can include downstream
     * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
     *
     * @return a resolved context or {@link Context#empty()}
     */
    default Context currentContext(){
        return Context.empty();
    }

    /**
     * Implementors should initialize any state used by {@link #onNext(Object)} before
     * calling {@link Subscription#request(long)}. Should further {@code onNext} related
     * state modification occur, thread-safety will be required.
     * <p>
     * Note that an invalid request {@code <= 0} will not produce an onError and
     * will simply be ignored or reported through a debug-enabled
     * {@link reactor.util.Logger}.
     *
     * {@inheritDoc}
     */
    @Override
    void onSubscribe(Subscription s);
}
CoreSubscriber extends the Reactive Streams Subscriber interface, which defines the onSubscribe(Subscription s), onNext, onError, and onComplete methods.
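To make the order of the four callbacks concrete, here is a minimal sketch that uses only the JDK. It assumes `java.util.concurrent.Flow` and `SubmissionPublisher` as stand-ins for `org.reactivestreams.Subscriber` and the Reactor processor; `SubscriberLifecycleSketch` and `RecordingSubscriber` are hypothetical names, not part of reactor-logback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; java.util.concurrent.Flow stands in for org.reactivestreams.
class SubscriberLifecycleSketch {

    /** Records every callback it receives, in arrival order. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> calls = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription s) {
            calls.add("onSubscribe");
            s.request(Long.MAX_VALUE); // unbounded demand, as AsyncAppender requests
        }

        @Override
        public void onNext(String item) {
            calls.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable t) {
            calls.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            calls.add("onComplete");
            done.countDown();
        }
    }

    /** Publishes two items, completes the stream, and returns the recorded callbacks. */
    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber sees onSubscribe first, then one onNext per item, then a single terminal signal.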
onSubscribe
    public void onSubscribe(Subscription s) {
        try {
            doStart();
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
        finally {
            started = true;
            s.request(Long.MAX_VALUE);
        }
    }

    protected void doStart() {
    }
onSubscribe calls doStart (an empty hook by default), then, in the finally block, sets started to true and issues s.request(Long.MAX_VALUE), i.e. unbounded demand.
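One consequence of placing the flag and the request in the finally block is that they run even when doStart throws. The following dependency-free model illustrates that control flow; `OnSubscribeSketch`, `RecordingSubscription`, and the `errors` list (a stand-in for logback's status list) are hypothetical and exist only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the onSubscribe control flow shown above.
class OnSubscribeSketch {

    /** Stand-in for org.reactivestreams.Subscription that records the last demand. */
    static final class RecordingSubscription {
        long requested;

        void request(long n) {
            requested = n;
        }
    }

    final List<String> errors = new ArrayList<>(); // stand-in for logback's status list
    boolean started;
    private final boolean failOnStart;

    OnSubscribeSketch(boolean failOnStart) {
        this.failOnStart = failOnStart;
    }

    /** Start hook; fails on purpose when the sketch is configured to do so. */
    void doStart() {
        if (failOnStart) {
            throw new IllegalStateException("start hook failed");
        }
    }

    void onSubscribe(RecordingSubscription s) {
        try {
            doStart();
        } catch (Throwable t) {
            errors.add(t.getMessage()); // the failure is recorded, not rethrown
        } finally {
            started = true;             // runs whether or not doStart succeeded
            s.request(Long.MAX_VALUE);
        }
    }
}
```

With `failOnStart` set, the error is recorded once, and the subscriber is still marked as started with unbounded demand.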
onNext
    public void onNext(ILoggingEvent iLoggingEvent) {
        aai.appendLoopOnAppenders(iLoggingEvent);
    }
onNext hands the event to AppenderAttachableImpl.appendLoopOnAppenders, which forwards it to the attached appenders.
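The fan-out idea behind appendLoopOnAppenders can be sketched as follows. This is a simplified illustration, not logback's actual implementation: `FanOutSketch` is a hypothetical class, and plain `Consumer` instances stand in for logback appenders.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: Consumer<E> stands in for a logback Appender<E>.
class FanOutSketch<E> {

    // Copy-on-write list: iteration stays safe while appenders are added concurrently.
    private final List<Consumer<E>> appenders = new CopyOnWriteArrayList<>();

    void addAppender(Consumer<E> appender) {
        appenders.add(appender);
    }

    /** Forwards the event to every attached appender and returns how many received it. */
    int appendLoopOnAppenders(E event) {
        int count = 0;
        for (Consumer<E> appender : appenders) {
            appender.accept(event);
            count++;
        }
        return count;
    }
}
```

In AsyncAppender itself the loop normally visits a single appender, because addAppender only registers one delegate.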
onError
    public void onError(Throwable t) {
        addError(t.getMessage(), t);
    }
onError simply records the error in logback's status via addError.
onComplete
    public void onComplete() {
        try {
            Appender<ILoggingEvent> appender = delegate.getAndSet(null);
            if (appender != null){
                doStop();
                appender.stop();
                aai.detachAndStopAllAppenders();
            }
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
        finally {
            started = false;
        }
    }

    protected void doStop() {
    }
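onComplete atomically clears delegate with getAndSet(null); if a delegate was attached, it calls doStop, appender.stop(), and aai.detachAndStopAllAppenders(). In all cases it finally sets started to false.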
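Because getAndSet(null) returns the previous value and clears the reference in one atomic step, the shutdown work runs at most once even if onComplete is signalled repeatedly. A minimal sketch of that pattern, with a hypothetical `StopOnceSketch` class and a `Runnable` standing in for the delegate appender's stop logic:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a Runnable stands in for the delegate appender's stop logic.
class StopOnceSketch {

    final AtomicReference<Runnable> delegate = new AtomicReference<>();
    boolean started = true;

    void onComplete() {
        try {
            // Atomically take the delegate; any later call observes null.
            Runnable stopAction = delegate.getAndSet(null);
            if (stopAction != null) {
                stopAction.run();
            }
        } finally {
            started = false;
        }
    }
}
```

Calling `onComplete()` twice on the same instance runs the stop action only on the first call.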
Appender.doAppend
    public void doAppend(ILoggingEvent evt) throws LogbackException {
        if (getFilterChainDecision(evt) == FilterReply.DENY) {
            return;
        }

        evt.prepareForDeferredProcessing();
        if (includeCallerData) {
            evt.getCallerData();
        }

        try {
            queueLoggingEvent(evt);
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
    }

    protected void queueLoggingEvent(ILoggingEvent evt) {
        if (null != delegate.get()) {
            processor.onNext(evt);
        }
    }
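doAppend first consults the filter chain and returns immediately if the decision is DENY. Otherwise it calls evt.prepareForDeferredProcessing(), captures caller data when includeCallerData is set, and then calls queueLoggingEvent, which invokes processor.onNext(evt) only when delegate is non-null.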
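The call to prepareForDeferredProcessing matters because the event is consumed on a different thread from the one that logged it. The sketch below models that with a hypothetical `Event` class whose thread name is resolved lazily; this mirrors how logback's LoggingEvent is understood to behave, but it is an assumption for illustration, not logback code.

```java
// Hypothetical model of an event whose thread name is resolved lazily.
class DeferredEventSketch {

    static final class Event {
        private String threadName;

        /** Captures values that depend on the calling thread. */
        void prepareForDeferredProcessing() {
            getThreadName();
        }

        /** Resolves the thread name on first access and caches it. */
        String getThreadName() {
            if (threadName == null) {
                threadName = Thread.currentThread().getName();
            }
            return threadName;
        }
    }

    /** Creates an event on thread "caller" and reads its thread name on thread "worker". */
    static String threadNameSeenByWorker(boolean prepare) {
        String[] seen = new String[1];
        Thread caller = new Thread(() -> {
            Event evt = new Event();
            if (prepare) {
                evt.prepareForDeferredProcessing(); // capture before the hand-off
            }
            Thread worker = new Thread(() -> seen[0] = evt.getThreadName(), "worker");
            worker.start();
            join(worker);
        }, "caller");
        caller.start();
        join(caller);
        return seen[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With `prepare` set, the worker sees the logging thread's name; without it, the worker's own name leaks into the event.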
LifeCycle.start
    public void start() {
        startDelegateAppender();

        processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger")
                                      .bufferSize(backlog)
                                      .autoCancel(false)
                                      .build();
        processor.subscribe(this);
    }

    private void startDelegateAppender() {
        Appender<ILoggingEvent> delegateAppender = delegate.get();
        if (null != delegateAppender && !delegateAppender.isStarted()) {
            delegateAppender.start();
        }
    }

    public void addAppender(Appender<ILoggingEvent> newAppender) {
        if (delegate.compareAndSet(null, newAppender)) {
            aai.addAppender(newAppender);
        }
        else {
            throw new IllegalArgumentException(delegate.get() + " already attached.");
        }
    }
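start calls startDelegateAppender, then builds a WorkQueueProcessor named "logger" (bufferSize is taken from backlog, which defaults to 1024 * 1024) and subscribes the current instance to it. addAppender sets delegate with compareAndSet and adds the appender to the AppenderAttachableImpl; if a delegate is already attached, it throws an IllegalArgumentException.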
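The single-delegate rule enforced by compareAndSet(null, newAppender) can be illustrated in isolation. `SingleDelegateSketch` is a hypothetical class used only for this sketch; the type parameter stands in for the logback appender type.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "attach exactly one delegate" rule.
class SingleDelegateSketch<A> {

    private final AtomicReference<A> delegate = new AtomicReference<>();

    /** Attaches the delegate only if none is set; otherwise rejects the call. */
    void addAppender(A newAppender) {
        if (!delegate.compareAndSet(null, newAppender)) {
            throw new IllegalArgumentException(delegate.get() + " already attached.");
        }
    }

    A current() {
        return delegate.get();
    }
}
```

A second `addAppender` call fails and leaves the first delegate in place, so at most one downstream appender can sit behind the asynchronous wrapper.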
stop
    public void stop() {
        processor.onComplete();
    }
stop calls processor.onComplete(), which in turn leads to the onComplete callback described above.
Summary
reactor-logback offers an alternative AsyncAppender built on WorkQueueProcessor, so it is backed by a RingBuffer rather than a BlockingQueue. Its onSubscribe method calls doStart, sets started to true, and issues s.request(Long.MAX_VALUE). onNext calls appendLoopOnAppenders on the AppenderAttachableImpl. onComplete calls doStop, appender.stop(), and aai.detachAndStopAllAppenders(), and finally sets started to false. doAppend returns immediately when the filter chain decision is DENY; otherwise it calls queueLoggingEvent, which invokes processor.onNext(evt) when delegate is non-null.
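To give a feel for the ring-buffer idea, here is a deliberately small single-producer, single-consumer sketch. It is not Reactor's RingBuffer, which is considerably more involved; `RingBufferSketch` is a hypothetical class. It shows why a power-of-two capacity (such as the default backlog of 1024 * 1024) is convenient: a slot index is just the sequence number masked with capacity - 1.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer / single-consumer ring buffer, for illustration only.
class RingBufferSketch<E> {

    private final Object[] slots;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next sequence to read
    private final AtomicLong tail = new AtomicLong(); // next sequence to write

    RingBufferSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new Object[capacity];
        mask = capacity - 1;
    }

    /** Producer side: returns false instead of blocking when the buffer is full. */
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == slots.length) {
            return false;
        }
        slots[(int) (t & mask)] = e; // index = sequence modulo capacity
        tail.set(t + 1);             // volatile write publishes the slot
        return true;
    }

    /** Consumer side: returns null when the buffer is empty. */
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int index = (int) (h & mask);
        E e = (E) slots[index];
        slots[index] = null;         // release the reference for garbage collection
        head.set(h + 1);             // volatile write frees the slot for the producer
        return e;
    }
}
```

The array is allocated once and reused as the sequences wrap around, which is the main structural difference from a linked queue that allocates a node per element.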