facebook/swift:构建thrift http server(4)--ThriftXHRDecoder,ThriftXHREncoder

2019-05-25 20:37:59 浏览数 (1)

版权声明:本文为博主原创文章,转载请注明源地址。 https://cloud.tencent.com/developer/article/1433452

《facebook/swift:构建thrift http server(1)》

《facebook/swift:构建thrift http server(2)–HttpServerCodec》

《facebook/swift:构建thrift http server(3)–CORS跨域》

在上一篇博客中解决了thrift http sever的CORS跨域问题,但前端依然没有服务端的正常响应。看来还存在问题。

继续研究Netty的代码。

ThriftMessage

通过跟踪服务端收到的HTTP POST请求在管道(ChannelPipeline)中的传递流程找到了问题:

如下是实现将网络请求分发到thrift服务实例(NiftyProcessor)的ChannelHandler实例com.facebook.nifty.core.NiftyDispatcher的messageReceived方法实现代码:

代码语言:javascript复制
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
            throws Exception
    {    	
        if (e.getMessage() instanceof ThriftMessage) {
            ThriftMessage message = (ThriftMessage) e.getMessage();
            if (taskTimeoutMillis > 0) {
                message.setProcessStartTimeMillis(System.currentTimeMillis());
            }
            checkResponseOrderingRequirements(ctx, message);

            TNiftyTransport messageTransport = new TNiftyTransport(ctx.getChannel(), message);
            TTransportPair transportPair = TTransportPair.fromSingleTransport(messageTransport);
            TProtocolPair protocolPair = duplexProtocolFactory.getProtocolPair(transportPair);
            TProtocol inProtocol = protocolPair.getInputProtocol();
            TProtocol outProtocol = protocolPair.getOutputProtocol();

            processRequest(ctx, message, messageTransport, inProtocol, outProtocol);
        }
        else {
        	// 如果不是message是ThriftMessage实例则继续向下传递
            ctx.sendUpstream(e);
        }
    }

请注意第一行的判断语句,只有收到的消息是ThriftMessage实例,才会响应请求。

而HttpServerCodec只会将收到的网络请求解析为DefaultHttpRequest。

参见 org.jboss.netty.handler.codec.http.HttpRequestDecoder

所以问题搞清楚了。我们需要一个ChannelUpstreamHandlerDefaultHttpRequest转换为ThriftMessage向后传递给NiftyDispatcher,同时也需要一个ChannelDownstreamHandlerThriftMessage转为DefaultHttpResponse向前传递给HttpServerCodecHttpResponseEncoder进一步封装成HTTP响应数据最后发送给前端。

ThriftXHRDecoder

HTTP request解码器ThriftXHRDecoder实现如下:

代码语言:javascript复制
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;

import com.facebook.nifty.core.ThriftMessage;
import com.facebook.nifty.core.ThriftTransportType;

/**
 * XHR(XML Http Request)解码器<br>
 * 将{@link HttpRequest}请求的内容数据(content)转为{@link ThriftMessage},
 * 提供给{@link com.facebook.nifty.core.NiftyDispatcher}
 * @author guyadong
 *
 */
public class ThriftXHRDecoder extends SimpleChannelUpstreamHandler {
	
	public ThriftXHRDecoder() {
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		if(e.getMessage() instanceof HttpRequest){
			HttpRequest request = (HttpRequest)e.getMessage();
			if(request.getContent().readable() && HttpMethod.POST.equals(request.getMethod())){
				// 非FRAME数据
				ThriftMessage thriftMessage = new ThriftMessage(request.getContent(),ThriftTransportType.UNFRAMED);
				ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), thriftMessage, e.getRemoteAddress()));
				return;
			}
		}
		super.messageReceived(ctx, e);
	}
}

ThriftXHREncoder

ThriftMessage编码器ThriftXHREncoder实现如下:

代码语言:javascript复制
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import com.facebook.nifty.core.ThriftMessage;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * XHR(XML Http Request)编码器<br>
 * 将{@link com.facebook.nifty.core.NiftyDispatcher}输出的
 * {@link ThriftMessage}响应数据转为{@link DownstreamMessageEvent},
 * 
 * @author guyadong
 *
 */
public class ThriftXHREncoder extends SimpleChannelDownstreamHandler {

	@Override
	public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		if(e.getMessage() instanceof ThriftMessage){
			ThriftMessage thriftMessage = (ThriftMessage)e.getMessage();
			if(thriftMessage.getBuffer().readable()){
				switch (thriftMessage.getTransportType()) {
				case UNFRAMED:
                    DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
                    response.setContent(thriftMessage.getBuffer());
					ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel(), 
							Channels.future(ctx.getChannel()), response, e.getRemoteAddress()));
					return;
				default:
	                throw new UnsupportedOperationException(
	                		thriftMessage.getTransportType().name()  " transport is not supported");
				}
			}
		}
		super.writeRequested(ctx, e);
	}

}

修改ChannelPipeline

有了ThriftXHRDecoderThriftXHREncoder就可以将它们添加到frameCodec之后,dispatcher之前,基于上一篇中addCorsHandlerIfHttp方法代码修改如下,

代码语言:javascript复制
	/**
	 * 添加CORS Handler和XHR编解码器
	 */
	protected void addCorsHandlerIfHttp(){
		if(HTTP_TRANSPORT.equals(thriftServerConfig.getTransportName())){
			try {
				// 反射获取私有的成员NettyServerTransport
				final NettyServerTransport nettyServerTransport = ReflectionUtils.valueOfField(thriftServer, "transport");
				// 反射获取私有的成员ChannelPipelineFactory
				Field pipelineFactory = NettyServerTransport.class.getDeclaredField("pipelineFactory");
				{
					Field modifiersField = Field.class.getDeclaredField("modifiers");
					modifiersField.setAccessible(true); //Field 的 modifiers 是私有的
					modifiersField.setInt(pipelineFactory, pipelineFactory.getModifiers() & ~Modifier.FINAL);
				}
				pipelineFactory.setAccessible(true);
				final ChannelPipelineFactory channelPipelineFactory = (ChannelPipelineFactory) pipelineFactory.get(nettyServerTransport);
				final Netty3CorsConfig corsConfig = Netty3CorsConfigBuilder.forAnyOrigin()
					.allowedRequestMethods(POST,GET,OPTIONS)
					.allowedRequestHeaders("Origin","Content-Type","Accept","application","x-requested-with")
					.build();
				ChannelPipelineFactory factoryWithCORS = new ChannelPipelineFactory(){

					@Override
					public ChannelPipeline getPipeline() throws Exception {
						// 修改 ChannelPipeline,在frameCodec后(顺序)增加CORS handler,XHR编解码器
						ChannelPipeline cp = channelPipelineFactory.getPipeline();
						cp.addAfter("frameCodec", "thriftXHRDecoder", new ThriftXHRDecoder());
						cp.addAfter("frameCodec", "thriftXHREncoder", new ThriftXHREncoder());
						cp.addAfter("frameCodec", "cors", new Netty3CorsHandler(corsConfig));
						return cp;
					}};
				// 修改nettyServerTransport的私有常量pipelineFactory
				pipelineFactory.set(nettyServerTransport, factoryWithCORS);
			} catch (Exception e) {
				Throwables.throwIfUnchecked(e);
				throw new RuntimeException(e);
			}
		}
	}

修改完毕后再执行test_js.html,thrift http server终于有响应了。

而且数据结果正常

遗留问题

至此,基于facebook/swift构建的thrift http server已经基本可以正常工作,但还是存在一个小问题。HTTP响应只会在前端空闲超时后才会发送响应数据到前端,所以ThriftServerConfig的IdleConnectionTimeout如果设置过大(比如默认值60s),那么前端要在发送请求60秒后才会收到服务端的响应,这个问题一直到目前还没有找到。所以目前的办法是将这个值设置在10ms,就基本不会影响前端的响应延迟。

零零散散的写了好几篇文章,贴出的代码都比较零碎,完整的代码参见码云仓库

https://gitee.com/l0km/common-java/tree/master/common-thrift/src/main/java/com/facebook/swift/service

重要的修改在ThriftServerService

0 人点赞