2024-4-18 群讨论:关于异步HttpClient如何测试验证

2024-05-25 09:01:33 浏览数 (1)

群友问题:群友想尽量快的将请求发到三方接口,不考虑三方接口的压力。如何开发并验证?

思路:

  1. 肯定要使用 WebClient 这种异步非阻塞的 io 接口。或者 vertx 这种框架的。虚拟线程先不考虑,因为截止目前不推荐虚拟线程上生产: https://zhuanlan.zhihu.com/p/685013298
  2. 首先的你如果只考虑你的代码的性能,不考虑调用的接口的响应程度,只想尽量把压力吐给他。你不能在测试代码直接调用三方接口,需要先隔离开,先保证你的代码到达你想要的效果。因为这个响应时间,影响因素太多了,并且不稳定。比如你和三方接口之间的带宽,你的网卡,对方是否有限流,以及如果你不限制链接数量,会被 cdn 就拦截限流了。这些都不知道。比如受限于带宽,假设你非阻塞同时发 10000 个请求,那么其实很多是在网络排队等着发。
  3. 只测自己代码,并且想模拟延迟返回。一般测试本地。我一般用 TestContainers 包裹 httpbin 镜像(kennethreitz/httpbin:latest),针对你的场景你可以在每个请求加一个调用时间然后调用 /anything 接口之后收集响应,anything 接口就是返回你发的所有参数。如果你想模拟带宽,可以再加上 toxicproxy 镜像,通过 toxicproxy 去访问 httpbin 去做一定的带宽限流。如果你想模拟接口延迟,可以通过 /delay/0.1 (延迟100ms)实现。

一个代码示例(简单测试,没有仔细调参,只是为了展示测试思路),首先编写 TestContainer 基类,用于复用(这里并不是所有的都用的上,我放出来只是供大家以后测试各种场景):

import eu.rekawek.toxiproxy.Proxy;

import eu.rekawek.toxiproxy.ToxiproxyClient;

import eu.rekawek.toxiproxy.model.ToxicDirection;

import org.testcontainers.containers.GenericContainer;

import org.testcontainers.containers.Network;

import org.testcontainers.containers.ToxiproxyContainer;

import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;

@Testcontainers

public class CommonMicroServiceTest {

private static final Network network = Network.newNetwork();

private static final String HTTPBIN = "httpbin";

public static final int HTTPBIN_PORT = 80;

public static final GenericContainer<?> HTTPBIN_CONTAINER

= new GenericContainer<>("kennethreitz/httpbin:latest")

.withExposedPorts(HTTPBIN_PORT)

.withNetwork(network)

.withNetworkAliases(HTTPBIN);

/**

* <a href="https://java.testcontainers.org/modules/toxiproxy/">toxiproxy</a>

* 使用 toxiproxy 封装 httpbin

* 可以使用 toxiproxy 模拟网络故障等情况

* 可以用的 port 范围是 8666~8697

*/

private static final ToxiproxyContainer TOXIPROXY_CONTAINER = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")

.withNetwork(network);

private static final int GOOD_HTTPBIN_PROXY_PORT = 8666;

private static final int READ_TIMEOUT_HTTPBIN_PROXY_PORT = 8667;

private static final int RESET_PEER_HTTPBIN_PROXY_PORT = 8668;

public static final String GOOD_HOST;

public static final int GOOD_PORT;

/**

* 以下代表请求已经发出到服务端,但是响应超时,或者不能响应(比如服务器重启)

*/

public static final String READ_TIMEOUT_HOST;

public static final int READ_TIMEOUT_PORT;

public static final String RESET_PEER_HOST;

public static final int RESET_PEER_PORT;

/**

* 以下代表请求都没有发出去,TCP 链接都没有建立

*/

public static final String CONNECT_TIMEOUT_HOST = "localhost";

/**

* 端口 1 一定连不上的

*/

public static final int CONNECT_TIMEOUT_PORT = 1;

static {

//不使用 @Container 注解管理容器声明周期,因为我们需要在静态块生成代理,必须在这之前启动容器

//不用担心容器不会被关闭,因为 testcontainers 会启动一个 ryuk 容器,用于监控并关闭所有容器

HTTPBIN_CONTAINER.start();

TOXIPROXY_CONTAINER.start();

final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(TOXIPROXY_CONTAINER.getHost(), TOXIPROXY_CONTAINER.getControlPort());

try {

Proxy proxy = toxiproxyClient.createProxy("good", "0.0.0.0:" GOOD_HTTPBIN_PROXY_PORT, HTTPBIN ":" HTTPBIN_PORT);

//关闭流量,会 READ TIME OUT

proxy = toxiproxyClient.createProxy("read_timeout", "0.0.0.0:" READ_TIMEOUT_HTTPBIN_PROXY_PORT, HTTPBIN ":" HTTPBIN_PORT);

proxy.toxics().bandwidth("UP_DISABLE", ToxicDirection.UPSTREAM, 0);

proxy.toxics().bandwidth("DOWN_DISABLE", ToxicDirection.DOWNSTREAM, 0);

proxy = toxiproxyClient.createProxy("connect_timeout", "0.0.0.0:" RESET_PEER_HTTPBIN_PROXY_PORT, HTTPBIN ":" HTTPBIN_PORT);

proxy.toxics().resetPeer("UP_SLOW_CLOSE", ToxicDirection.UPSTREAM, 1);

proxy.toxics().resetPeer("DOWN_SLOW_CLOSE", ToxicDirection.DOWNSTREAM, 1);

} catch (IOException e) {

throw new RuntimeException(e);

}

GOOD_HOST = TOXIPROXY_CONTAINER.getHost();

GOOD_PORT = TOXIPROXY_CONTAINER.getMappedPort(GOOD_HTTPBIN_PROXY_PORT);

READ_TIMEOUT_HOST = TOXIPROXY_CONTAINER.getHost();

READ_TIMEOUT_PORT = TOXIPROXY_CONTAINER.getMappedPort(READ_TIMEOUT_HTTPBIN_PROXY_PORT);

RESET_PEER_HOST = TOXIPROXY_CONTAINER.getHost();

RESET_PEER_PORT = TOXIPROXY_CONTAINER.getMappedPort(RESET_PEER_HTTPBIN_PROXY_PORT);

}

}

测试代码:

@Test

public void test() {

// 创建一个自定义的连接提供者

ConnectionProvider provider = ConnectionProvider.builder("customConnectionProvider")

.maxConnections(100) // 增加最大连接数,这个不能太大,否则会被 cloudflare 等 cdn 认为是类 ddos 攻击

.pendingAcquireMaxCount(10000) // 增加等待队列的大小

.build();

HttpClient httpClient = HttpClient.create(provider)

.responseTimeout(Duration.ofMillis(100000)); // 响应超时时间

WebClient build = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();

List<Mono<String>> monos = Lists.newArrayList();

for (int i = 0; i < 10000; i ) {

//我这里使用了 TestContainer 的端口模拟 0.1s 延迟

Mono<String> stringMono = build.get().uri("http://localhost:"

CommonMicroServiceTest.HTTPBIN_CONTAINER.getMappedPort(HTTPBIN_PORT) "/delay/0.1")

.retrieve().bodyToMono(String.class);

monos.add(stringMono);

}

long start = System.currentTimeMillis();

String block = Mono.zip(monos, objects -> {

log.info("{}", objects);

return "ok";

}).block();

log.info("block: {} in {}ms", block, System.currentTimeMillis() - start);

}

测试输出:block: ok in 10362ms

基本符合预期:

  1. 10000 个请求,每个请求 0.1s,链接池 100
  2. 耗时约等于:0.1*10000/100=10s

另外,我一般用 toxicproxy 模拟服务端断开链接,请求发到服务端但是服务端无法响应,请求发不到服务端,发一半到服务端之后的发不到等等等等,在编写微服务基础框架的时候非常好用~

个人简介:个人业余研究了 AI LLM 微调与 RAG,目前成果是微调了三个模型:

  1. 一个模型是基于 whisper 模型的微调,使用我原来做的精翻的视频按照语句段落切分的片段,并尝试按照方言类别,以及技术类别分别尝试微调的成果。用于视频字幕识别。
  2. 一个模型是基于 Mistral Large 的模型的微调,识别提取视频课件的片段,辅以实际的课件文字进行识别微调。用于识别课件的片段。
  3. 最后一个模型是基于 Claude 3 的模型微调,使用我之前制作的翻译字幕,与 AWS、Go 社区、CNCF 生态里面的官方英文文档以及中文文档作为语料,按照内容段交叉拆分,进行微调,用于字幕翻译。

0 人点赞