Solution
There are many log collection solutions out there, covering all kinds of filtering, cleansing, analysis, and statistics, and they all look impressive. This article covers just one piece: shipping logs into kafka.
Flow: app -> kafka -> logstash -> es -> kibana
The business application writes its logs directly to kafka; logstash consumes them and the data lands in es.
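For reference, a minimal logstash pipeline for this flow might look like the sketch below. The broker address and topic reuse the values configured later in this article; the es host and index name are assumptions:

```
input {
  kafka {
    bootstrap_servers => "10.0.11.55:9092"   # broker address (see bootstrap.properties below)
    topics            => ["prod-java"]       # topic (see bootstrap.properties below)
    codec             => "json"              # events are shipped as JSON (see the encoder below)
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]       # assumed es address
    index => "prod-java-%{+YYYY.MM.dd}"      # assumed index naming scheme
  }
}
```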
Separately, the application also writes log files on its servers.
Details
Initial implementation
First, a rough implementation of the plan. Setting up elk itself is out of scope here (just pay special attention to version compatibility between the components); the focus is on how the code evolved.
To write log data to kafka, depending on the official kafka client should be all we need. A look around github turns up an existing project: link
It is not much code; read it through once and build on it from there.
The code below assumes a Spring Boot project.
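Assuming a Maven build, the only extra dependency is the official client; a minimal sketch (the version placeholder is ours — as noted in the summary, keep it aligned with the broker version):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- keep this aligned with the kafka broker version -->
    <version>${kafka.version}</version>
</dependency>
```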
The core appender code:
```java
// Base classes (KafkaAppenderConfig, FailedDeliveryCallback, keying/delivery strategies)
// come from the logback-kafka-appender project referenced above.
public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

    /**
     * Kafka clients use this prefix for their slf4j logging.
     * This appender defers appends of any Kafka logs since they could cause
     * harmful infinite recursion/self-feeding effects.
     */
    private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

    public static final Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private LazyProducer lazyProducer = null;
    private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<>();
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
        @Override
        public void onFailedDelivery(E evt, Throwable throwable) {
            aai.appendLoopOnAppenders(evt);
        }
    };

    public KafkaAppender() {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    @Override
    public void start() {
        // only error-free appenders should be activated
        if (!checkPrerequisites()) return;
        lazyProducer = new LazyProducer();
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (lazyProducer != null && lazyProducer.isInitialized()) {
            try {
                lazyProducer.get().close();
            } catch (KafkaException e) {
                this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
            }
            lazyProducer = null;
        }
    }

    @Override
    public void addAppender(Appender<E> newAppender) {
        aai.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<E>> iteratorForAppenders() {
        return aai.iteratorForAppenders();
    }

    @Override
    public Appender<E> getAppender(String name) {
        return aai.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<E> appender) {
        return aai.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        aai.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<E> appender) {
        return aai.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return aai.detachAppender(name);
    }

    @Override
    protected void append(E e) {
        // encode the event
        final byte[] payload = encoder.doEncode(e);
        final byte[] key = keyingStrategy.createKey(e);
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, payload);
        Producer<byte[], byte[]> producer = lazyProducer.get();
        if (producer == null) {
            logger.error("kafka producer is null");
            return;
        }
        // core send method
        deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
    }

    protected Producer<byte[], byte[]> createProducer() {
        return new KafkaProducer<>(new HashMap<>(producerConfig));
    }

    private void deferAppend(E event) {
        queue.add(event);
    }

    // drains queued events to super
    private void ensureDeferredAppends() {
        E event;
        while ((event = queue.poll()) != null) {
            super.doAppend(event);
        }
    }

    /**
     * Lazy initializer for the producer, patterned after commons-lang.
     *
     * @see LazyInitializer
     */
    private class LazyProducer {
        private volatile Producer<byte[], byte[]> producer;
        private boolean initialized;

        public Producer<byte[], byte[]> get() {
            Producer<byte[], byte[]> result = this.producer;
            if (result == null) {
                synchronized (this) {
                    if (!initialized) {
                        result = this.producer;
                        if (result == null) {
                            // Note: initialize() may fail (e.g. an illegal servers value) and
                            // return null, so the initialized flag -- not the producer reference --
                            // guards against repeated, endlessly failing initialization.
                            this.producer = result = this.initialize();
                            initialized = true;
                        }
                    }
                }
            }
            return result;
        }

        protected Producer<byte[], byte[]> initialize() {
            Producer<byte[], byte[]> producer = null;
            try {
                producer = createProducer();
            } catch (Exception e) {
                addError("error creating producer", e);
            }
            return producer;
        }

        public boolean isInitialized() {
            return producer != null;
        }
    }
}
```
The code above sets an initialized flag when the producer is built, so construction is attempted at most once even in failure scenarios. In practice, when our servers config contained a non-IP string, initialize() returned null; had the decision to call initialize() been based only on whether producer was null, every log write would have re-run a failing initialization, which caused the application startup to fail.
The appender configuration in logback-spring.xml (element names follow the logback-kafka-appender conventions; the fully-qualified class name is elided):

```xml
<appender name="kafkaAppender" class="...KafkaAppender">
    <topic>${LOG_KAFKA_TOPIC}</topic>
    <producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>
</appender>
```
The bootstrap.properties configuration:
```properties
application.log.kafka.bootstrap.servers=10.0.11.55:9092
application.log.kafka.topic=prod-java
```
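These Spring properties presumably reach the ${LOG_KAFKA_*} variables in logback-spring.xml through springProperty entries; a sketch of that wiring (the names are assumed from the config above):

```xml
<springProperty scope="context" name="LOG_KAFKA_SERVERS" source="application.log.kafka.bootstrap.servers"/>
<springProperty scope="context" name="LOG_KAFKA_TOPIC" source="application.log.kafka.topic"/>
```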
The JSON written to kafka is customized by extending the encoder.doEncode(e) call above:
```java
public class FormatKafkaMessageEncoder<E> extends KafkaMessageEncoderBase<E> {

    protected static final int BUILDER_CAPACITY = 2048;
    protected static final int LENGTH_OPTION = 2048;
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";
    public static final char TAB = '\t';

    public byte[] encode(ILoggingEvent event) {
        Map<String, Object> formatMap = new HashMap<>();
        formatMap.put("timestamp", event.getTimeStamp() != 0 ? String.valueOf(new Date(event.getTimeStamp())) : "");
        formatMap.put("span", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-SpanId") : "");
        formatMap.put("trace", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-TraceId") : "");
        formatMap.put("class", event.getLoggerName());
        formatMap.put("level", event.getLevel() != null ? event.getLevel().toString() : "");
        formatMap.put("message", event.getMessage());
        formatMap.put("stacktrace", event.getThrowableProxy() != null ? convertStackTrace(event.getThrowableProxy()) : "");
        formatMap.put("thread", event.getThreadName());
        formatMap.put("ip", IpUtil.getLocalIP());
        formatMap.put("application", event.getLoggerContextVO() != null && event.getLoggerContextVO().getPropertyMap() != null
                ? event.getLoggerContextVO().getPropertyMap().get("springAppName") : "");
        String formatJson = JSONObject.toJSONString(formatMap);
        return formatJson.getBytes();
    }

    @Override
    public byte[] doEncode(E event) {
        return encode((ILoggingEvent) event);
    }

    public String convertStackTrace(IThrowableProxy tp) {
        StringBuilder sb = new StringBuilder(BUILDER_CAPACITY);
        recursiveAppend(sb, tp, null);
        return sb.toString();
    }

    private void recursiveAppend(StringBuilder sb, IThrowableProxy tp, String prefix) {
        if (tp == null) {
            return;
        }
        if (prefix != null) {
            sb.append(prefix);
        }
        sb.append(tp.getClassName()).append(": ").append(tp.getMessage());
        sb.append(CoreConstants.LINE_SEPARATOR);
        StackTraceElementProxy[] stepArray = tp.getStackTraceElementProxyArray();
        boolean unrestrictedPrinting = LENGTH_OPTION > stepArray.length;
        int maxIndex = unrestrictedPrinting ? stepArray.length : LENGTH_OPTION;
        for (int i = 0; i < maxIndex; i++) {
            sb.append(TAB);
            StackTraceElementProxy element = stepArray[i];
            sb.append(element);
            sb.append(CoreConstants.LINE_SEPARATOR);
        }
        IThrowableProxy[] suppressed = tp.getSuppressed();
        if (suppressed != null) {
            for (IThrowableProxy current : suppressed) {
                recursiveAppend(sb, current, SUPPRESSED);
            }
        }
        recursiveAppend(sb, tp.getCause(), CAUSED_BY);
    }
}
```
The recursiveAppend method is modeled on ch.qos.logback.classic.spi.ThrowableProxyUtil and prints the complete exception stack, including causes and suppressed exceptions.
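For illustration, a single encoded event looks roughly like this (all field values below are made up):

```json
{
  "timestamp": "Mon Oct 15 12:00:00 CST 2018",
  "span": "6f2c3a1d9b7e4f05",
  "trace": "a1b2c3d4e5f60718",
  "class": "com.example.demo.OrderService",
  "level": "ERROR",
  "message": "order not found",
  "stacktrace": "java.lang.IllegalStateException: order not found\n\tat ...",
  "thread": "http-nio-8080-exec-1",
  "ip": "10.0.11.12",
  "application": "demo-app"
}
```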
There is also the problem of obtaining the local IP, which InetAddress.getLocalHost().getHostAddress() alone does not solve.
The full code:
```java
public class IpUtil {

    public static final String DEFAULT_IP = "127.0.0.1";
    public static String cacheLocalIp = null;
    private static Logger logger = LoggerFactory.getLogger(IpUtil.class);

    /**
     * Uses the first suitable network interface address as the intranet IPv4 address,
     * avoiding a 127.0.0.1 result.
     */
    private static String getLocalIpByNetworkCard() {
        String ip = null;
        try {
            for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements(); ) {
                NetworkInterface item = e.nextElement();
                for (InterfaceAddress address : item.getInterfaceAddresses()) {
                    if (item.isLoopback() || !item.isUp()) {
                        continue;
                    }
                    if (address.getAddress() instanceof Inet4Address) {
                        Inet4Address inet4Address = (Inet4Address) address.getAddress();
                        ip = inet4Address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("getLocalIpByNetworkCard error", e);
            try {
                // fall back to the naive lookup, then to the default
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e1) {
                logger.error("InetAddress.getLocalHost().getHostAddress() error", e1);
                ip = DEFAULT_IP;
            }
        }
        return ip == null ? DEFAULT_IP : ip;
    }

    public synchronized static String getLocalIP() {
        if (cacheLocalIp == null) {
            cacheLocalIp = getLocalIpByNetworkCard();
        }
        return cacheLocalIp;
    }
}
```
A local file appender is also configured in logback-spring.xml:
```xml
<!-- roll over daily -->
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
    <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
    <!-- each file should be at most 300MB -->
    <maxFileSize>300MB</maxFileSize>
    <!-- number of history files to keep -->
    <maxHistory>5</maxHistory>
</rollingPolicy>
<encoder>
    <!-- %d = date, %thread = thread name, %-5level = level padded to 5 characters,
         %msg = log message, %n = newline -->
    <pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
```
Note the use of SizeAndTimeBasedRollingPolicy here rather than TimeBasedRollingPolicy combined with SizeBasedTriggeringPolicy: with the latter, the size criterion takes priority and new log files are not rolled automatically by date.
At this point, the code for shipping logs to kafka is complete: feature-complete and working correctly.
Failure scenarios
Now consider: what happens to this logging path if kafka cannot accept messages, at startup or at runtime, for example because the cluster is down?
Every log write will still try to connect to kafka and fail, which inevitably affects the health of the application.
This is where circuit breaking helps: if kafka goes down, an open breaker limits the impact on the application.
So a circuit breaker is implemented here.
State transitions: closed -> open once the failure threshold is reached; open -> half-open after the timeout window elapses; half-open -> closed after enough consecutive successes, or back to open on any failure.
The circuit breaker:
```java
/**
 * Circuit breaker.
 * 1. failureCount and consecutiveSuccessCount drive the state transitions; both use
 *    AtomicInteger so the counts stay accurate under concurrency.
 * 2. successCount does not use AtomicInteger, so its accuracy is not guaranteed.
 * 3. Illegal failureThreshold, consecutiveSuccessThreshold, and timeout arguments
 *    fall back to default values.
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;

    /** circuit breaker state */
    private CircuitBreakerState state;

    /** failure count threshold */
    private int failureThreshold;

    /** open-state time window */
    private long timeout;

    /** failure count */
    private AtomicInteger failureCount;

    /** success count (not accurate under concurrency) */
    private int successCount;

    /** consecutive successes within the half-open window */
    private AtomicInteger consecutiveSuccessCount;

    /** threshold of consecutive successes within the half-open window */
    private int consecutiveSuccessThreshold;

    public CircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        if (failureThreshold <= 0) {
            failureThreshold = 1;
        }
        if (consecutiveSuccessThreshold <= 0) {
            consecutiveSuccessThreshold = 1;
        }
        if (timeout <= 0) {
            timeout = 10000;
        }
        this.name = name;
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
        this.timeout = timeout;
        this.failureCount = new AtomicInteger(0);
        this.consecutiveSuccessCount = new AtomicInteger(0);
        state = new CloseCircuitBreakerState(this);
    }

    public void increaseFailureCount() {
        failureCount.addAndGet(1);
    }

    public void increaseSuccessCount() {
        successCount++;
    }

    public void increaseConsecutiveSuccessCount() {
        consecutiveSuccessCount.addAndGet(1);
    }

    public boolean increaseFailureCountAndThresholdReached() {
        return failureCount.addAndGet(1) >= failureThreshold;
    }

    public boolean increaseConsecutiveSuccessCountAndThresholdReached() {
        return consecutiveSuccessCount.addAndGet(1) >= consecutiveSuccessThreshold;
    }

    public boolean isNotOpen() {
        return !isOpen();
    }

    /** open: calls to the protected method are blocked */
    public boolean isOpen() {
        return state instanceof OpenCircuitBreakerState;
    }

    /** closed: the protected method executes normally */
    public boolean isClose() {
        return state instanceof CloseCircuitBreakerState;
    }

    /** half-open: trial calls to the protected method are allowed */
    public boolean isHalfClose() {
        return state instanceof HalfOpenCircuitBreakerState;
    }

    public void transformToCloseState() {
        state = new CloseCircuitBreakerState(this);
    }

    public void transformToHalfOpenState() {
        state = new HalfOpenCircuitBreakerState(this);
    }

    public void transformToOpenState() {
        state = new OpenCircuitBreakerState(this);
    }

    /** reset the failure count */
    public void resetFailureCount() {
        failureCount.set(0);
    }

    /** reset the consecutive success count */
    public void resetConsecutiveSuccessCount() {
        consecutiveSuccessCount.set(0);
    }

    public long getTimeout() {
        return timeout;
    }

    /** whether the failure threshold has been reached */
    protected boolean failureThresholdReached() {
        return failureCount.get() >= failureThreshold;
    }

    /** whether the consecutive success threshold has been reached */
    protected boolean consecutiveSuccessThresholdReached() {
        return consecutiveSuccessCount.get() >= consecutiveSuccessThreshold;
    }

    /** called after the protected method fails */
    public void actFailed() {
        state.actFailed();
    }

    /** called after the protected method succeeds */
    public void actSuccess() {
        state.actSuccess();
    }

    /** task execution interface */
    public static interface Executor {
        void execute();
    }

    public void execute(Executor executor) {
        if (!isOpen()) {
            try {
                executor.execute();
                this.actSuccess();
            } catch (Exception e) {
                this.actFailed();
                logger.error("CircuitBreaker executor error", e);
            }
        } else {
            logger.error("CircuitBreaker named {} is open", this.name);
        }
    }

    public String show() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("state", isClose());
        map.put("failureThreshold", failureThreshold);
        map.put("failureCount", failureCount);
        map.put("consecutiveSuccessThreshold", consecutiveSuccessThreshold);
        map.put("consecutiveSuccessCount", consecutiveSuccessCount);
        map.put("successCount", successCount);
        map.put("timeout", timeout);
        map.put("state class", state.getClass());
        return JSONObject.toJSONString(map);
    }
}
```
The state machine:
```java
public interface CircuitBreakerState {
    /** called after the protected method fails */
    void actFailed();

    /** called after the protected method succeeds */
    void actSuccess();
}

public abstract class AbstractCircuitBreakerState implements CircuitBreakerState {
    protected CircuitBreaker circuitBreaker;

    public AbstractCircuitBreakerState(CircuitBreaker circuitBreaker) {
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void actFailed() {
        circuitBreaker.increaseFailureCount();
    }

    @Override
    public void actSuccess() {
        circuitBreaker.increaseSuccessCount();
    }
}

public class CloseCircuitBreakerState extends AbstractCircuitBreakerState {
    public CloseCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetFailureCount();
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        // failure threshold reached: trip to the open state
        if (circuitBreaker.increaseFailureCountAndThresholdReached()) {
            circuitBreaker.transformToOpenState();
        }
    }
}

public class HalfOpenCircuitBreakerState extends AbstractCircuitBreakerState {
    public HalfOpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        super.actFailed();
        circuitBreaker.transformToOpenState();
    }

    @Override
    public void actSuccess() {
        super.actSuccess();
        // consecutive success threshold reached: close the breaker
        if (circuitBreaker.increaseConsecutiveSuccessCountAndThresholdReached()) {
            circuitBreaker.transformToCloseState();
        }
    }
}

public class OpenCircuitBreakerState extends AbstractCircuitBreakerState {
    public OpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        // after the timeout window, move to half-open to allow trial calls
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                circuitBreaker.transformToHalfOpenState();
                timer.cancel();
            }
        }, circuitBreaker.getTimeout());
    }
}

/**
 * Circuit breaker factory: keeps the application's CircuitBreaker instances in one place.
 * Note: once built, a breaker lives as long as the application and is never removed.
 */
public class CircuitBreakerFactory {

    private static ConcurrentHashMap<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();

    public CircuitBreaker getCircuitBreaker(String name) {
        return circuitBreakerMap.get(name);
    }

    /**
     * @param name                        unique name
     * @param failureThreshold            failure count threshold
     * @param consecutiveSuccessThreshold consecutive success threshold within the window
     * @param timeout                     time window
     * 1. closed: once failures >= failureThreshold, move to open
     * 2. open: after each timeout interval, move to half-open
     * 3. half-open: consecutiveSuccessThreshold consecutive successes close the breaker;
     *    any failure reopens it
     */
    public static CircuitBreaker buildCircuitBreaker(String name, int failureThreshold,
                                                     int consecutiveSuccessThreshold, long timeout) {
        CircuitBreaker circuitBreaker = new CircuitBreaker(name, failureThreshold, consecutiveSuccessThreshold, timeout);
        circuitBreakerMap.put(name, circuitBreaker);
        return circuitBreaker;
    }
}
```
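Besides the isNotOpen()/actSuccess()/actFailed() calls used below, the execute wrapper can guard a call directly; a minimal usage sketch (the guarded call is hypothetical):

```java
CircuitBreaker breaker = CircuitBreakerFactory.buildCircuitBreaker("demo", 3, 2, 20000);

// Executor is a single-method interface, so a lambda works;
// a throwing lambda counts as a failure automatically.
breaker.execute(() -> {
    someThirdPartyClient.call();  // hypothetical protected call
});
```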
Using the circuit breaker when sending kafka messages:
```java
/**
 * Logging is not a core business service. To stop kafka instability from affecting the
 * application, a circuit breaker is used: 3 failures open the breaker, it half-opens
 * every 20 seconds, and 2 consecutive successes close it again.
 */
CircuitBreaker circuitBreaker = CircuitBreakerFactory.buildCircuitBreaker("KafkaAppender-c", 3, 2, 20000);

public boolean send(Producer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record,
                    final E event, final FailedDeliveryCallback<E> failedDeliveryCallback) {
    if (circuitBreaker.isNotOpen()) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    circuitBreaker.actFailed();
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                    logger.error("kafka producer send log error", exception);
                } else {
                    circuitBreaker.actSuccess();
                }
            });
            return true;
        } catch (KafkaException e) {
            circuitBreaker.actFailed();
            failedDeliveryCallback.onFailedDelivery(event, e);
            logger.error("kafka send log error", e);
            return false;
        }
    } else {
        logger.error("kafka log circuitBreaker open");
        return false;
    }
}
```
Summary
1. When setting up elk, pay special attention to version compatibility between components; the kafka client version must match the kafka broker version.
2. The design tolerates failed deliveries to kafka, since the local log files are the more reliable record; the circuit breaker is there as a safety valve for the worst case, and the same pattern can also be applied to other third-party calls.