Jenkins Source Code Analysis: Sending Remote Requests (Part 2)

2021-01-14 10:50:08

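This article walks through the source code that a Jenkins master node runs when it sends a remote request to a slave (agent) node.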

Entry point: a job executing a shell command

The entry point is the CommandInterpreter class in the jenkins-core project.

public boolean perform(AbstractBuild<?,?> build, Launcher launcher, TaskListener listener) throws InterruptedException {
    ...
    r = join(launcher.launch().cmds(buildCommandLine(script)).envs(envVars).stdout(listener).pwd(ws).start());
    ...
}

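The start method at the end of the chain above launches a new process.

It leads to the Launcher class in jenkins-core, where start is defined on the nested ProcStarter class and simply hands the starter to launch: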

public Proc start() throws IOException {
    return launch(this);
}

For a build running on a slave node, this calls the launch method of RemoteLauncher, an inner class of Launcher.

public Proc launch(ProcStarter ps) throws IOException {
    ...
    return new ProcImpl(getChannel().call(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, ps.reverseStdin, out, ps.reverseStdout, err, ps.reverseStderr, ps.quiet, workDir, listener)));
    ...
}
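The chain from launcher.launch()...start() down to RemoteLauncher.launch(ps) is a fluent builder that hands itself back to its enclosing launcher. The following is a minimal sketch of that shape only; SketchLauncher, Starter, Local and Remote are hypothetical names, and the launch methods return a descriptive string instead of starting a real process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the Launcher / ProcStarter pair.
abstract class SketchLauncher {

    // Fluent builder: every setter returns this, and start() passes the
    // collected settings back to the enclosing launcher.
    final class Starter {
        final List<String> commands = new ArrayList<>();
        String pwd = ".";

        Starter cmds(String... args) { commands.addAll(Arrays.asList(args)); return this; }
        Starter pwd(String dir) { this.pwd = dir; return this; }

        // Mirrors "return launch(this);" from the article.
        String start() { return SketchLauncher.this.launch(this); }
    }

    // Creates an empty builder bound to this launcher.
    Starter launch() { return new Starter(); }

    // Subclasses decide where the command would actually run.
    abstract String launch(Starter starter);

    // Would run the command on the same machine.
    static final class Local extends SketchLauncher {
        @Override String launch(Starter s) {
            return "local:" + String.join(" ", s.commands) + "@" + s.pwd;
        }
    }

    // Would forward the command to another node (here it only records the node name).
    static final class Remote extends SketchLauncher {
        private final String node;
        Remote(String node) { this.node = node; }
        @Override String launch(Starter s) {
            return "remote[" + node + "]:" + String.join(" ", s.commands) + "@" + s.pwd;
        }
    }
}
```

Calling new SketchLauncher.Remote("agent-1").launch().cmds("sh", "build.sh").pwd("/ws").start() goes through the same three hops as the article: build the starter, call start(), and end up in the remote subclass's launch method.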

Sending the remote request through Channel.call()

getChannel().call() invokes the call method of the Channel class in the remoting project (the slave.jar that runs on slave nodes).

public <V,T extends Throwable> V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
    ...
    request = new UserRequest<V, T>(this, callable);
    UserResponse<V,T> r = request.call(this);
    return r.retrieve(this, UserRequest.getClassLoader(callable));
    ...
}

The new UserRequest<V, T>(this, callable) constructor serializes the callable into a byte array. The callable passed in here is RemoteLaunchCallable, which implements the Callable interface.
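The overall round trip (serialize the callable, run it on the other side, read back the result) can be imitated inside a single JVM. This is a rough sketch using plain Java serialization; CallSketch, RemoteTask, JoinCommand and executeOnAgent are hypothetical names, and no network or real Channel is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class CallSketch {

    // Hypothetical counterpart of a remoting Callable: a serializable unit of work.
    interface RemoteTask<V extends Serializable> extends Serializable {
        V call();
    }

    // Example task, written as a named class so that it serializes cleanly.
    static final class JoinCommand implements RemoteTask<String> {
        private static final long serialVersionUID = 1L;
        private final String[] words;
        JoinCommand(String... words) { this.words = words; }
        @Override public String call() { return String.join(" ", words); }
    }

    static byte[] toBytes(Object o) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Plays the receiving node: rebuild the task from bytes, run it, serialize the result.
    static byte[] executeOnAgent(byte[] request) {
        RemoteTask<?> task = (RemoteTask<?>) fromBytes(request);
        return toBytes(task.call());
    }

    // Plays the calling side: serialize the task, "send" it, then retrieve the response.
    @SuppressWarnings("unchecked")
    static <V extends Serializable> V call(RemoteTask<V> task) {
        byte[] request = toBytes(task);
        byte[] response = executeOnAgent(request);
        return (V) fromBytes(response);
    }
}
```

In this sketch the task object that runs inside executeOnAgent is a deserialized copy, not the instance the caller created, which is the property that lets the real call cross a process boundary.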

Initializing UserRequest with new UserRequest

This is the constructor of the UserRequest class in the remoting project.

public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
    exports = local.startExportRecording();
    try {
        request = serialize(c,local);
    } finally {
        exports.stopRecording();
    }
    this.toString = c.toString();
    ClassLoader cl = getClassLoader(c);
    classLoaderProxy = RemoteClassLoader.export(cl,local);
}

1. serialize (through the _serialize method) turns the RemoteLaunchCallable into a byte array

private byte[] _serialize(Object o, final Channel channel) throws IOException {
    Channel old = Channel.setCurrent(channel);
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos;
        if (channel.remoteCapability.supportsMultiClassLoaderRPC())
            oos = new MultiClassLoaderSerializer.Output(channel,baos);
        else
            oos = new ObjectOutputStream(baos);
        oos.writeObject(o);
        return baos.toByteArray();
    } finally {
        Channel.setCurrent(old);
    }
}
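The set-current, serialize, restore-in-finally pattern in _serialize can be reproduced with a ThreadLocal. The sketch below is an illustration under assumptions: SerializeSketch and Stamped are hypothetical names, the "channel" is reduced to a string, and the idea that objects consult the current channel while being written is an assumption about why the real code sets it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializeSketch {

    // Per-thread "current channel", reduced to a name for this sketch.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static String current() { return CURRENT.get(); }

    // Installs a new current value and returns the previous one.
    static String setCurrent(String name) {
        String old = CURRENT.get();
        CURRENT.set(name);
        return old;
    }

    // Payload that records which context was current while it was being written.
    static final class Stamped implements Serializable {
        private static final long serialVersionUID = 1L;
        String writtenVia;

        private void writeObject(ObjectOutputStream out) throws IOException {
            writtenVia = current();      // visible only because serialize() set it
            out.defaultWriteObject();
        }
    }

    // Same shape as _serialize: set context, write to a byte array, always restore.
    static byte[] serialize(Object o, String channelName) {
        String old = setCurrent(channelName);
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(o);
            }
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            setCurrent(old);             // restored even if writeObject throws
        }
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Restoring the previous value in finally keeps nested or failed serializations from leaking the wrong context into later code on the same thread.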

2. Create a ClassLoader proxy

ClassLoader cl = getClassLoader(c);
classLoaderProxy = RemoteClassLoader.export(cl,local);

3. Back in getChannel().call(), send the UserRequest to the slave node

UserResponse<V,T> r = request.call(this);
return r.retrieve(this, UserRequest.getClassLoader(callable));

request.call(this) invokes the call method of the Request class in the remoting project, which sends the UserRequest.

// Sends this request to a remote system, and blocks until we receives a response.
public final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
    ...
    // Channel.send() locks channel, and there are other call sequences
    // ( like Channel.terminate()->Request.abort()->Request.onCompleted() )
    // that locks channel -> request, so lock objects in the same order
    synchronized(channel) {
        synchronized(this) {
            response=null;
            channel.pendingCalls.put(id,this);
            channel.send(this);
        }
    }
    ...
}

In channel.send(this), the channel writes the UserRequest object to its output stream.

The caller then waits for the response.
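The register, send, then block sequence can be sketched with plain monitors. This is a simplified illustration, not the remoting implementation: PendingCallSketch, MiniChannel and MiniRequest are hypothetical names, the payload is a string, and a background thread plays the slave node that replies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class PendingCallSketch {

    // Hypothetical channel that tracks in-flight requests by id.
    static final class MiniChannel {
        final Map<Integer, MiniRequest> pendingCalls = new ConcurrentHashMap<>();
        private final AtomicInteger ids = new AtomicInteger();

        int nextId() { return ids.incrementAndGet(); }

        // Stand-in for writing the request to the output stream:
        // a background thread plays the remote node and replies.
        synchronized void send(MiniRequest request) {
            Thread agent = new Thread(() -> onResponse(request.id, "done:" + request.payload));
            agent.setDaemon(true);
            agent.start();
        }

        // Called when a reply arrives: find the waiting request and wake it up.
        void onResponse(int id, String result) {
            MiniRequest req = pendingCalls.remove(id);
            if (req != null) req.onCompleted(result);
        }
    }

    static final class MiniRequest {
        final int id;
        final String payload;
        private String response;

        MiniRequest(MiniChannel channel, String payload) {
            this.id = channel.nextId();
            this.payload = payload;
        }

        String call(MiniChannel channel) {
            // Same lock order as in the article: channel first, then the request.
            synchronized (channel) {
                synchronized (this) {
                    response = null;
                    channel.pendingCalls.put(id, this);
                    channel.send(this);
                }
            }
            // Block until onCompleted() stores a response and notifies us.
            synchronized (this) {
                try {
                    while (response == null) wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
                return response;
            }
        }

        synchronized void onCompleted(String result) {
            response = result;
            notifyAll();
        }
    }
}
```

The while loop around wait() guards against spurious wakeups, and it also covers the case where the reply arrives before the caller starts waiting, because the response field is checked before the first wait().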
