前些日子写了Java Redis stream的基本API实践和封装文章:Redis stream Java API实践,自然地也需要对这些API进行性能测试。
总的来说,Redis stream的API跟list操作的API差不多,就是添加、读取、删除一类,对于消费组的API,由于实际工作中并没有使用,所以暂时搁置。
性能测试思路
这里我设计了三个用例:
- 添加消息
- 读取消息
- 添加、删除消息
前两个用例比较简单,所以这里就不演示了,本次用例展示两种动态测试模型的使用方式:动态线程
和动态QPS
。
简述一下第三个用例的思路:首先就是创建一个添加消息,然后成功之后返回一个redis.clients.jedis.StreamEntryID
,再根据这个返回值进行删除操作。实际工作中暂时没想到这个实际的场景,以后再遇到其他使用场景再来进行性能测试。
性能测试用例
这里我先分享一下动态线程模型的用例,我把用到的参数都写成了静态的变量形式,这样比较方便,如果是实际业务场景中,应该传比较复杂的。不过问题不大,都可以写在com.funtest.groovytest.RedisStreamTest.FunTester#doing
里面,或者在动态QPS
的闭包中。
package com.funtest.groovytest
import com.funtester.base.constaint.FunThread
import com.funtester.db.redis.RedisBase
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunConcurrent
import redis.clients.jedis.params.XAddParams
import java.util.concurrent.atomic.AtomicInteger
class RedisStreamTest extends SourceCode {
static RedisBase driver
static def index = new AtomicInteger(0)
static def desc = "Redis stream性能测试实践"
static def map
static def key = "FunTesterStream"
static def params
public static void main(String[] args) {
driver = new RedisBase("127.0.0.1", 6379)
params = XAddParams.xAddParams().maxLen(1000)
map = new HashMap<String, Integer>()
map.put("key1", 1)
map.put("key2", 2)
driver.xadd(key, params, map)
params = XAddParams.xAddParams()
output driver.xlen(key)
new FunConcurrent(new FunTester(),).start()
}
private static class FunTester extends FunThread {
FunTester() {
super(null, desc index.getAndIncrement())
}
@Override
protected void doing() throws Exception {
def xadd = driver.xadd(key, params, map)
driver.xdel(key,xadd)
}
@Override
FunTester clone() {
return new FunTester()
}
}
}
下面分享一下动态QPS模型的压测用例:
代码语言:javascript复制package com.funtest.groovytest
import com.funtester.db.redis.RedisBase
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import redis.clients.jedis.params.XAddParams
class RedisStreamQPSTest extends SourceCode {
static RedisBase driver
static def desc = "Redis stream性能测试实践"
static def map
static def key = "FunTesterStream"
static def params
public static void main(String[] args) {
driver = new RedisBase("127.0.0.1", 6379)
params = XAddParams.xAddParams().maxLen(1000)
map = new HashMap<String, Integer>()
map.put("key1", 1)
map.put("key2", 2)
driver.xadd(key, params, map)
params = XAddParams.xAddParams()
def test = {
def xadd = driver.xadd(key, params, map)
driver.xdel(key,xadd)
}
new FunQpsConcurrent(test).start()
}
}
控制台输出
代码语言:javascript复制15:50:41.091 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
15:50:41.474 main redis连接池IP:127.0.0.1,端口:6379,超时设置:5000
15:50:42.728 Deamon 守护线程开启!
15:50:45.761 main 当前设计QPS:1,实际QPS:0 活跃线程数:0 单线程效率:0
15:50:50.780 main 当前设计QPS:1,实际QPS:1 活跃线程数:1 单线程效率:1
15:50:55.798 main 当前设计QPS:1,实际QPS:1 活跃线程数:0 单线程效率:1
进程已结束,退出代码130 (interrupted by signal 2: SIGINT)