Preface
This article takes a look at debezium's ChangeEventQueue.
ChangeEventQueueMetrics
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java
public interface ChangeEventQueueMetrics {

    int totalCapacity();

    int remainingCapacity();
}
- The ChangeEventQueueMetrics interface defines two methods, totalCapacity and remainingCapacity.
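As a rough illustration of what these two values are good for, the sketch below derives a fill ratio from them. The `Metrics` interface is a local copy of the two methods shown above so that the example compiles without debezium on the classpath; `QueueUsage`, `fixed`, and `usedFraction` are hypothetical names invented for this example and are not part of debezium.

```java
public class QueueUsage {

    // Local stand-in mirroring the two methods of ChangeEventQueueMetrics.
    interface Metrics {
        int totalCapacity();

        int remainingCapacity();
    }

    // Hypothetical helper: a fixed snapshot of the two values.
    static Metrics fixed(int total, int remaining) {
        return new Metrics() {
            @Override
            public int totalCapacity() {
                return total;
            }

            @Override
            public int remainingCapacity() {
                return remaining;
            }
        };
    }

    // Fraction of the queue that is currently occupied, between 0.0 and 1.0.
    static double usedFraction(Metrics metrics) {
        int total = metrics.totalCapacity();
        if (total <= 0) {
            return 0.0;
        }
        return (total - metrics.remainingCapacity()) / (double) total;
    }

    public static void main(String[] args) {
        Metrics metrics = fixed(8192, 6144);
        System.out.println("used fraction: " + usedFraction(metrics));
    }
}
```

A ratio that stays close to 1.0 would suggest that the consumer calling poll is not keeping up with the producer calling enqueue.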
ChangeEventQueue
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);

    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;

    private volatile RuntimeException producerException;

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }

    public static class Builder<T> {

        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;

        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }

        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }

    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }

        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }

    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();

                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }

    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }

    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }

    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
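- ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates the BlockingQueue (a LinkedBlockingDeque bounded by maxQueueSize) and the Metronome, and stores the loggingContextSupplier. The enqueue method ignores null records, throws InterruptedException if the calling thread has already been interrupted, and otherwise calls queue.put(record), which blocks while the queue is full. The poll method first obtains previousContext via loggingContextSupplier.get() and creates a timeout of max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL). It then loops: it calls queue.drainTo(records, maxBatchSize) and, whenever nothing was drained, rethrows any recorded producerException and calls metronome.pause(). The loop ends once timeout.expired() is true or drainTo has moved at least one record into the batch, and previousContext.restore() runs in the finally block. totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().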
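To make the batching behaviour of poll easier to follow, here is a minimal, JDK-only sketch of the same loop. It is a simplification and not debezium's implementation: `BatchingQueueSketch` is a hypothetical class, the wait limit is passed to poll as a parameter, a plain Thread.sleep stands in for the Metronome, and the logging context handling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class BatchingQueueSketch<T> {

    private final BlockingQueue<T> queue;
    private final int maxBatchSize;
    private final long pollIntervalMillis;
    private volatile RuntimeException producerException;

    public BatchingQueueSketch(int maxQueueSize, int maxBatchSize, long pollIntervalMillis) {
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Blocks while the queue is full; null records are silently ignored.
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }
        queue.put(record);
    }

    // Lets the producer hand a failure over to the polling thread.
    public void producerException(RuntimeException e) {
        this.producerException = e;
    }

    // Returns up to maxBatchSize records, or an empty list once maxWaitMillis has passed.
    public List<T> poll(long maxWaitMillis) throws InterruptedException {
        List<T> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (System.currentTimeMillis() <= deadline && queue.drainTo(records, maxBatchSize) == 0) {
            if (producerException != null) {
                throw producerException;
            }
            // Simplification: a plain sleep instead of a Metronome.
            Thread.sleep(pollIntervalMillis);
        }
        return records;
    }

    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    public static void main(String[] args) throws InterruptedException {
        BatchingQueueSketch<String> q = new BatchingQueueSketch<>(10, 3, 10);
        for (int i = 0; i < 5; i++) {
            q.enqueue("event-" + i);
        }
        System.out.println(q.poll(100)); // first batch is capped at maxBatchSize
        System.out.println(q.poll(100)); // the remaining records
        System.out.println(q.poll(50));  // nothing left, so the wait expires
    }
}
```

Note that the producer exception is only checked when a drain attempt comes back empty, which matches the order of operations in the original loop: records that are already queued are still delivered before the failure surfaces.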
Threads
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java
public class Threads {

    //......

    public static interface TimeSince {
        /**
         * Reset the elapsed time to 0.
         */
        void reset();

        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }

    public static interface Timer {
        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();

        Duration remaining();
    }

    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();

        return new Timer() {

            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }

    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {
            private long lastTimeInMillis;

            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }

            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }

    //......
}
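- Threads defines the Timer interface with two methods, expired and remaining. The timer method first creates a TimeSince via timeSince and resets it, then returns an anonymous Timer: expired returns true once the elapsed time exceeds the requested duration, and remaining returns the requested duration minus the elapsed time.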
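The boundary behaviour of this Timer is easiest to see with a clock that can be moved by hand. The following sketch re-creates the same logic on top of a `LongSupplier`, which is an assumption of this example standing in for debezium's Clock; `TimerSketch` is likewise a hypothetical name.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class TimerSketch {

    interface Timer {
        boolean expired();

        Duration remaining();
    }

    // clockMillis stands in for debezium's Clock (an assumption of this sketch).
    static Timer timer(LongSupplier clockMillis, Duration time) {
        final long start = clockMillis.getAsLong();
        return new Timer() {
            // Elapsed milliseconds, clamped to 0 if the clock moved backwards.
            private long elapsed() {
                return Math.max(0L, clockMillis.getAsLong() - start);
            }

            @Override
            public boolean expired() {
                return elapsed() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minusMillis(elapsed());
            }
        };
    }

    public static void main(String[] args) {
        AtomicLong fakeNow = new AtomicLong(1_000L);
        Timer t = timer(fakeNow::get, Duration.ofMillis(500));
        System.out.println(t.expired() + " " + t.remaining());

        fakeNow.addAndGet(500); // exactly at the limit: not yet expired
        System.out.println(t.expired() + " " + t.remaining());

        fakeNow.addAndGet(1);   // one millisecond past the limit
        System.out.println(t.expired() + " " + t.remaining());
    }
}
```

Because the comparison is a strict greater-than, the timer is not expired when the elapsed time equals the requested duration, and remaining becomes negative once the limit has been passed.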
LoggingContext
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java
public class LoggingContext {

    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";

    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";

    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";

    private LoggingContext() {
    }

    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {

        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;

        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }

        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }

    //......
}
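- LoggingContext defines PreviousContext. Its constructor copies the current MDC with MDC.getCopyOfContextMap(), falling back to an empty map when that call returns null, and its restore method writes the copied data back into the MDC.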
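The snapshot-and-restore pattern can be sketched without a logging library. In the example below a ThreadLocal map stands in for SLF4J's MDC, which is an assumption made only to keep the code self-contained; `ContextSnapshotSketch`, `Snapshot`, and `runWithContext` are hypothetical names, and the try/finally shape mirrors how poll uses previousContext.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextSnapshotSketch {

    // Stand-in for SLF4J's MDC: a per-thread map of diagnostic keys.
    private static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    // Captures a copy of the current context so that it can be restored later.
    static final class Snapshot {
        private final Map<String, String> saved = new HashMap<>(CONTEXT.get());

        void restore() {
            CONTEXT.set(new HashMap<>(saved));
        }
    }

    // Takes a snapshot, works under a different context value, and restores in finally.
    static String runWithContext(String contextName) {
        Snapshot previous = new Snapshot();
        put("dbz.connectorContext", contextName);
        try {
            return get("dbz.connectorContext");
        }
        finally {
            previous.restore();
        }
    }

    public static void main(String[] args) {
        put("dbz.connectorContext", "task");
        System.out.println("inside:  " + runWithContext("poll"));
        System.out.println("after:   " + get("dbz.connectorContext"));
    }
}
```

Restoring in a finally block means the caller's context comes back even when the work in between throws, which is why poll wraps its whole loop that way.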
Metronome
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java
@FunctionalInterface
public interface Metronome {

    public void pause() throws InterruptedException;

    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {
            private long next = timeSystem.currentTimeInMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }

            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }

    //......
}
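- The Metronome interface defines the pause method. Its static sleeper method creates an anonymous Metronome whose pause method calls Thread.sleep until the next scheduled tick has been reached, then moves that tick forward by one period.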
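The scheduling logic of sleeper can be tried out in isolation. The sketch below copies the loop and uses System.currentTimeMillis() in place of debezium's Clock, which is an assumption of this example; `MetronomeSketch` and `timeTicks` are hypothetical names.

```java
public class MetronomeSketch {

    interface Metronome {
        void pause() throws InterruptedException;
    }

    // Same loop as the sleeper shown above, driven by the system clock.
    static Metronome sleeper(long periodInMillis) {
        return new Metronome() {
            private long next = System.currentTimeMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = System.currentTimeMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                // Advance from the previous target rather than from "now".
                next = next + periodInMillis;
            }
        };
    }

    // Runs the given number of ticks and returns the elapsed wall-clock milliseconds.
    static long timeTicks(long periodInMillis, int ticks) throws InterruptedException {
        long start = System.currentTimeMillis();
        Metronome metronome = sleeper(periodInMillis);
        for (int i = 0; i < ticks; i++) {
            metronome.pause();
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("3 ticks of 20 ms took " + timeTicks(20, 3) + " ms");
    }
}
```

Since each target is computed from the previous one, time spent working between two pauses is absorbed into the schedule: a pause that is called late simply returns sooner, or immediately if the tick has already passed.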
Summary
ChangeEventQueue implements the ChangeEventQueueMetrics interface on top of a bounded BlockingQueue. enqueue blocks in queue.put(record) while the queue is full. poll drains up to maxBatchSize records per call, pausing on the Metronome between attempts, and returns as soon as at least one record has been drained or the timeout has expired; it restores the previous logging context before returning. totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().
doc
- ChangeEventQueue