flume ExecSource使用与源码阅读笔记

2018-09-17 21:55:06 浏览数 (1)

ExecSource是很多人接触flume时第一个使用的source,这里我简单的分析下这个source的使用与实现。

一、FlumeUserGuide

FlumeUserGuide:https://flume.apache.org/FlumeUserGuide.html

这里我简单翻译和总结下:

Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr默认会被丢弃).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。

详细配置说明如下: 加粗的是必配项:

注意:ExecSource无法感知数据的丢失,比如channel满的时候数据发送失败。为了更健壮的数据可靠性,推荐:Spooling Directory Source、Taildir Source或者通过flume的SDK直接实现。

如此一个简单的配置,我们就能实时地将/var/log/secure下的日志发送到flume的channel里面。

代码语言:javascript复制
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

简单说下shell和command,shell主要是区分Bash 或 Powershell。command作为一个参数传递给shell执行,这样command就可以使用shell的一些特性,如wildcards, back ticks, pipes, loops, conditionals,如果shell没有指定,一般会使用默认值‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’。

二、核心流程源码分析

代码语言:java复制
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable{
    xxx;
}

主要函数/类有:

  • start()
  • stop()
  • configure(Context context)
  • static class ExecRunnable implements Runnable
  • static class StderrReader extends Thread

1. start():

代码比较少,功能比较简单:

  1. 线程池
  2. 构建ExecRunnable线程对象,传入配置文件的参数
  3. 启动计数器

2. stop():

功能同样比较简单

  1. kill runner
  2. shutdown executor
  3. stop sourceCounter

3. configure(Context context)

获取excel里面的那些参数

4. ExecRunnable 主要实现类,核心 关键

static class ExecRunnable implements Runnable

核心部分是一个

代码语言:java复制
do{
   xxx;
}while(restart);

所以stop时需要将restart置为false,防止do重做。

do内容主要包括:

1) shell与command处理,然后执行:

代码语言:java复制
if (shell != null) {
  String[] commandArgs = formulateShellCommand(shell, command);
 process = Runtime.getRuntime().exec(commandArgs);
}  else {
  String[] commandArgs = command.split("\s ");
 process = new ProcessBuilder(commandArgs).start();
}

2)启动reader

代码语言:java复制
reader = new BufferedReader(
 new InputStreamReader(process.getInputStream(), charset));

3)启动stderrReader

代码语言:java复制
StderrReader stderrReader = new StderrReader(new BufferedReader(
 new InputStreamReader(process.getErrorStream(), charset)), logStderr);
xxxx;

4)定时任务,每batchTimeout时间批量将eventList flush

代码语言:javascript复制
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
 @Override
 public void run() {
 try {
 synchronized (eventList) {
 if (!eventList.isEmpty() && timeout()) {
            flushEventBatch(eventList);
          }
        }
      } catch (Exception e) {
 logger.error("Exception occurred when processing event batch", e);
 if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

5)读取和发送数据

代码语言:java复制
while ((line = reader.readLine()) != null) {
 sourceCounter.incrementEventReceivedCount();
 synchronized (eventList) {
    eventList.add(EventBuilder.withBody(line.getBytes(charset)));
 if (eventList.size() >= bufferCount || timeout()) {
      flushEventBatch(eventList);
    }
  }
}
synchronized (eventList) {
 if (!eventList.isEmpty()) {
    flushEventBatch(eventList);
  }
}

这个通用的小技巧,几乎所有的批处理代码里面都有这类写法:

1. 没到最后一行,达到bufferCount或者到timeout一批处理列表里面的数据

2. 读完最后一行(字节流没数据了),执行批处理

5. StderrReader处理stderr,这里不再赘述

static class StderrReader extends Thread

三、其他代码片段

1.批量flush Event

代码语言:java复制
private void flushEventBatch(List<Event> eventList) {
 channelProcessor.processEventBatch(eventList);
 sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
 lastPushToChannel = systemClock.currentTimeMillis();
}

2.将shell和Command结合在一起,组成一个数组

代码语言:java复制
private static String[] formulateShellCommand(String shell, String command) {
  String[] shellArgs = shell.split("\s ");
  String[] result = new String[shellArgs.length   1];
  System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
  result[shellArgs.length] = command;
 return result;
}

3.如果是中断导致的,则需要中断线程

代码语言:javascript复制
catch (Exception e) {
 logger.error("Failed while running command: "   command, e);
 if (e instanceof InterruptedException) {
    Thread.currentThread().interrupt();
  }
} 

4.超时时间判断

代码语言:java复制
private boolean timeout() {
 return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}

四、阅读收获

  • 学习开发一个EventDrivenSource
  • 部分代码适用于命令执行模块
  • ExecSource确实无法感知数据丢失
  • 了解ExecSource的使用

“了解ExecSource的使用” 放在最后是因为ExecSource确实不太适合用在生产环境

参考文章:https://blog.csdn.net/qianshangding0708/article/details/49736019

0 人点赞