ExecSource是很多人接触flume时第一个使用的source,这里我简单的分析下这个source的使用与实现。
一、FlumeUserGuide
FlumeUserGuide:https://flume.apache.org/FlumeUserGuide.html
这里我简单翻译和总结下:
Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr默认会被丢弃).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。
详细配置说明如下: 加粗的是必配项:
注意:ExecSource无法感知数据的丢失
,比如channel满的时候数据发送失败。为了更健壮的数据可靠性,推荐:Spooling Directory Source、Taildir Source或者通过flume的SDK直接实现。
如此一个简单的配置,我们就能实时地将/var/log/secure下的日志发送到flume的channel里面。
代码语言:javascript复制a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
简单说下shell和command,shell主要是区分Bash 或 Powershell。command作为一个参数传递给shell执行,这样command就可以使用shell的一些特性,如wildcards, back ticks, pipes, loops, conditionals,如果shell没有指定,一般会使用默认值‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’。
二、核心流程源码分析
代码语言:java复制public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable{
xxx;
}
主要函数/类有:
- start()
- stop()
- configure(Context context)
- static class ExecRunnable implements Runnable
- static class StderrReader extends Thread
1. start():
代码比较少,功能比较简单:
- 线程池
- 构建ExecRunnable线程对象,传入配置文件的参数
- 启动计数器
2. stop():
功能同样比较简单
- kill runner
- shutdown executor
- stop sourceCounter
3. configure(Context context)
获取excel里面的那些参数
4. ExecRunnable 主要实现类,核心 关键
static class ExecRunnable implements Runnable
核心部分是一个
代码语言:java复制do{
xxx;
}while(restart);
所以stop时需要将restart置为false,防止do重做。
do内容主要包括:
1) shell与command处理,然后执行:
代码语言:java复制if (shell != null) {
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
} else {
String[] commandArgs = command.split("\s ");
process = new ProcessBuilder(commandArgs).start();
}
2)启动reader
代码语言:java复制reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
3)启动stderrReader
代码语言:java复制StderrReader stderrReader = new StderrReader(new BufferedReader(
new InputStreamReader(process.getErrorStream(), charset)), logStderr);
xxxx;
4)定时任务,每batchTimeout时间批量将eventList flush
代码语言:javascript复制future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if (!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occurred when processing event batch", e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
5)读取和发送数据
代码语言:java复制while ((line = reader.readLine()) != null) {
sourceCounter.incrementEventReceivedCount();
synchronized (eventList) {
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if (eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
synchronized (eventList) {
if (!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
这个通用的小技巧,几乎所有的批处理代码里面都有这类写法:
1. 没到最后一行,达到bufferCount或者到timeout一批处理列表里面的数据
2. 读完最后一行(字节流没数据了),执行批处理
5. StderrReader处理stderr,这里不再赘述
static class StderrReader extends Thread
三、其他代码片段
1.批量flush Event
代码语言:java复制private void flushEventBatch(List<Event> eventList) {
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
2.将shell和Command结合在一起,组成一个数组
代码语言:java复制private static String[] formulateShellCommand(String shell, String command) {
String[] shellArgs = shell.split("\s ");
String[] result = new String[shellArgs.length 1];
System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
result[shellArgs.length] = command;
return result;
}
3.如果是中断导致的,则需要中断线程
代码语言:javascript复制catch (Exception e) {
logger.error("Failed while running command: " command, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
4.超时时间判断
代码语言:java复制private boolean timeout() {
return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}
四、阅读收获
- 学习开发一个EventDrivenSource
- 部分代码适用于命令执行模块
- ExecSource确实无法感知数据丢失
- 了解ExecSource的使用
“了解ExecSource的使用” 放在最后是因为ExecSource确实不太适合用在生产环境
参考文章:https://blog.csdn.net/qianshangding0708/article/details/49736019