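This article walks through the source code that runs when the Jenkins master node sends a remote request to a slave node.
Entry point: a job executing a shell command
The entry point is the CommandInterpreter class in the jenkins-core project: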
public boolean perform(AbstractBuild<?,?> build, Launcher launcher, TaskListener listener) throws InterruptedException {
    ...
    r = join(launcher.launch().cmds(buildCommandLine(script)).envs(envVars).stdout(listener).pwd(ws).start());
    ...
}
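The start() call at the end of that chain is what launches the new process.
It leads into the Launcher class in jenkins-core, where start() is defined on the ProcStarter returned by launch():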
public Proc start() throws IOException {
    return launch(this);
}
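The chained call launcher.launch().cmds(...).pwd(...).start() is a fluent builder whose final start() hands the collected parameters back to the launcher. The following is only a minimal sketch of that shape, not the Jenkins implementation: LauncherSketch and Starter are hypothetical names, and launch(Starter) returns a description string instead of spawning a process.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for Launcher; not the Jenkins class.
class LauncherSketch {

    // Collects launch parameters through chained setters, in the spirit of ProcStarter.
    final class Starter {
        List<String> commands;
        String workDir;

        Starter cmds(String... args) {
            this.commands = Arrays.asList(args);
            return this;
        }

        Starter pwd(String dir) {
            this.workDir = dir;
            return this;
        }

        // Hands the collected parameters back to the enclosing launcher.
        String start() {
            return launch(this);
        }
    }

    // Entry point of the chain: returns an empty starter to configure.
    Starter launch() {
        return new Starter();
    }

    // A real launcher would spawn a process here; this sketch only describes the request.
    String launch(Starter ps) {
        return ps.commands + " in " + ps.workDir;
    }

    public static void main(String[] args) {
        String described = new LauncherSketch()
                .launch()
                .cmds("sh", "-xe", "build.sh")
                .pwd("/tmp/ws")
                .start();
        System.out.println(described);
    }
}
```

Because start() only delegates, the same builder can sit in front of different launch(...) implementations, which is how a local and a remote launcher can share one calling convention.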
When the build runs on a slave node, this call reaches the launch method of RemoteLauncher, an inner class of Launcher:
public Proc launch(ProcStarter ps) throws IOException {
    ...
    return new ProcImpl(getChannel().call(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, ps.reverseStdin, out, ps.reverseStdout, err, ps.reverseStderr, ps.quiet, workDir, listener)));
    ...
}
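Sending the remote request through Channel.call()
getChannel().call() invokes the call method of the Channel class in the remoting project (the project that ships as slave.jar on the slave node):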
public <V,T extends Throwable> V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
    ...
    request = new UserRequest<V, T>(this, callable);
    UserResponse<V,T> r = request.call(this);
    return r.retrieve(this, UserRequest.getClassLoader(callable));
    ...
}
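The constructor call new UserRequest<V, T>(this, callable) serializes the callable into a byte array. The callable passed in here is the RemoteLaunchCallable, which implements the Callable interface.
Initializing UserRequest through new UserRequest
This is the constructor of the UserRequest class in the remoting project: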
public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
    exports = local.startExportRecording();
    try {
        request = serialize(c,local);
    } finally {
        exports.stopRecording();
    }
    this.toString = c.toString();
    ClassLoader cl = getClassLoader(c);
    classLoaderProxy = RemoteClassLoader.export(cl,local);
}
1. The serialize method serializes the RemoteLaunchCallable into a byte array. The actual work is done in _serialize:
private byte[] _serialize(Object o, final Channel channel) throws IOException {
    Channel old = Channel.setCurrent(channel);
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos;
        if (channel.remoteCapability.supportsMultiClassLoaderRPC())
            oos = new MultiClassLoaderSerializer.Output(channel,baos);
        else
            oos = new ObjectOutputStream(baos);
        oos.writeObject(o);
        return baos.toByteArray();
    } finally {
        Channel.setCurrent(old);
    }
}
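To make the byte-array step concrete, here is a minimal, self-contained sketch of the plain ObjectOutputStream branch using only the JDK. It is an illustration under assumptions, not remoting code: EchoTask is a hypothetical stand-in for RemoteLaunchCallable, SerializeSketch and its methods are invented names, and the read side simply deserializes in the same JVM instead of on a slave node.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

// Hypothetical stand-in for RemoteLaunchCallable: a task that is both callable and serializable.
class EchoTask implements Callable<String>, Serializable {
    private static final long serialVersionUID = 1L;
    private final String command;

    EchoTask(String command) {
        this.command = command;
    }

    @Override
    public String call() {
        return "ran: " + command;
    }
}

class SerializeSketch {

    // Same idea as the plain ObjectOutputStream branch: write the object, return the bytes.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        }
        return baos.toByteArray();
    }

    // What a receiving side conceptually does: rebuild the object from the bytes.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // Serializes a task, rebuilds it from the bytes, and runs the rebuilt copy.
    static String roundTripAndRun(String command) {
        try {
            byte[] payload = serialize(new EchoTask(command));
            EchoTask copy = (EchoTask) deserialize(payload);
            return copy.call();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripAndRun("echo hello"));
    }
}
```

The sketch only works because both sides already share the EchoTask class. Across two JVMs that is not guaranteed, which is presumably why the constructor also exports a class loader proxy.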
2. A ClassLoader proxy is created:
ClassLoader cl = getClassLoader(c);
classLoaderProxy = RemoteClassLoader.export(cl,local);
3. Back in getChannel().call(), the UserRequest is sent to the slave node:
UserResponse<V,T> r = request.call(this);
return r.retrieve(this, UserRequest.getClassLoader(callable));
request.call(this) invokes the call method of the Request class in the remoting project, which sends the UserRequest:
// Sends this request to a remote system, and blocks until we receives a response.
public final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
    ...
    // Channel.send() locks channel, and there are other call sequences
    // ( like Channel.terminate()->Request.abort()->Request.onCompleted() )
    // that locks channel -> request, so lock objects in the same order
    synchronized(channel) {
        synchronized(this) {
            response=null;
            channel.pendingCalls.put(id,this);
            channel.send(this);
        }
    }
    ...
}
With channel.send(this), the channel writes the UserRequest object to its output stream.
The calling thread then blocks until the response arrives.
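The register, send, then wait sequence can be sketched with plain JDK monitors. This is a simplified model under assumptions, not the remoting code: PendingCallSketch and its nested Request and Channel classes are hypothetical, the response is a plain String, and a short-lived thread plays the slave node instead of a real stream. It keeps the same lock order as the snippet above (channel first, then request).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingCallSketch {

    // Hypothetical request: the caller blocks on it until a response is attached.
    static class Request {
        final int id;
        private String response;

        Request(int id) {
            this.id = id;
        }

        // Called by the receiving thread when the matching response arrives.
        synchronized void onCompleted(String rsp) {
            response = rsp;
            notifyAll();
        }

        // Blocks the calling thread until onCompleted has run.
        synchronized String await() throws InterruptedException {
            while (response == null) {
                wait();
            }
            return response;
        }
    }

    // Hypothetical channel: tracks in-flight requests by id.
    static class Channel {
        final Map<Integer, Request> pendingCalls = new ConcurrentHashMap<>();

        String call(Request req) throws InterruptedException {
            // Lock channel, then request, in that fixed order.
            synchronized (this) {
                synchronized (req) {
                    pendingCalls.put(req.id, req);
                    send(req);
                }
            }
            return req.await();
        }

        // Stand-in for writing to the output stream: another thread answers the request.
        private void send(Request req) {
            final int id = req.id;
            new Thread(() -> receive(id, "response to #" + id)).start();
        }

        // Looks up the waiting request by id and completes it.
        void receive(int id, String payload) {
            Request pending = pendingCalls.remove(id);
            if (pending != null) {
                pending.onCompleted(payload);
            }
        }
    }

    // Sends one request and returns its response, converting interruption to an unchecked error.
    static String demo(int id) {
        try {
            return new Channel().call(new Request(id));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(7));
    }
}
```

Registering the request in pendingCalls before sending matters: if the order were reversed, a fast reply could arrive before there was any entry to match it against.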