Preface
This article looks at how lettuce exposes metrics for monitoring.
DefaultCommandLatencyEventPublisher
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/metrics/DefaultCommandLatencyEventPublisher.java
public class DefaultCommandLatencyEventPublisher implements MetricEventPublisher {

    private final EventExecutorGroup eventExecutorGroup;
    private final EventPublisherOptions options;
    private final EventBus eventBus;
    private final CommandLatencyCollector commandLatencyCollector;
    private final Runnable EMITTER = this::emitMetricsEvent;
    private volatile ScheduledFuture<?> scheduledFuture;

    public DefaultCommandLatencyEventPublisher(EventExecutorGroup eventExecutorGroup, EventPublisherOptions options,
            EventBus eventBus, CommandLatencyCollector commandLatencyCollector) {
        this.eventExecutorGroup = eventExecutorGroup;
        this.options = options;
        this.eventBus = eventBus;
        this.commandLatencyCollector = commandLatencyCollector;
        if (!options.eventEmitInterval().isZero()) {
            scheduledFuture = this.eventExecutorGroup.scheduleAtFixedRate(EMITTER, options.eventEmitInterval().toMillis(),
                    options.eventEmitInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean isEnabled() {
        return !options.eventEmitInterval().isZero() && scheduledFuture != null;
    }

    @Override
    public void shutdown() {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            scheduledFuture = null;
        }
    }

    @Override
    public void emitMetricsEvent() {
        if (!isEnabled() || !commandLatencyCollector.isEnabled()) {
            return;
        }
        eventBus.publish(new CommandLatencyEvent(commandLatencyCollector.retrieveMetrics()));
    }
}
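- If the eventEmitInterval of EventPublisherOptions is not zero, the constructor starts a fixed-rate schedule that periodically calls emitMetricsEvent to publish a command latency event. The initial delay equals the interval, so the first event is published one full interval after startup.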
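The scheduling pattern above can be sketched with the JDK alone. This is a minimal sketch, not lettuce code: the class `PeriodicPublisherSketch` and its `metricsSource` and `sink` parameters are hypothetical stand-ins for the collector's `retrieveMetrics()` and for `eventBus.publish(...)`, and a plain `ScheduledExecutorService` replaces netty's `EventExecutorGroup`.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for DefaultCommandLatencyEventPublisher; not lettuce code.
class PeriodicPublisherSketch<T> {

    private final ScheduledExecutorService executor;
    private final Duration emitInterval;
    private final Supplier<T> metricsSource; // plays the role of retrieveMetrics()
    private final Consumer<T> sink;          // plays the role of eventBus.publish(...)
    private volatile ScheduledFuture<?> scheduledFuture;

    PeriodicPublisherSketch(Duration emitInterval, Supplier<T> metricsSource, Consumer<T> sink) {
        this.emitInterval = emitInterval;
        this.metricsSource = metricsSource;
        this.sink = sink;
        // Daemon thread, so a forgotten shutdown() does not keep the JVM alive.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "metrics-emitter");
            t.setDaemon(true);
            return t;
        });
        // A zero interval means "never schedule", which leaves the publisher disabled.
        if (!emitInterval.isZero()) {
            long millis = emitInterval.toMillis();
            // Initial delay == period: the first emission happens one interval after startup.
            scheduledFuture = executor.scheduleAtFixedRate(this::emit, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    boolean isEnabled() {
        return !emitInterval.isZero() && scheduledFuture != null;
    }

    // Publishes one snapshot; a no-op while the publisher is disabled.
    void emit() {
        if (!isEnabled()) {
            return;
        }
        sink.accept(metricsSource.get());
    }

    // Cancels the schedule and stops the executor; isEnabled() is false afterwards.
    void shutdown() {
        ScheduledFuture<?> future = scheduledFuture;
        if (future != null) {
            future.cancel(true);
            scheduledFuture = null;
        }
        executor.shutdownNow();
    }
}
```

Because `isEnabled()` checks both the interval and the future, calling `emit()` by hand after `shutdown()` publishes nothing, which mirrors the guard at the top of `emitMetricsEvent`.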
LettuceConnectionConfiguration
spring-boot-autoconfigure-2.0.4.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.java
@Configuration
@ConditionalOnClass(RedisClient.class)
class LettuceConnectionConfiguration extends RedisConnectionConfiguration {

    private final RedisProperties properties;
    private final List<LettuceClientConfigurationBuilderCustomizer> builderCustomizers;

    LettuceConnectionConfiguration(RedisProperties properties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider,
            ObjectProvider<List<LettuceClientConfigurationBuilderCustomizer>> builderCustomizers) {
        super(properties, sentinelConfigurationProvider, clusterConfigurationProvider);
        this.properties = properties;
        this.builderCustomizers = builderCustomizers
                .getIfAvailable(Collections::emptyList);
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(ClientResources.class)
    public DefaultClientResources lettuceClientResources() {
        return DefaultClientResources.create();
    }

    //......
}
- LettuceConnectionConfiguration creates DefaultClientResources with the default settings, unless the application already defines its own ClientResources bean.
DefaultEventPublisherOptions
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/event/DefaultEventPublisherOptions.java
public class DefaultEventPublisherOptions implements EventPublisherOptions {

    public static final long DEFAULT_EMIT_INTERVAL = 10;
    public static final TimeUnit DEFAULT_EMIT_INTERVAL_UNIT = TimeUnit.MINUTES;
    public static final Duration DEFAULT_EMIT_INTERVAL_DURATION = Duration.ofMinutes(DEFAULT_EMIT_INTERVAL);

    //......
}
- DEFAULT_EMIT_INTERVAL_DURATION defaults to 10 minutes, so with the default configuration a latency event is published every 10 minutes.
CommandLatencyCollector
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/metrics/CommandLatencyCollector.java
public interface CommandLatencyCollector extends MetricCollector<Map<CommandLatencyId, CommandMetrics>> {

    /**
     * Record the command latency per {@code connectionPoint} and {@code commandType}.
     *
     * @param local the local address
     * @param remote the remote address
     * @param commandType the command type
     * @param firstResponseLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the first response
     * @param completionLatency latency value in {@link TimeUnit#NANOSECONDS} from send to the command completion
     */
    void recordCommandLatency(SocketAddress local, SocketAddress remote, ProtocolKeyword commandType,
            long firstResponseLatency, long completionLatency);
}
- DefaultCommandLatencyEventPublisher reads its metrics from the CommandLatencyCollector (via retrieveMetrics), and the collector is fed through the recordCommandLatency method.
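To make the record/retrieve split concrete, here is a heavily simplified collector written against the JDK only. It is a sketch under stated assumptions, not lettuce's implementation: `SimpleLatencyCollector` and `Stats` are hypothetical names, the key is a plain string instead of `CommandLatencyId`, only count/min/max are tracked (no percentiles), and resetting the data on every `retrieveMetrics()` call is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified collector; it only tracks count, min and max per key.
class SimpleLatencyCollector {

    // Aggregated latencies for one (local, remote, commandType) key, in microseconds.
    static final class Stats {
        long count;
        long firstResponseMin = Long.MAX_VALUE;
        long firstResponseMax = Long.MIN_VALUE;
        long completionMin = Long.MAX_VALUE;
        long completionMax = Long.MIN_VALUE;

        void add(long firstResponseMicros, long completionMicros) {
            count++;
            firstResponseMin = Math.min(firstResponseMin, firstResponseMicros);
            firstResponseMax = Math.max(firstResponseMax, firstResponseMicros);
            completionMin = Math.min(completionMin, completionMicros);
            completionMax = Math.max(completionMax, completionMicros);
        }
    }

    private Map<String, Stats> stats = new HashMap<>();

    // Inputs are nanoseconds, as in the interface above; they are stored as microseconds.
    synchronized void recordCommandLatency(String local, String remote, String commandType,
            long firstResponseNanos, long completionNanos) {
        String id = local + " -> " + remote + ", commandType=" + commandType;
        stats.computeIfAbsent(id, k -> new Stats())
                .add(TimeUnit.NANOSECONDS.toMicros(firstResponseNanos),
                        TimeUnit.NANOSECONDS.toMicros(completionNanos));
    }

    // Returns the data collected so far and starts a fresh collection window.
    synchronized Map<String, Stats> retrieveMetrics() {
        Map<String, Stats> snapshot = stats;
        stats = new HashMap<>();
        return snapshot;
    }
}
```

The writer side (`recordCommandLatency`) is called once per completed command, while the reader side (`retrieveMetrics`) is called once per emit interval, so each published event describes one interval's worth of commands.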
CommandHandler
lettuce-core-5.0.4.RELEASE-sources.jar!/io/lettuce/core/protocol/CommandHandler.java
/**
 * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server.
 *
 * @author Will Glozer
 * @author Mark Paluch
 * @author Jongyeol Choi
 * @author Grzegorz Szpak
 */
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

    //......

    /**
     * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", logPrefix(), input.isReadable(), input.refCnt());
            return;
        }
        if (debugEnabled) {
            logger.debug("{} Received: {} bytes, {} commands in the stack", logPrefix(), input.readableBytes(), stack.size());
        }
        try {
            if (buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
                return;
            }
            if (debugEnabled && ctx.channel() != channel) {
                logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
                return;
            }
            if (traceEnabled) {
                logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
            }
            buffer.writeBytes(input);
            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {
        if (latencyMetricsEnabled && command instanceof WithLatency) {
            WithLatency withLatency = (WithLatency) command;
            if (withLatency.getFirstResponse() == -1) {
                withLatency.firstResponse(nanoTime());
            }
            if (!decode0(ctx, buffer, command)) {
                return false;
            }
            recordLatency(withLatency, command.getType());
            return true;
        }
        return decode0(ctx, buffer, command);
    }

    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
        if (withLatency != null && clientResources.commandLatencyCollector().isEnabled() && channel != null && remote() != null) {
            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            long completionLatency = nanoTime() - withLatency.getSent();
            clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType,
                    firstResponseLatency, completionLatency);
        }
    }
}
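- channelRead calls decode (through an overload not shown in this excerpt). Once a command has been fully decoded, decode calls recordLatency, which passes the first-response and completion latencies to clientResources.commandLatencyCollector().recordCommandLatency.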
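The two latencies are plain differences between nanosecond timestamps. The sketch below reproduces only that arithmetic; `LatencyStamps` is a hypothetical stand-in for the WithLatency bookkeeping, and the timestamps are passed in explicitly instead of being read from `nanoTime()` so the example stays deterministic.

```java
// Hypothetical stand-in for the WithLatency bookkeeping; all values are nanosecond timestamps.
class LatencyStamps {

    private final long sent;
    private long firstResponse = -1; // -1 means "no response bytes seen yet"

    LatencyStamps(long sentNanos) {
        this.sent = sentNanos;
    }

    // Called on every decode attempt; only the first call sets the timestamp.
    void onDecodeAttempt(long nowNanos) {
        if (firstResponse == -1) {
            firstResponse = nowNanos;
        }
    }

    // Time from sending the command until the first response bytes were seen.
    long firstResponseLatency() {
        return firstResponse - sent;
    }

    // Time from sending the command until it was fully decoded.
    long completionLatency(long nowNanos) {
        return nowNanos - sent;
    }
}
```

For a command sent at t=1000 whose reply arrives in two reads at t=1500 and t=1700 and is fully decoded at t=2000, the first-response latency is 500 ns and the completion latency is 1000 ns. The two values differ whenever a reply needs more than one decode attempt or decoding itself takes noticeable time.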
Example: consuming events
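import io.lettuce.core.event.EventBus;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LettuceEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(LettuceEventConsumer.class);

    // Typically obtained from ClientResources#eventBus()
    private final EventBus eventBus;

    public LettuceEventConsumer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    @PostConstruct
    public void init() {
        // eventBus.get() returns a Flux of events; log each one as it arrives
        eventBus.get().subscribe(e -> LOGGER.info("event:{}", e));
    }
}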
- Here we obtain a Flux from the eventBus and consume it. Sample output:
2018-09-11 16:32:57.361 INFO 6656 --- [xecutorLoop-1-3] com.example.config.LettuceEventConsumer : event:{[local:any -> /192.168.99.100:6379, commandType=GET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=884, max=888, percentiles={50.0=888, 90.0=888, 95.0=888, 99.0=888, 99.9=888}], completion=[min=950, max=954, percentiles={50.0=954, 90.0=954, 95.0=954, 99.0=954, 99.9=954}]], [local:any -> /192.168.99.100:6379, commandType=INFO]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=1449, max=1458, percentiles={50.0=1458, 90.0=1458, 95.0=1458, 99.0=1458, 99.9=1458}], completion=[min=2457, max=2473, percentiles={50.0=2473, 90.0=2473, 95.0=2473, 99.0=2473, 99.9=2473}]], [local:any -> /192.168.99.100:6379, commandType=PUBLISH]=[count=40, timeUnit=MICROSECONDS, firstResponse=[min=708, max=17956, percentiles={50.0=1343, 90.0=2719, 95.0=3948, 99.0=17956, 99.9=17956}], completion=[min=733, max=17956, percentiles={50.0=1376, 90.0=2752, 95.0=3981, 99.0=17956, 99.9=17956}]], [local:any -> /192.168.99.100:6379, commandType=SET]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=909, max=913, percentiles={50.0=913, 90.0=913, 95.0=913, 99.0=913, 99.9=913}], completion=[min=995, max=999, percentiles={50.0=999, 90.0=999, 95.0=999, 99.0=999, 99.9=999}]], [local:any -> /192.168.99.100:6379, commandType=SUBSCRIBE]=[count=1, timeUnit=MICROSECONDS, firstResponse=[min=19267, max=19398, percentiles={50.0=19398, 90.0=19398, 95.0=19398, 99.0=19398, 99.9=19398}], completion=[min=41418, max=41680, percentiles={50.0=41680, 90.0=41680, 95.0=41680, 99.0=41680, 99.9=41680}]]}
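The bus also carries events other than latency metrics (connection events, for example), so a consumer that only cares about latency usually filters by event type. The sketch below shows that idea with the JDK alone. `MiniEventBus`, `LatencyEventSketch` and `ConnectedEventSketch` are hypothetical names for illustration; with the real EventBus the same effect would be achieved by filtering the Flux, for instance with `filter(e -> e instanceof CommandLatencyEvent)`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical miniature bus; lettuce's EventBus exposes a reactive Flux instead.
class MiniEventBus {

    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

    // Subscribe to every event published on the bus.
    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    // Subscribe to a single event type; events of other types are skipped.
    <T> void subscribe(Class<T> type, Consumer<? super T> subscriber) {
        subscribers.add(event -> {
            if (type.isInstance(event)) {
                subscriber.accept(type.cast(event));
            }
        });
    }

    // Deliver the event synchronously to all subscribers.
    void publish(Object event) {
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

// Hypothetical event standing in for CommandLatencyEvent.
final class LatencyEventSketch {
    final String summary;

    LatencyEventSketch(String summary) {
        this.summary = summary;
    }
}

// Hypothetical event standing in for a connection event.
final class ConnectedEventSketch {
}
```

The publisher never needs to know who is listening, which is what lets a metrics exporter, a logger and an alerting hook all attach to the same bus independently.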
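Summary
lettuce has a built-in eventBus and periodically publishes latency events for the commands it executes. Client code can subscribe to the eventBus and consume whichever events it needs in order to obtain lettuce metrics. For metrics monitoring, this event-driven approach is flexible: the code that records latencies is decoupled from whatever consumes them, which is a direct application of Event-Driven Architecture.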
doc
- clientresources.advanced-settings
- Implement an EventBus system to publish events and metrics #124