Shipping Logs into Kafka: A Refactoring Journey (from the "How Far Can We Go" series)

2019-01-28

Approach

There are many log-collection solutions out there, covering filtering, cleansing, analysis, and statistics, and they all look impressive. This article describes just one piece: shipping logs into Kafka.

Pipeline: app -> kafka -> logstash -> es -> kibana

The business application writes logs directly to Kafka; Logstash consumes them and the data flows into Elasticsearch.
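For context, here is a minimal Logstash pipeline sketch for that consumption step. The broker address and topic reuse the values configured later in this article; the Elasticsearch host and index pattern are assumptions for illustration, not values from the original deployment:

input {
  kafka {
    bootstrap_servers => "10.0.11.55:9092"   # same brokers as the app config below
    topics            => ["prod-java"]       # the topic the appender writes to
    codec             => "json"              # the appender emits JSON (see the encoder below)
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]       # assumed ES address
    index => "app-log-%{+YYYY.MM.dd}"        # assumed daily index pattern
  }
}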


In addition, the application still writes log files on the server.

(Figure: overall pipeline)

Details

Initial implementation

First, a first cut of the solution. Setting up the ELK stack itself is out of scope here (just pay special attention to version compatibility among the components); this section walks through the implementation at the code level.

To write log data to Kafka, all we should need is the official kafka client. A look around GitHub turns up a ready-made project to build on: link

It is not much code; read it through once and modify it as needed.
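For reference, pulling the official client into a Maven build looks roughly like this; the version is deliberately left as a property because, as the summary notes, it must match your broker version:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- assumption: set this to match your Kafka broker version -->
    <version>${kafka.version}</version>
</dependency>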

The code below assumes a Spring Boot setup.

Core appender code:

public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

    /**
     * Kafka clients use this prefix for their slf4j logging.
     * This appender defers appends of any Kafka logs since it could cause harmful
     * infinite recursion/self-feeding effects.
     */
    private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

    public static final Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private LazyProducer lazyProducer = null;
    private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<>();
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
        @Override
        public void onFailedDelivery(E evt, Throwable throwable) {
            aai.appendLoopOnAppenders(evt);
        }
    };

    public KafkaAppender() {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    @Override
    public void start() {
        // only error-free appenders should be activated
        if (!checkPrerequisites()) return;
        lazyProducer = new LazyProducer();
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (lazyProducer != null && lazyProducer.isInitialized()) {
            try {
                lazyProducer.get().close();
            } catch (KafkaException e) {
                this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
            }
            lazyProducer = null;
        }
    }

    @Override
    public void addAppender(Appender<E> newAppender) {
        aai.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<E>> iteratorForAppenders() {
        return aai.iteratorForAppenders();
    }

    @Override
    public Appender<E> getAppender(String name) {
        return aai.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<E> appender) {
        return aai.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        aai.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<E> appender) {
        return aai.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return aai.detachAppender(name);
    }

    @Override
    protected void append(E e) {
        // encode the event
        final byte[] payload = encoder.doEncode(e);
        final byte[] key = keyingStrategy.createKey(e);
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, payload);
        Producer<byte[], byte[]> producer = lazyProducer.get();
        if (producer == null) {
            logger.error("kafka producer is null");
            return;
        }
        // core send method
        deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
    }

    protected Producer<byte[], byte[]> createProducer() {
        return new KafkaProducer<>(new HashMap<>(producerConfig));
    }

    private void deferAppend(E event) {
        queue.add(event);
    }

    // drains queued events to super
    private void ensureDeferredAppends() {
        E event;
        while ((event = queue.poll()) != null) {
            super.doAppend(event);
        }
    }

    /**
     * Lazy initializer for producer, patterned after commons-lang.
     *
     * @see LazyInitializer
     */
    private class LazyProducer {

        private volatile Producer<byte[], byte[]> producer;
        private boolean initialized;

        public Producer<byte[], byte[]> get() {
            Producer<byte[], byte[]> result = this.producer;
            if (result == null) {
                synchronized (this) {
                    if (!initialized) {
                        result = this.producer;
                        if (result == null) {
                            // Note: initialize() may fail (e.g. the servers config contains an
                            // illegal value) and return null. The separate `initialized` flag
                            // ensures we do not retry a failing initialization over and over.
                            this.producer = result = this.initialize();
                            initialized = true;
                        }
                    }
                }
            }
            return result;
        }

        protected Producer<byte[], byte[]> initialize() {
            Producer<byte[], byte[]> producer = null;
            try {
                producer = createProducer();
            } catch (Exception e) {
                addError("error creating producer", e);
            }
            return producer;
        }

        public boolean isInitialized() {
            return producer != null;
        }
    }
}

The code above sets an initialized flag when the producer is created, ensuring that in failure scenarios creation is attempted only once.

In practice, if for example the servers config contains something that is not a valid address, initialize() returns null. In the unpatched version, whether to call initialize() was decided purely by checking whether producer was null, so initialization kept failing over and over and ultimately made application startup fail; the initialized flag avoids that.

Configure logback-spring.xml:

<topic>${LOG_KAFKA_TOPIC}</topic>
<producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>

bootstrap.properties configuration:


application.log.kafka.bootstrap.servers=10.0.11.55:9092
application.log.kafka.topic=prod-java
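One way to expose these Spring properties to logback as the ${LOG_KAFKA_*} variables used above is logback-spring's standard springProperty mechanism; a minimal sketch (the wiring here is an assumption, not taken from the original config):

<springProperty scope="context" name="LOG_KAFKA_SERVERS" source="application.log.kafka.bootstrap.servers"/>
<springProperty scope="context" name="LOG_KAFKA_TOPIC" source="application.log.kafka.topic"/>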

To customize the JSON that goes into Kafka, extend the encoder.doEncode(e) used above:

public class FormatKafkaMessageEncoder<E> extends KafkaMessageEncoderBase<E> {

    protected static final int BUILDER_CAPACITY = 2048;
    protected static final int LENGTH_OPTION = 2048;
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";
    public static final char TAB = '\t';

    public byte[] encode(ILoggingEvent event) {
        Map<String, Object> formatMap = new HashMap<>();
        formatMap.put("timestamp", event.getTimeStamp() != 0 ? String.valueOf(new Date(event.getTimeStamp())) : "");
        formatMap.put("span", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-SpanId") : "");
        formatMap.put("trace", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-TraceId") : "");
        formatMap.put("class", event.getLoggerName());
        formatMap.put("level", event.getLevel() != null ? event.getLevel().toString() : "");
        formatMap.put("message", event.getMessage());
        formatMap.put("stacktrace", event.getThrowableProxy() != null ? convertStackTrace(event.getThrowableProxy()) : "");
        formatMap.put("thread", event.getThreadName());
        formatMap.put("ip", IpUtil.getLocalIP());
        formatMap.put("application", event.getLoggerContextVO() != null && event.getLoggerContextVO().getPropertyMap() != null
                ? event.getLoggerContextVO().getPropertyMap().get("springAppName") : "");
        String formatJson = JSONObject.toJSONString(formatMap);
        return formatJson.getBytes();
    }

    @Override
    public byte[] doEncode(E event) {
        return encode((ILoggingEvent) event);
    }

    public String convertStackTrace(IThrowableProxy tp) {
        StringBuilder sb = new StringBuilder(BUILDER_CAPACITY);
        recursiveAppend(sb, tp, null);
        return sb.toString();
    }

    private void recursiveAppend(StringBuilder sb, IThrowableProxy tp, String prefix) {
        if (tp == null) {
            return;
        }
        if (prefix != null) {
            sb.append(prefix);
        }
        sb.append(tp.getClassName()).append(": ").append(tp.getMessage());
        sb.append(CoreConstants.LINE_SEPARATOR);
        StackTraceElementProxy[] stepArray = tp.getStackTraceElementProxyArray();
        boolean unrestrictedPrinting = LENGTH_OPTION > stepArray.length;
        int maxIndex = unrestrictedPrinting ? stepArray.length : LENGTH_OPTION;
        for (int i = 0; i < maxIndex; i++) {
            sb.append(TAB);
            StackTraceElementProxy element = stepArray[i];
            sb.append(element);
            sb.append(CoreConstants.LINE_SEPARATOR);
        }
        IThrowableProxy[] suppressed = tp.getSuppressed();
        if (suppressed != null) {
            for (IThrowableProxy current : suppressed) {
                recursiveAppend(sb, current, SUPPRESSED);
            }
        }
        recursiveAppend(sb, tp.getCause(), CAUSED_BY);
    }
}

The recursiveAppend method mimics ch.qos.logback.classic.spi.ThrowableProxyUtil and prints the complete exception stack, including the Caused by and Suppressed sections.
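For reference, a single log event encoded this way lands in Kafka as a JSON document shaped roughly like this (the field names come from the encoder above; the values are illustrative, not real output):

{
  "timestamp": "Mon Jan 28 10:29:45 CST 2019",
  "span": "7d3f2a1b8c9d0e4f",
  "trace": "a1b2c3d4e5f60718",
  "class": "com.example.demo.OrderService",
  "level": "ERROR",
  "message": "create order failed",
  "stacktrace": "java.lang.IllegalStateException: ...",
  "thread": "http-nio-8080-exec-3",
  "ip": "10.0.11.23",
  "application": "demo-app"
}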

There is also the question of obtaining the local IP: InetAddress.getLocalHost().getHostAddress() does not solve it, since it can return 127.0.0.1.

The full code:

public class IpUtil {

    public static final String DEFAULT_IP = "127.0.0.1";
    public static String cacheLocalIp = null;
    private static Logger logger = LoggerFactory.getLogger(IpUtil.class);

    /**
     * Take the address of the first suitable network interface as the internal IPv4
     * address, to avoid returning 127.0.0.1.
     */
    private static String getLocalIpByNetworkCard() {
        String ip = null;
        try {
            for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements(); ) {
                NetworkInterface item = e.nextElement();
                for (InterfaceAddress address : item.getInterfaceAddresses()) {
                    if (item.isLoopback() || !item.isUp()) {
                        continue;
                    }
                    if (address.getAddress() instanceof Inet4Address) {
                        Inet4Address inet4Address = (Inet4Address) address.getAddress();
                        ip = inet4Address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("getLocalIpByNetworkCard error", e);
            try {
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e1) {
                logger.error("InetAddress.getLocalHost().getHostAddress() error", e1);
                ip = DEFAULT_IP;
            }
        }
        return ip == null ? DEFAULT_IP : ip;
    }

    public synchronized static String getLocalIP() {
        if (cacheLocalIp == null) {
            cacheLocalIp = getLocalIpByNetworkCard();
        }
        return cacheLocalIp;
    }
}

A local file appender is also configured in logback-spring.xml:

<!-- generate a new log file each day (rollover daily) -->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
    <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
    <!-- each file should be at most 300MB -->
    <maxFileSize>300MB</maxFileSize>
    <!-- number of history files to keep -->
    <maxHistory>5</maxHistory>
</rollingPolicy>
<encoder>
    <!-- %d = date, %thread = thread name, %-5level = level padded to 5 chars, %msg = log message, %n = newline -->
    <pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>

Note the use of SizeAndTimeBasedRollingPolicy rather than TimeBasedRollingPolicy plus SizeBasedTriggeringPolicy.

With the latter combination the size trigger takes precedence, and new log files will not be rolled automatically by date.

At this point the code for shipping logs into Kafka is complete: fully functional and working correctly.

Failure scenarios

Now consider: what happens to this logging feature if Kafka cannot receive messages, say it has gone down, either at application startup or while the application is running?

Every log write would try to connect to Kafka and fail, which inevitably affects the health of the application.

This suggests a circuit-breaker approach: if Kafka goes down, tripping a breaker limits the impact on the application.

So here is a simple implementation of circuit-breaker logic.

(Figure: circuit breaker state transitions)

The circuit breaker:

/**
 * Circuit breaker.
 * 1. failureCount and consecutiveSuccessCount drive the state transitions; both use
 *    AtomicInteger so the counts stay accurate under concurrency.
 * 2. successCount does not use AtomicInteger and is not guaranteed to be accurate.
 * 3. Illegal failureThreshold / consecutiveSuccessThreshold / timeout arguments fall back to defaults.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    /** Circuit breaker state. */
    private CircuitBreakerState state;

    /** Failure count threshold. */
    private int failureThreshold;

    /** Length of the open-state time window. */
    private long timeout;

    /** Failure count. */
    private AtomicInteger failureCount;

    /** Success count (not accurate under concurrency). */
    private int successCount;

    /** Consecutive successes within the half-open window. */
    private AtomicInteger consecutiveSuccessCount;

    /** Threshold of consecutive successes within the half-open window. */
    private int consecutiveSuccessThreshold;

    public CircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        if (failureThreshold <= 0) {
            failureThreshold = 1;
        }
        if (consecutiveSuccessThreshold <= 0) {
            consecutiveSuccessThreshold = 1;
        }
        if (timeout <= 0) {
            timeout = 10000;
        }
        this.name = name;
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
        this.timeout = timeout;
        this.failureCount = new AtomicInteger(0);
        this.consecutiveSuccessCount = new AtomicInteger(0);
        state = new CloseCircuitBreakerState(this);
    }

    public void increaseFailureCount() {
        failureCount.addAndGet(1);
    }

    public void increaseSuccessCount() {
        successCount++;
    }

    public void increaseConsecutiveSuccessCount() {
        consecutiveSuccessCount.addAndGet(1);
    }

    public boolean increaseFailureCountAndThresholdReached() {
        return failureCount.addAndGet(1) >= failureThreshold;
    }

    public boolean increaseConsecutiveSuccessCountAndThresholdReached() {
        return consecutiveSuccessCount.addAndGet(1) >= consecutiveSuccessThreshold;
    }

    public boolean isNotOpen() {
        return !isOpen();
    }

    /** Open: calls to the protected method are blocked. */
    public boolean isOpen() {
        return state instanceof OpenCircuitBreakerState;
    }

    /** Closed: the protected method executes normally. */
    public boolean isClose() {
        return state instanceof CloseCircuitBreakerState;
    }

    /** Half-open: trial calls to the protected method are allowed. */
    public boolean isHalfClose() {
        return state instanceof HalfOpenCircuitBreakerState;
    }

    public void transformToCloseState() {
        state = new CloseCircuitBreakerState(this);
    }

    public void transformToHalfOpenState() {
        state = new HalfOpenCircuitBreakerState(this);
    }

    public void transformToOpenState() {
        state = new OpenCircuitBreakerState(this);
    }

    /** Reset the failure count. */
    public void resetFailureCount() {
        failureCount.set(0);
    }

    /** Reset the consecutive success count. */
    public void resetConsecutiveSuccessCount() {
        consecutiveSuccessCount.set(0);
    }

    public long getTimeout() {
        return timeout;
    }

    /** Whether the failure threshold has been reached. */
    protected boolean failureThresholdReached() {
        return failureCount.get() >= failureThreshold;
    }

    /** Whether the consecutive success threshold has been reached. */
    protected boolean consecutiveSuccessThresholdReached() {
        return consecutiveSuccessCount.get() >= consecutiveSuccessThreshold;
    }

    /** Called after the protected method fails. */
    public void actFailed() {
        state.actFailed();
    }

    /** Called after the protected method succeeds. */
    public void actSuccess() {
        state.actSuccess();
    }

    public interface Executor {
        /** The task to execute. */
        void execute();
    }

    public void execute(Executor executor) {
        if (!isOpen()) {
            try {
                executor.execute();
                this.actSuccess();
            } catch (Exception e) {
                this.actFailed();
                logger.error("CircuitBreaker executor error", e);
            }
        } else {
            logger.error("CircuitBreaker named {} is open", this.name);
        }
    }

    public String show() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("state", isClose());
        map.put("failureThreshold", failureThreshold);
        map.put("failureCount", failureCount);
        map.put("consecutiveSuccessThreshold", consecutiveSuccessThreshold);
        map.put("consecutiveSuccessCount", consecutiveSuccessCount);
        map.put("successCount", successCount);
        map.put("timeout", timeout);
        map.put("state class", state.getClass());
        return JSONObject.toJSONString(map);
    }
}

The state machine:

public interface CircuitBreakerState {
    /** Called after the protected method fails. */
    void actFailed();
    /** Called after the protected method succeeds. */
    void actSuccess();
}

public abstract class AbstractCircuitBreakerState implements CircuitBreakerState {

    protected CircuitBreaker circuitBreaker;

    public AbstractCircuitBreakerState(CircuitBreaker circuitBreaker) {
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void actFailed() {
        circuitBreaker.increaseFailureCount();
    }

    @Override
    public void actSuccess() {
        circuitBreaker.increaseSuccessCount();
    }
}

public class CloseCircuitBreakerState extends AbstractCircuitBreakerState {

    public CloseCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetFailureCount();
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        // failure threshold reached: trip to the open state
        if (circuitBreaker.increaseFailureCountAndThresholdReached()) {
            circuitBreaker.transformToOpenState();
        }
    }
}

public class HalfOpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public HalfOpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        super.actFailed();
        circuitBreaker.transformToOpenState();
    }

    @Override
    public void actSuccess() {
        super.actSuccess();
        // consecutive success threshold reached: close the breaker
        if (circuitBreaker.increaseConsecutiveSuccessCountAndThresholdReached()) {
            circuitBreaker.transformToCloseState();
        }
    }
}

public class OpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public OpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                circuitBreaker.transformToHalfOpenState();
                timer.cancel();
            }
        }, circuitBreaker.getTimeout());
    }
}

/**
 * Circuit breaker factory: keeps the application's CircuitBreaker instances in one place.
 * Note: once created, a breaker lives as long as the application; it is never removed.
 */
public class CircuitBreakerFactory {

    private static ConcurrentHashMap<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();

    public static CircuitBreaker getCircuitBreaker(String name) {
        return circuitBreakerMap.get(name);
    }

    /**
     * @param name unique name
     * @param failureThreshold failure count threshold
     * @param consecutiveSuccessThreshold success count threshold within the window
     * @param timeout window length
     * 1. In the closed state, failures >= failureThreshold trips the breaker open.
     * 2. In the open state, the breaker moves to half-open after each timeout interval.
     * 3. In the half-open state, consecutiveSuccessThreshold consecutive successes close
     *    the breaker; any failure reopens it.
     */
    public static CircuitBreaker buildCircuitBreaker(String name, int failureThreshold,
                                                     int consecutiveSuccessThreshold, long timeout) {
        CircuitBreaker circuitBreaker = new CircuitBreaker(name, failureThreshold, consecutiveSuccessThreshold, timeout);
        circuitBreakerMap.put(name, circuitBreaker);
        return circuitBreaker;
    }
}

Using the breaker when sending Kafka messages:

/**
 * Logging is not a core business service. To keep an unstable Kafka from affecting
 * application health, sends go through the circuit breaker: 3 failures open it, every
 * 20 seconds it goes half-open, and 2 consecutive successes close it again.
 */
CircuitBreaker circuitBreaker = CircuitBreakerFactory.buildCircuitBreaker("KafkaAppender-c", 3, 2, 20000);

public boolean send(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record,
                    final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) {
    if (circuitBreaker.isNotOpen()) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    circuitBreaker.actFailed();
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                    logger.error("kafka producer send log error", exception);
                } else {
                    circuitBreaker.actSuccess();
                }
            });
            return true;
        } catch (KafkaException e) {
            circuitBreaker.actFailed();
            failedDeliveryCallback.onFailedDelivery(event, e);
            logger.error("kafka send log error", e);
            return false;
        }
    } else {
        logger.error("kafka log circuitBreaker open");
        return false;
    }
}

Summary

1. When setting up ELK, pay close attention to version compatibility among the components; the kafka client version must match the Kafka broker version.

2. The design tolerates Kafka log failures, since the local log files are the more reliable record; the circuit breaker exists to handle that worst case. The same breaker can also wrap calls to other third-party services, as sketched below.
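A minimal sketch of that reuse via the execute() wrapper shown earlier; the breaker name, thresholds, and the thirdPartyClient call are hypothetical, for illustration only:

// assumed: a breaker dedicated to one third-party call; 5 failures open it,
// it goes half-open every 30 seconds, 2 consecutive successes close it again
CircuitBreaker httpBreaker = CircuitBreakerFactory.buildCircuitBreaker("third-party-api", 5, 2, 30000);

public void notifyThirdParty(final Order order) {
    // execute() runs the task only while the breaker is not open, and records
    // success/failure so the state machine transitions accordingly
    httpBreaker.execute(new CircuitBreaker.Executor() {
        @Override
        public void execute() {
            thirdPartyClient.push(order); // hypothetical third-party client call
        }
    });
}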
