A Look at PowerJob's HttpProcessor

2024-01-01 20:42:03

This article looks at PowerJob's HttpProcessor.

BasicProcessor

tech/powerjob/worker/core/processor/sdk/BasicProcessor.java

public interface BasicProcessor {

    /**
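     * Core processing logic.
     * The workflow context is available via {@link TaskContext#getWorkflowContext()}.
     *
     * @param context task context; jobParams and instanceParams give the console-configured parameters and the task instance parameters passed via OpenAPI, respectively
     * @return the processing result; msg has a length limit and is truncated if too long; returning null is not allowed
     * @throws Exception exceptions may be thrown, but this is discouraged; business developers should preferably handle them themselves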
     */
    ProcessResult process(TaskContext context) throws Exception;
}

BasicProcessor is the basic processor for standalone (single-machine) execution; it defines the process method.

CommonBasicProcessor

tech/powerjob/official/processors/CommonBasicProcessor.java

@Slf4j
public abstract class CommonBasicProcessor implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext ctx) throws Exception {

        OmsLogger omsLogger = ctx.getOmsLogger();
        String securityDKey = getSecurityDKey();
        if (SecurityUtils.disable(securityDKey)) {
            String msg = String.format("%s is not enabled, please set '-D%s=true' to enable it", this.getClass().getSimpleName(), securityDKey);
            omsLogger.warn(msg);
            return new ProcessResult(false, msg);
        }

        String status = "unknown";
        Stopwatch sw = Stopwatch.createStarted();

        omsLogger.info("using params: {}", CommonUtils.parseParams(ctx));

        try {
            ProcessResult result = process0(ctx);
            omsLogger.info("execute succeed, using {}, result: {}", sw, result);
            status = result.isSuccess() ? "succeed" : "failed";
            return result;
        } catch (Throwable t) {
            status = "exception";
            omsLogger.error("execute failed!", t);
            return new ProcessResult(false, ExceptionUtils.getMessage(t));
        } finally {
            log.info("{}|{}|{}|{}|{}", getClass().getSimpleName(), ctx.getJobId(), ctx.getInstanceId(), status, sw);
        }
    }

    protected abstract ProcessResult process0(TaskContext taskContext) throws Exception;

    protected String getSecurityDKey() {
        return null;
    }
}

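CommonBasicProcessor is an abstract class that implements the BasicProcessor interface. It declares the abstract process0 method and provides a getSecurityDKey method, which returns null by default. Its process method first calls SecurityUtils.disable(securityDKey) to check whether the processor is enabled and returns a failed result immediately if it is not. It then runs process0 inside a try/catch, converting any Throwable into a failed ProcessResult, and in the finally block logs the processor name, job ID, instance ID, status, and elapsed time.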
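The structure described above is a template method: the base class owns the gate check, timing, and exception handling, and subclasses supply only the work itself. The following is a minimal, PowerJob-free sketch of that shape. Result, GuardedProcessor, the three subclasses, and the property name sketch.gated.enable are all hypothetical stand-ins, and treating a null key as "no gate" is an assumption about how SecurityUtils.disable behaves, not something the excerpt confirms.

```java
// Stand-in types only: this sketch does not depend on PowerJob.
final class GuardedTemplateSketch {

    /** Hypothetical stand-in for PowerJob's ProcessResult. */
    static final class Result {
        final boolean success;
        final String msg;

        Result(boolean success, String msg) {
            this.success = success;
            this.msg = msg;
        }
    }

    /** Template: gate check, timing, and exception handling live here. */
    abstract static class GuardedProcessor {

        final Result process(String params) {
            String key = securityKey();
            // Assumption: a null key means "no gate"; otherwise -D<key>=true is required.
            if (key != null && !Boolean.getBoolean(key)) {
                return new Result(false, String.format(
                        "%s is not enabled, please set '-D%s=true' to enable it",
                        getClass().getSimpleName(), key));
            }
            String status = "unknown";
            long startNanos = System.nanoTime();
            try {
                Result result = process0(params);
                status = result.success ? "succeed" : "failed";
                return result;
            } catch (Throwable t) {
                // Exceptions become a failed result instead of propagating.
                status = "exception";
                return new Result(false, String.valueOf(t));
            } finally {
                long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
                System.out.printf("%s|%s|%dms%n", getClass().getSimpleName(), status, elapsedMs);
            }
        }

        protected abstract Result process0(String params) throws Exception;

        protected String securityKey() {
            return null;
        }
    }

    /** Ungated subclass that echoes its parameters. */
    static final class EchoProcessor extends GuardedProcessor {
        @Override
        protected Result process0(String params) {
            return new Result(true, "echo:" + params);
        }
    }

    /** Subclass that always throws, to exercise the catch branch. */
    static final class FailingProcessor extends GuardedProcessor {
        @Override
        protected Result process0(String params) throws Exception {
            throw new IllegalStateException("boom");
        }
    }

    /** Subclass gated behind a hypothetical system property. */
    static final class GatedProcessor extends GuardedProcessor {
        @Override
        protected Result process0(String params) {
            return new Result(true, "ran");
        }

        @Override
        protected String securityKey() {
            return "sketch.gated.enable"; // hypothetical property name
        }
    }

    public static void main(String[] args) {
        System.out.println(new EchoProcessor().process("hi").msg);
        System.out.println(new FailingProcessor().process("hi").msg);
        System.out.println(new GatedProcessor().process("hi").msg);
    }
}
```

Because process is final in this sketch, a subclass cannot bypass the gate or the logging; the original class does not mark process as final, so that choice is specific to the sketch.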

HttpProcessor

public class HttpProcessor extends CommonBasicProcessor {

    /**
     * Default timeout is 60 seconds.
     */
    private static final int DEFAULT_TIMEOUT = 60;
    private static final int HTTP_SUCCESS_CODE = 200;
    private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

    @Override
    public ProcessResult process0(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);

        if (httpParams == null) {
            String message = "httpParams is null, please check jobParam configuration.";
            omsLogger.warn(message);
            return new ProcessResult(false, message);
        }

        if (StringUtils.isEmpty(httpParams.url)) {
            return new ProcessResult(false, "url can't be empty!");
        }

        if (!httpParams.url.startsWith("http")) {
            httpParams.url = "http://" + httpParams.url;
        }
        omsLogger.info("request url: {}", httpParams.url);

        // set default method
        if (StringUtils.isEmpty(httpParams.method)) {
            httpParams.method = "GET";
            omsLogger.info("using default request method: GET");
        } else {
            httpParams.method = httpParams.method.toUpperCase();
            omsLogger.info("request method: {}", httpParams.method);
        }

        // set default mediaType
        if (!"GET".equals(httpParams.method)) {
            // set default request body
            if (StringUtils.isEmpty(httpParams.body)) {
                httpParams.body = new JSONObject().toJSONString();
                omsLogger.warn("try to use default request body:{}", httpParams.body);
            }
            if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) {
                httpParams.mediaType = "application/json";
                omsLogger.warn("try to use 'application/json' as media type");
            }
        }

        // set default timeout
        if (httpParams.timeout == null) {
            httpParams.timeout = DEFAULT_TIMEOUT;
        }
        omsLogger.info("request timeout: {} seconds", httpParams.timeout);
        OkHttpClient client = getClient(httpParams.timeout);

        Request.Builder builder = new Request.Builder().url(httpParams.url);
        if (httpParams.headers != null) {
            httpParams.headers.forEach((k, v) -> {
                builder.addHeader(k, v);
                omsLogger.info("add header {}:{}", k, v);
            });
        }

        switch (httpParams.method) {
            case "PUT":
            case "DELETE":
            case "POST":
                MediaType mediaType = MediaType.parse(httpParams.mediaType);
                omsLogger.info("mediaType: {}", mediaType);
                RequestBody requestBody = RequestBody.create(mediaType, httpParams.body);
                builder.method(httpParams.method, requestBody);
                break;
            default:
                builder.get();
        }

        Response response = client.newCall(builder.build()).execute();
        omsLogger.info("response: {}", response);

        String msgBody = "";
        if (response.body() != null) {
            msgBody = response.body().string();
        }

        int responseCode = response.code();
        String res = String.format("code:%d, body:%s", responseCode, msgBody);
        boolean success = true;
        if (responseCode != HTTP_SUCCESS_CODE) {
            success = false;
            omsLogger.warn("{} url: {} failed, response code is {}, response body is {}",
                    httpParams.method, httpParams.url, responseCode, msgBody);
        }
        return new ProcessResult(success, res);
    }

    //......
}    

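HttpProcessor extends CommonBasicProcessor. Its process0 method obtains the parameters via CommonUtils.parseParams(taskContext) and parses them into an HttpParams object. It then fills in defaults: an http:// prefix when the URL does not start with "http", GET as the method, an empty JSON object as the body for non-GET requests, and a 60-second timeout. Next it builds a Request.Builder; for PUT, DELETE, and POST it creates a RequestBody, and any other method falls through to a GET request. Finally it executes the request with client.newCall(builder.build()).execute() and decides success from the HTTP status: only a 200 response counts as success.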
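The default-filling steps in process0 can be isolated into a small function, which makes them easier to reason about. The sketch below is a JDK-only approximation: Params, applyDefaults, and looksLikeJson are hypothetical names. It also differs from the original in three ways: a crude bracket check replaces fastjson's JSONValidator, it throws on an empty URL where the original returns a failed ProcessResult, and it uppercases with Locale.ROOT.

```java
import java.util.Locale;

final class HttpParamDefaultsSketch {

    /** Hypothetical stand-in for the fields that process0 normalizes. */
    static final class Params {
        String url;
        String method;
        String mediaType;
        String body;
        Integer timeout;
    }

    static final int DEFAULT_TIMEOUT_SECONDS = 60;

    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** Crude check for this sketch only; the original uses fastjson's JSONValidator. */
    static boolean looksLikeJson(String s) {
        String t = s.trim();
        return (t.startsWith("{") && t.endsWith("}"))
                || (t.startsWith("[") && t.endsWith("]"));
    }

    /** Applies the same defaults, in the same order, as the excerpt above. */
    static Params applyDefaults(Params p) {
        if (isEmpty(p.url)) {
            // The original returns a failed ProcessResult here instead of throwing.
            throw new IllegalArgumentException("url can't be empty!");
        }
        if (!p.url.startsWith("http")) {
            p.url = "http://" + p.url;
        }
        p.method = isEmpty(p.method) ? "GET" : p.method.toUpperCase(Locale.ROOT);
        if (!"GET".equals(p.method)) {
            if (isEmpty(p.body)) {
                p.body = "{}"; // same text as new JSONObject().toJSONString()
            }
            if (looksLikeJson(p.body) && isEmpty(p.mediaType)) {
                p.mediaType = "application/json";
            }
        }
        if (p.timeout == null) {
            p.timeout = DEFAULT_TIMEOUT_SECONDS;
        }
        return p;
    }

    public static void main(String[] args) {
        Params p = new Params();
        p.url = "example.com/ping";
        p.method = "post";
        applyDefaults(p);
        System.out.println(p.method + " " + p.url + " " + p.mediaType + " " + p.timeout);
    }
}
```

Under these assumptions, a job parameter such as `{"url": "example.com/ping", "method": "post"}` ends up as a POST to http://example.com/ping with body `{}`, media type application/json, and a 60-second timeout. Note that a non-JSON body without an explicit mediaType leaves mediaType unset, both here and in the original logic.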

CommonUtils.parseParams

tech/powerjob/official/processors/util/CommonUtils.java

public class CommonUtils {

    private CommonUtils() {

    }

    public static String parseParams(TaskContext context) {
        // within a workflow, jobParams always takes precedence
        if (context.getWorkflowContext().getWfInstanceId() != null) {
            return context.getJobParams();
        }
        if (StringUtils.isNotEmpty(context.getInstanceParams())) {
            return context.getInstanceParams();
        }
        return context.getJobParams();
    }
}

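CommonUtils.parseParams returns jobParams when the task runs inside a workflow; otherwise it prefers instanceParams and falls back to jobParams when instanceParams is empty.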
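The precedence rule has three cases, which a small stand-alone function makes explicit. This is a sketch with a hypothetical pickParams method that takes plain values instead of a TaskContext, so it runs without PowerJob on the classpath.

```java
final class ParamPrecedenceSketch {

    /**
     * Mirrors the precedence in parseParams using plain arguments:
     * a non-null workflow instance ID forces jobParams; otherwise a
     * non-empty instanceParams wins; otherwise jobParams is used.
     */
    static String pickParams(Long wfInstanceId, String jobParams, String instanceParams) {
        if (wfInstanceId != null) {
            return jobParams;
        }
        if (instanceParams != null && !instanceParams.isEmpty()) {
            return instanceParams;
        }
        return jobParams;
    }

    public static void main(String[] args) {
        System.out.println(pickParams(1L, "job", "inst"));   // inside a workflow
        System.out.println(pickParams(null, "job", "inst")); // standalone, instance params set
        System.out.println(pickParams(null, "job", ""));     // standalone, instance params empty
    }
}
```

One practical consequence: instance parameters passed via OpenAPI are ignored by this helper whenever the task runs as part of a workflow.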

HttpParams

    @Data
    public static class HttpParams {
        /**
         * POST / GET / PUT / DELETE
         */
        private String method;
        /**
         * the request url
         */
        private String url;
        /**
         * application/json
         * application/xml
         * image/png
         * image/jpeg
         * image/gif
         */
        private String mediaType;

        private String body;

        private Map<String, String> headers;

        /**
         * timeout for complete calls
         */
        private Integer timeout;
    }

HttpParams defines the method, url, mediaType, body, headers, and timeout properties.

getClient

    private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();

    private static OkHttpClient getClient(Integer timeout) {
        return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder()
                .connectTimeout(Duration.ZERO)
                .readTimeout(Duration.ZERO)
                .writeTimeout(Duration.ZERO)
                .callTimeout(timeout, TimeUnit.SECONDS)
                .build());
    }

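getClient caches OkHttpClient instances keyed by timeout, so requests with the same timeout share one client. The connect, read, and write timeouts are set to zero, which in OkHttp means no timeout, leaving callTimeout as the only limit on the whole call.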
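The caching behavior comes entirely from ConcurrentHashMap.computeIfAbsent, which builds a value only when the key is missing. The sketch below shows this with a hypothetical FakeClient in place of OkHttpClient and a BUILD_COUNT counter added purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ClientCacheSketch {

    /** Hypothetical stand-in for OkHttpClient; records only the call timeout. */
    static final class FakeClient {
        final int callTimeoutSeconds;

        FakeClient(int callTimeoutSeconds) {
            this.callTimeoutSeconds = callTimeoutSeconds;
        }
    }

    /** Counts how many clients were actually built (illustration only). */
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    private static final Map<Integer, FakeClient> CLIENT_STORE = new ConcurrentHashMap<>();

    /** Returns the cached client for this timeout, building it on first use. */
    static FakeClient getClient(Integer timeout) {
        return CLIENT_STORE.computeIfAbsent(timeout, t -> {
            BUILD_COUNT.incrementAndGet();
            return new FakeClient(t);
        });
    }

    public static void main(String[] args) {
        FakeClient a = getClient(60);
        FakeClient b = getClient(60);
        FakeClient c = getClient(30);
        System.out.println(a == b);            // same timeout, same instance
        System.out.println(a == c);            // different timeout, different instance
        System.out.println(BUILD_COUNT.get()); // number of clients built so far
    }
}
```

A map like this has no eviction, so each distinct timeout value adds an entry that stays for the life of the process. That is harmless when jobs use a handful of timeout values, and worth keeping in mind if timeouts are generated dynamically.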

Summary

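PowerJob's HttpProcessor extends CommonBasicProcessor; it takes HttpParams as input and sends the request with OkHttp. CommonBasicProcessor is an abstract class that implements the BasicProcessor interface. It declares the abstract process0 method and provides getSecurityDKey; its process method first checks SecurityUtils.disable(securityDKey) and returns immediately if the processor is not enabled, then runs process0 inside a try/catch and logs the outcome in the finally block.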
