聊聊PowerJob的TransportServiceAware

2024-01-19 16:51:29 浏览数 (2)

本文主要研究一下PowerJob的TransportServiceAware

TransportServiceAware

tech/powerjob/server/remote/aware/TransportServiceAware.java

代码语言:javascript复制
public interface TransportServiceAware extends PowerJobAware {

    void setTransportService(TransportService transportService);
}

TransportServiceAware继承了PowerJobAware,它定义了setTransportService方法

FriendActor

tech/powerjob/server/remote/server/FriendActor.java

代码语言:javascript复制
@Slf4j
@Component
@Actor(path = S4S_PATH)
public class FriendActor implements TransportServiceAware {

    private TransportService transportService;

    /**
     * 处理存活检测的请求
     */
    @Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING)
    public AskResponse onReceivePing(Ping ping) {
        return AskResponse.succeed(transportService.allProtocols());
    }

    @Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING)
    public AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) {

        AskResponse response = new AskResponse();
        response.setSuccess(true);
        try {
            response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req)));
        } catch (Throwable t) {
            log.error("[FriendActor] process remote request[{}] failed!", req, t);
            response.setSuccess(false);
            response.setMessage(ExceptionUtils.getMessage(t));
        }
        return response;
    }

    @Override
    public void setTransportService(TransportService transportService) {
        this.transportService = transportService;
    }
}

FriendActor用于处理服务器之间的通讯,它定义了onReceivePing、onReceiveRemoteProcessReq这两个handler,其中onReceivePing返回transportService.allProtocols(),onReceiveRemoteProcessReq则执行RemoteRequestProcessor.processRemoteRequest(req)

RemoteProcessReq

tech/powerjob/server/remote/server/redirector/RemoteProcessReq.java

代码语言:javascript复制
@Getter
@Setter
@Accessors(chain = true)
public class RemoteProcessReq implements PowerSerializable {

    private String className;
    private String methodName;
    private String[] parameterTypes;

    private Object[] args;

}

RemoteProcessReq定义了className、methodName、parameterTypes、args属性

RemoteRequestProcessor

tech/powerjob/server/remote/server/redirector/RemoteRequestProcessor.java

代码语言:javascript复制
public class RemoteRequestProcessor {

    public static Object processRemoteRequest(RemoteProcessReq req) throws ClassNotFoundException {
        Object[] args = req.getArgs();
        String[] parameterTypes = req.getParameterTypes();
        Class<?>[] parameters = new Class[parameterTypes.length];

        for (int i = 0; i < parameterTypes.length; i  ) {
            parameters[i] = Class.forName(parameterTypes[i]);
            Object arg = args[i];
            if (arg != null) {
                args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]);
            }
        }

        Class<?> clz = Class.forName(req.getClassName());

        Object bean = SpringUtils.getBean(clz);
        Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters);

        assert method != null;
        return ReflectionUtils.invokeMethod(method, bean, args);
    }
}

RemoteRequestProcessor的processRemoteRequest主要是通过Class.forName加载对应的类,然后从spring中获取对应的bean,再通过ReflectionUtils查找方法,最后执行invoke

小结

TransportServiceAware继承了PowerJobAware,它定义了setTransportService方法;FriendActor用于处理服务器之间的通讯,它定义了onReceivePing、onReceiveRemoteProcessReq这两个handler,其中onReceivePing返回transportService.allProtocols(),onReceiveRemoteProcessReq则执行RemoteRequestProcessor.processRemoteRequest(req)。

0 人点赞