版权声明:本文为博主原创文章,转载请注明源地址。 https://cloud.tencent.com/developer/article/1433452
《facebook/swift:构建thrift http server(1)》
《facebook/swift:构建thrift http server(2)–HttpServerCodec》
《facebook/swift:构建thrift http server(3)–CORS跨域》
在上一篇博客中解决了thrift http sever的CORS跨域问题,但前端依然没有服务端的正常响应。看来还存在问题。
继续研究Netty的代码。
ThriftMessage
通过跟踪服务端收到的HTTP POST请求在管道(ChannelPipeline)中的传递流程找到了问题:
如下是实现将网络请求分发到thrift服务实例(NiftyProcessor)的ChannelHandler实例com.facebook.nifty.core.NiftyDispatcher的messageReceived
方法实现代码:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception
{
if (e.getMessage() instanceof ThriftMessage) {
ThriftMessage message = (ThriftMessage) e.getMessage();
if (taskTimeoutMillis > 0) {
message.setProcessStartTimeMillis(System.currentTimeMillis());
}
checkResponseOrderingRequirements(ctx, message);
TNiftyTransport messageTransport = new TNiftyTransport(ctx.getChannel(), message);
TTransportPair transportPair = TTransportPair.fromSingleTransport(messageTransport);
TProtocolPair protocolPair = duplexProtocolFactory.getProtocolPair(transportPair);
TProtocol inProtocol = protocolPair.getInputProtocol();
TProtocol outProtocol = protocolPair.getOutputProtocol();
processRequest(ctx, message, messageTransport, inProtocol, outProtocol);
}
else {
// 如果不是message是ThriftMessage实例则继续向下传递
ctx.sendUpstream(e);
}
}
请注意第一行的判断语句,只有收到的消息是ThriftMessage实例,才会响应请求。
而HttpServerCodec只会将收到的网络请求解析为DefaultHttpRequest。
参见 org.jboss.netty.handler.codec.http.HttpRequestDecoder
所以问题搞清楚了。我们需要一个ChannelUpstreamHandler
将DefaultHttpRequest
转换为ThriftMessage
向后传递给NiftyDispatcher
,同时也需要一个ChannelDownstreamHandler
将ThriftMessage
转为DefaultHttpResponse
向前传递给HttpServerCodec
由HttpResponseEncoder
进一步封装成HTTP响应数据最后发送给前端。
ThriftXHRDecoder
HTTP request解码器ThriftXHRDecoder实现如下:
代码语言:javascript复制import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import com.facebook.nifty.core.ThriftMessage;
import com.facebook.nifty.core.ThriftTransportType;
/**
* XHR(XML Http Request)解码器<br>
* 将{@link HttpRequest}请求的内容数据(content)转为{@link ThriftMessage},
* 提供给{@link com.facebook.nifty.core.NiftyDispatcher}
* @author guyadong
*
*/
public class ThriftXHRDecoder extends SimpleChannelUpstreamHandler {
public ThriftXHRDecoder() {
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if(e.getMessage() instanceof HttpRequest){
HttpRequest request = (HttpRequest)e.getMessage();
if(request.getContent().readable() && HttpMethod.POST.equals(request.getMethod())){
// 非FRAME数据
ThriftMessage thriftMessage = new ThriftMessage(request.getContent(),ThriftTransportType.UNFRAMED);
ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), thriftMessage, e.getRemoteAddress()));
return;
}
}
super.messageReceived(ctx, e);
}
}
ThriftXHREncoder
ThriftMessage
编码器ThriftXHREncoder实现如下:
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import com.facebook.nifty.core.ThriftMessage;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* XHR(XML Http Request)编码器<br>
* 将{@link com.facebook.nifty.core.NiftyDispatcher}输出的
* {@link ThriftMessage}响应数据转为{@link DownstreamMessageEvent},
*
* @author guyadong
*
*/
public class ThriftXHREncoder extends SimpleChannelDownstreamHandler {
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if(e.getMessage() instanceof ThriftMessage){
ThriftMessage thriftMessage = (ThriftMessage)e.getMessage();
if(thriftMessage.getBuffer().readable()){
switch (thriftMessage.getTransportType()) {
case UNFRAMED:
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
response.setContent(thriftMessage.getBuffer());
ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel(),
Channels.future(ctx.getChannel()), response, e.getRemoteAddress()));
return;
default:
throw new UnsupportedOperationException(
thriftMessage.getTransportType().name() " transport is not supported");
}
}
}
super.writeRequested(ctx, e);
}
}
修改ChannelPipeline
有了ThriftXHRDecoder
和ThriftXHREncoder
就可以将它们添加到frameCodec
之后,dispatcher
之前,基于上一篇中addCorsHandlerIfHttp
方法代码修改如下,
/**
* 添加CORS Handler和XHR编解码器
*/
protected void addCorsHandlerIfHttp(){
if(HTTP_TRANSPORT.equals(thriftServerConfig.getTransportName())){
try {
// 反射获取私有的成员NettyServerTransport
final NettyServerTransport nettyServerTransport = ReflectionUtils.valueOfField(thriftServer, "transport");
// 反射获取私有的成员ChannelPipelineFactory
Field pipelineFactory = NettyServerTransport.class.getDeclaredField("pipelineFactory");
{
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true); //Field 的 modifiers 是私有的
modifiersField.setInt(pipelineFactory, pipelineFactory.getModifiers() & ~Modifier.FINAL);
}
pipelineFactory.setAccessible(true);
final ChannelPipelineFactory channelPipelineFactory = (ChannelPipelineFactory) pipelineFactory.get(nettyServerTransport);
final Netty3CorsConfig corsConfig = Netty3CorsConfigBuilder.forAnyOrigin()
.allowedRequestMethods(POST,GET,OPTIONS)
.allowedRequestHeaders("Origin","Content-Type","Accept","application","x-requested-with")
.build();
ChannelPipelineFactory factoryWithCORS = new ChannelPipelineFactory(){
@Override
public ChannelPipeline getPipeline() throws Exception {
// 修改 ChannelPipeline,在frameCodec后(顺序)增加CORS handler,XHR编解码器
ChannelPipeline cp = channelPipelineFactory.getPipeline();
cp.addAfter("frameCodec", "thriftXHRDecoder", new ThriftXHRDecoder());
cp.addAfter("frameCodec", "thriftXHREncoder", new ThriftXHREncoder());
cp.addAfter("frameCodec", "cors", new Netty3CorsHandler(corsConfig));
return cp;
}};
// 修改nettyServerTransport的私有常量pipelineFactory
pipelineFactory.set(nettyServerTransport, factoryWithCORS);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
}
修改完毕后再执行test_js.html,thrift http server终于有响应了。
而且数据结果正常
遗留问题
至此,基于facebook/swift构建的thrift http server已经基本可以正常工作,但还是存在一个小问题。HTTP响应只会在前端空闲超时后才会发送响应数据到前端,所以ThriftServerConfig
的IdleConnectionTimeout如果设置过大(比如默认值60s),那么前端要在发送请求60秒后才会收到服务端的响应,这个问题一直到目前还没有找到。所以目前的办法是将这个值设置在10ms,就基本不会影响前端的响应延迟。
零零散散的写了好几篇文章,贴出的代码都比较零碎,完整的代码参见码云仓库
https://gitee.com/l0km/common-java/tree/master/common-thrift/src/main/java/com/facebook/swift/service
重要的修改在ThriftServerService