概述
JMeter 默认单机压测引擎,运行 JMeter 测试,直接用于本地 GUI 和非 GUI 调用,或者RemoteJMeterEngineImpl 在服务器模式下运行时启动。
StandardJMeterEngine继承JMeterEngine 和Runable接口,本质上是一个线程对象。
API地址:
API地址:https://jmeter.apache.org/api/org/apache/jmeter/engine/StandardJMeterEngine.htm
简要解读:
- HashTree 是依赖的数据结构;
- SearchByClass 用来查找 HashTree 中的所有节点,并把节点实例化为真正的对象,例如TestPlan/ThreadGroup/Sampler/ResultCollector 在 HashTree 中本来都是只是配置,全部通过 SearchByClass 实例化的;
- 实例化出来的对象如果是 TestStateListener 类型,则会在有生命周期的函数回调,测试前调 testStarted,结束时调 testEnded, 比如 ResultCollector是该类型的一种,在结束的时候回调 testEnded 方法完成 report 的写入;
- PreCompiler 用来解析 Arguments, 把 TestPlan 节点中配置的参数作为JMeterVariables 加入到测试线程上线文中,同时进行参数和函数替换操作;
- ThreadGroup 用来用来管理一组线程,包括线程的个数/启动/关闭等;
- StopTest 作为其内部类对外不可见,作为一个 Runnable,作用是异步停止测试,stopTest方法也是通过该内部类实现的。
源码解读
主要参数变量
代码语言:txt复制 private static volatile StandardJMeterEngine engine;
private volatile boolean running = false;
private volatile boolean active = false;
private volatile boolean serialized = false;
private volatile boolean tearDownOnShutdown = false;
private HashTree test;
private final String host;
构造函数
代码语言:txt复制 // 不带参数的构造函数,用于单机压测
public StandardJMeterEngine() {
this(null);
}
// 带参数的构造函数,用于分布式压测
public StandardJMeterEngine(String host) {
this.host = host;
// Hack to allow external control
initSingletonEngine(this);
}
主要方法
configure(HashTree testTree)
配置引擎,HashTree 是 JMeter 执行测试依赖的数据结构,configure 在执行测试之前进行配置测试数据。
代码语言:txt复制 // 从HashTree中解析出TestPlan,并创建
SearchByClass<TestPlan> testPlan = new SearchByClass<>(TestPlan.class);
testTree.traverse(testPlan);
Object[] plan = testPlan.getSearchResults().toArray();
if (plan.length == 0) {
throw new IllegalStateException("Could not find the TestPlan class!");
}
TestPlan tp = (TestPlan) plan[0];
// 一个测试中可能会有多个线程组,如果serialized为true,则StandardJMeterEngine会串行的去执行这些线程组,每启动一个ThreadGroup主线程都会等它结束;否则就并行执行所有的线程组。
serialized = tp.isSerialized();
// tearDownOnShutdown与PostThreadGroup配合使用的,用来做清理工作
tearDownOnShutdown = tp.isTearDownOnShutdown();
active = true;
test = testTree;
runTest
runTest()方法启动测试,该方法调用的是继承Runnable后重写的run()方法。
StandardJMeterEngine实现了Runable接口,本质上是一个线程,通过运行run()方法启动测试。
代码语言:txt复制 public void runTest() throws JMeterEngineException {
if (host != null){
long now=System.currentTimeMillis();
System.out.println("Starting the test on host " host " @ " new Date(now) " (" now ")"); // NOSONAR Intentional
}
try {
// StandardJMeterEngine本质上是一个线程
Thread runningThread = new Thread(this, "StandardJMeterEngine");
runningThread.start();
} catch (Exception err) {
stopTest();
throw new JMeterEngineException(err);
}
}
run
执行StandardJMeterEngine的run方法
JMeterContextService类,初始化操作:numberOfActiveThreads=0, 重置 testStart时间,创建线程变量JMeterContext
代码语言:txt复制 JMeterContextService.startTest();
PreCompiler类,继承HashTree类,初始化testPlan的Arguments类变量,替换参数化变量,存入JMeterVariables
代码语言:txt复制 PreCompiler compiler = new PreCompiler();
test.traverse(compiler);
利用 SearchByClass 解析所有 TestStateListener类
TestStateListener类:生命周期的函数回调,测试前调 testStarted,结束时调 testEnded
代码语言:txt复制 SearchByClass<TestStateListener> testListeners = new SearchByClass<>(TestStateListener.class); // TL - S&E
test.traverse(testListeners);
//将testlist中的元素添加进去
testListeners.getSearchResults().addAll(testList);
testList.clear();
测试前调用testStart方法,比如TestPlan会加载依赖包,BackendListener会启动守护线程收集SamplerResult,ResultCollector类会递增 instanceCount,初始化 fileOutput,DataSourceElement类创建跟数据库的连接等等。
代码语言:txt复制 notifyTestListenersOfStart(testListeners);
private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
for (TestStateListener tl : testListeners.getSearchResults()) {
if (tl instanceof TestBean) {
TestBeanHelper.prepare((TestElement) tl);
}
if (host == null) {
tl.testStarted();
} else {
tl.testStarted(host);
}
}
}
利用 SearchByClass 解析所有 ThreadGroup(包括SetupThreadGroup,ThreadGroup, PostThreadGroup)
代码语言:txt复制 SearchByClass<SetupThreadGroup> setupSearcher = new SearchByClass<>(SetupThreadGroup.class);
SearchByClass<AbstractThreadGroup> searcher = new SearchByClass<>(AbstractThreadGroup.class);
SearchByClass<PostThreadGroup> postSearcher = new SearchByClass<>(PostThreadGroup.class);
test.traverse(setupSearcher);
test.traverse(searcher);
test.traverse(postSearcher);
Iterator<SetupThreadGroup> setupIter = setupSearcher.getSearchResults().iterator();
Iterator<AbstractThreadGroup> iter = searcher.getSearchResults().iterator();
Iterator<PostThreadGroup> postIter = postSearcher.getSearchResults().iterator();
实例化一个 ListenerNotifier 实例,用来通知事件发生,比如backendListener, resultCollector等
代码语言:txt复制 ListenerNotifier notifier = new ListenerNotifier();
启动所有 SetupThreadGroup (一般情况下没有 SetupThreadGroup )并等待结束
代码语言:txt复制 if (setupIter.hasNext()) {
log.info("Starting setUp thread groups");
while (running && setupIter.hasNext()) {//for each setup thread group
AbstractThreadGroup group = setupIter.next();
groupCount ;
String groupName = group.getName();
log.info("Starting setUp ThreadGroup: {} : {} ", groupCount, groupName);
startThreadGroup(group, groupCount, setupSearcher, testLevelElements, notifier);
// 判断ThreadGroup是否串行执行
if (serialized && setupIter.hasNext()) {
log.info("Waiting for setup thread group: {} to finish before starting next setup group",
groupName);
group.waitThreadsStopped();
}
}
log.info("Waiting for all setup thread groups to exit");
//wait for all Setup Threads To Exit
waitThreadsStopped();
log.info("All Setup Threads have ended");
groupCount=0;
JMeterContextService.clearTotalThreads();
}
groups.clear();
等待所有的ThreadGroup结束
代码语言:txt复制
JMeterContextService.getContext().setSamplingStarted(true);
boolean mainGroups = running; // still running at this point, i.e. setUp was not cancelled
while (running && iter.hasNext()) {// for each thread group
AbstractThreadGroup group = iter.next();
//ignore Setup and Post here. We could have filtered the searcher. but then
//future Thread Group objects wouldn't execute.
if (group instanceof SetupThreadGroup ||
group instanceof PostThreadGroup) {
continue;
}
groupCount ;
String groupName = group.getName();
log.info("Starting ThreadGroup: {} : {}", groupCount, groupName);
startThreadGroup(group, groupCount, searcher, testLevelElements, notifier);
// 判断ThreadGroup是否串行执行
if (serialized && iter.hasNext()) {
log.info("Waiting for thread group: {} to finish before starting next group", groupName);
// 调用ThreadGroup的waitThreadsStopped方法
group.waitThreadsStopped();
}
} // end of thread groups
if (groupCount == 0){ // No TGs found
log.info("No enabled thread groups found");
} else {
if (running) {
log.info("All thread groups have been started");
} else {
log.info("Test stopped - no more thread groups will be started");
}
}
//wait for all Test Threads To Exit
waitThreadsStopped();
groups.clear();
执行所有PostThreadGroup(一般没有), 并等待结束
代码语言:txt复制 if (postIter.hasNext()){
groupCount = 0;
JMeterContextService.clearTotalThreads();
log.info("Starting tearDown thread groups");
if (mainGroups && !running) { // i.e. shutdown/stopped during main thread groups
running = tearDownOnShutdown; // re-enable for tearDown if necessary
}
while (running && postIter.hasNext()) {//for each setup thread group
AbstractThreadGroup group = postIter.next();
groupCount ;
String groupName = group.getName();
log.info("Starting tearDown ThreadGroup: {} : {}", groupCount, groupName);
startThreadGroup(group, groupCount, postSearcher, testLevelElements, notifier);
if (serialized && postIter.hasNext()) {
log.info("Waiting for post thread group: {} to finish before starting next post group", groupName);
group.waitThreadsStopped();
}
}
waitThreadsStopped(); // wait for Post threads to stop
}
测试结束调用testListener 的 testEnded 方法,比如:
JavaSampler 会调用真正跑的 AbstractJavaSamplerClient 的 teardownTest 方法,可以打印该 JavaSamplerClient 测试总共花费的时间;
ResultCollector 用来将测试结果写如文件生成;
reportTestPlan 用来关闭文件。
BackendListener 用来清理BackendListenerClients
代码语言:txt复制 notifyTestListenersOfEnd(testListeners);
JMeterContextService.endTest();
private void notifyTestListenersOfEnd(SearchByClass<TestStateListener> testListeners) {
log.info("Notifying test listeners of end of test");
for (TestStateListener tl : testListeners.getSearchResults()) {
try {
if (host == null) {
tl.testEnded();
} else {
tl.testEnded(host);
}
} catch (Exception e) {
log.warn("Error encountered during shutdown of " tl.toString(),e);
}
}
if (host != null) {
log.info("Test has ended on host {} ", host);
long now=System.currentTimeMillis();
System.out.println("Finished the test on host " host " @ " new Date(now) " (" now ")" // NOSONAR Intentional
(EXIT_AFTER_TEST ? " - exit requested." : ""));
if (EXIT_AFTER_TEST){
// 远程server退出
exit();
}
}
active=false;
}
startThreadGroup
启动ThreadGroup线程组
代码语言:txt复制 private void startThreadGroup(AbstractThreadGroup group, int groupCount, SearchByClass<?> searcher, List<?> testLevelElements, ListenerNotifier notifier)
{
try {
// 获取线程组的属性配置
int numThreads = group.getNumThreads();
JMeterContextService.addTotalThreads(numThreads);
boolean onErrorStopTest = group.getOnErrorStopTest();
boolean onErrorStopTestNow = group.getOnErrorStopTestNow();
boolean onErrorStopThread = group.getOnErrorStopThread();
boolean onErrorStartNextLoop = group.getOnErrorStartNextLoop();
String groupName = group.getName();
log.info("Starting {} threads for group {}.", numThreads, groupName);
if (onErrorStopTest) {
log.info("Test will stop on error");
} else if (onErrorStopTestNow) {
log.info("Test will stop abruptly on error");
} else if (onErrorStopThread) {
log.info("Thread will stop on error");
} else if (onErrorStartNextLoop) {
log.info("Thread will start next loop on error");
} else {
log.info("Thread will continue on error");
}
// 获取ThreadGroup的HashTree对象
ListedHashTree threadGroupTree = (ListedHashTree) searcher.getSubTree(group);
threadGroupTree.add(group, testLevelElements);
// 将threadGroup加入List列表
groups.add(group);
// 执行ThreadGroup对象的start方法
group.start(groupCount, notifier, threadGroupTree, this);
} catch (JMeterStopTestException ex) { // NOSONAR Reported by log
JMeterUtils.reportErrorToUser("Error occurred starting thread group :" group.getName() ", error message:" ex.getMessage()
", rnsee log file for more details", ex);
return; // no point continuing
}
}
waitThreadsStopped
等待线程结束
代码语言:txt复制 private void waitThreadsStopped() {
// ConcurrentHashMap does not need synch. here
for (AbstractThreadGroup threadGroup : groups) {
// 调用ThreadGroup的waitThreadsStopped方法
threadGroup.waitThreadsStopped();
}
}
stopTest
停止测试,若 now 为 true 则停止动作立即执行;若为 false 则等待当前正在执行的测试至少执行完一个 iteration。
代码语言:txt复制 public synchronized void stopTest() {
stopTest(true);
}
public synchronized void stopTest(boolean now) {
Thread stopThread = new Thread(new StopTest(now));
stopThread.start();
}
调用内部类StopTest,由于该类继承Runnable,调用该类的run方法
代码语言:txt复制 public void run() {
running = false;
resetSingletonEngine();
// 立即停止线程的标志
if (now) {
// 停止线程组
tellThreadGroupsToStop();
pause(10L * countStillActiveThreads());
// 验证线程组是否停止
boolean stopped = verifyThreadsStopped();
// 停止失败
if (!stopped) { // we totally failed to stop the test
if (JMeter.isNonGUI()) {
log.error(JMeterUtils.getResString("stopping_test_failed")); //$NON-NLS-1$
if (SYSTEM_EXIT_ON_STOP_FAIL) { // default is true
log.error("Exiting");
System.out.println("Fatal error, could not stop test, exiting"); // NOSONAR Intentional
System.exit(1); // NOSONAR Intentional
} else {
System.out.println("Fatal error, could not stop test"); // NOSONAR Intentional
}
} else { // 非立即停止
JMeterUtils.reportErrorToUser(
JMeterUtils.getResString("stopping_test_failed"), //$NON-NLS-1$
JMeterUtils.getResString("stopping_test_title")); //$NON-NLS-1$
}
} // else will be done by threadFinished()
} else {
stopAllThreadGroups();
}
}
停止ThreadGroup线程组
代码语言:txt复制 private void tellThreadGroupsToStop() {
// ConcurrentHashMap does not need protecting
for (AbstractThreadGroup threadGroup : groups) {
threadGroup.tellThreadsToStop();
}
}
验证线程组是否全部停止
代码语言:txt复制 private boolean verifyThreadsStopped() {
boolean stoppedAll = true;
// ConcurrentHashMap does not need synch. here
for (AbstractThreadGroup threadGroup : groups) {
stoppedAll = stoppedAll && threadGroup.verifyThreadsStopped();
}
return stoppedAll;
}
等待线程组执行完
代码语言:txt复制 private void stopAllThreadGroups() {
// ConcurrentHashMap does not need synch. here
for (AbstractThreadGroup threadGroup : groups) {
threadGroup.stop();
}
}