tomcat-集群实现-源码解析

2022-12-01 15:42:39 浏览数 (2)

上文:tomcat类加载-源码解析


背景

tomcat支持单机模式与集群模式,通过集群模式来提供应用的高可用,保障业务的稳定。

如果不懂集群跟单机可以参考以往文章:单机模式与集群模式的区别?

tomcat如何配置集群?

在server.xml 中的engine或host中添加如下:

这里可能有同学想问engine跟host是什么关系,其实之前文章有讲过,这里再啰嗦一下。

一个engine可以包含多个host,一个host仅包含自己,engine的集群可以供host使用,而host仅供自己自使用这是有区别的。

代码语言:javascript复制
<Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster"/>

注意:这里如果不做其它配置默认tomcat组播地址是:228.0.0.4端口:45564

更多集群配置请参考:https://tomcat.apache.org/tomcat-9.0-doc/cluster-howto.html

tomcat集群节点之间是如何通讯的?

Apache Tribes是Tomcat的一个通讯模块,支持服务器集群中的组通信。也就是说tomcat集群之间是通过tribes模块进行通讯的。

Apache Tribes使用了什么技术进行通讯的?

tribes通讯默认以tcp方式进行通讯,由于tcp是可靠的连接方式,所以保障了集群之间的通讯的稳定。当然Tribes还支持 UDP和类似于rpc方式的通讯方式;

源码阅读

相关组件说明

组件/类名称

作用

说明

cluster

作为本地主机集群客户端/服务组件

主要是负责集群内的实例之间的通讯,发送/接收集群消息。

tribes

作为各节点的通讯模块

主要用于集群之间各节点的通信;

HA

用于集群的复制

用于集群之间的节点复制。

channel

消息通知通道

不同的类型有不同的通道;

DeltaManager

会话管理器

用于管理session的信息

BackupMnager

会话备分管理器

每次会话结束都会做一个操作。

当集群之间的互相通讯会通过ClusterSessionListener 进行监听。

ClusterListener 作为集群消息的监听接口 ClusterSessionListener 为该接口的实现。

初始化方法:org.apache.catalina.ha.tcp.SimpleTcpCluster#startInternal

这个类很重要,所有的监听器和相关的配置都在这里已经配置了,所以后续的监听器才生效,特别集群的监听。

代码语言:javascript复制
@Override
protected void startInternal() throws LifecycleException {

    if (log.isInfoEnabled()) {
        log.info(sm.getString("simpleTcpCluster.start"));
    }

    try {
        //这里就是初始化监听器、集群故障转移、阈门容器等
        checkDefaults();
        //注册通道
        registerClusterValve();
        //添加成员监听器
        channel.addMembershipListener(this);
        //添加事件监听器
        channel.addChannelListener(this);
        //设置集群节点名称
        channel.setName(getClusterName()   "-Channel");
        //启动
        channel.start(channelStartOptions);
        //部署方式的启动
        if (clusterDeployer != null) {
            clusterDeployer.start();
        }
        //注册到成员列表中
        registerMember(channel.getLocalMember(false));
    } catch (Exception x) {
        log.error(sm.getString("simpleTcpCluster.startUnable"), x);
        throw new LifecycleException(x);
    }
    //设置当前状态为启动
    setState(LifecycleState.STARTING);
}

停止服务方法:org.apache.catalina.ha.tcp.SimpleTcpCluster#stopInternal

代码语言:javascript复制
@Override
protected void stopInternal() throws LifecycleException {
    //设置当前的状态为停止
    setState(LifecycleState.STOPPING);
    //从成员列表中移除
    unregisterMember(channel.getLocalMember(false));
    //部署方式不为空则进行停止
    if (clusterDeployer != null) {
        clusterDeployer.stop();
    }
    //清空当前集群管理列表
    this.managers.clear();
    try {
        //不为空设置集群为空;
        if ( clusterDeployer != null ) {
            clusterDeployer.setCluster(null);
        }
        //停止通道
        channel.stop(channelStartOptions);
        //移除通道监听器
        channel.removeChannelListener(this);
        //移除成员监听器
        channel.removeMembershipListener(this);
        //解绑当前集群值(赋为空)
        this.unregisterClusterValve();
    } catch (Exception x) {
        log.error(sm.getString("simpleTcpCluster.stopUnable"), x);
    }
}

MembershipListener 为成员添加和移动的监听器,SimpleTcpCluster 就是以此为实现的,其中有两个重要的方法有:

代码语言:javascript复制
/**
添加成功
 * A member was added to the group
 * @param member Member - the member that was added
 */
public void memberAdded(Member member);

/**

移除成员
 * A member was removed from the group<br>
 * If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data
 * @param member Member
 * @see Member#SHUTDOWN_PAYLOAD
 */
public void memberDisappeared(Member member);

以上的初始化完以后,那么监听启动后生效,这时候可以看如下方法监听具体的内容实现。

消息监听类:org.apache.catalina.ha.session.ClusterSessionListener

代码语言:javascript复制
/**
 * Receive replicated SessionMessage form other cluster node.
 * @author Peter Rossbach
 */
public class ClusterSessionListener extends ClusterListener {

    private static final Log log =
        LogFactory.getLog(ClusterSessionListener.class);
    private static final StringManager sm = StringManager.getManager(ClusterSessionListener.class);

    //--Constructor---------------------------------------------

    public ClusterSessionListener() {
        // NO-OP
    }

    //--Logic---------------------------------------------------

    /**
     * 接收广播消息
     * Callback from the cluster, when a message is received, The cluster will
     * broadcast it invoking the messageReceived on the receiver.
     *
     * @param myobj
     * ClusterMessage - the message received from the cluster
     */
    @Override
    public void messageReceived(ClusterMessage myobj) {
        //判断消息类型SessionMessage才接收
        if (myobj instanceof SessionMessage) {
            //转换为消息格式
            SessionMessage msg = (SessionMessage) myobj;
            String ctxname = msg.getContextName();
            //check if the message is a EVT_GET_ALL_SESSIONS,
            //if so, wait until we are fully started up
            Map<String,ClusterManager> managers = cluster.getManagers() ;
            //不为空进行循环发接收确认
            if (ctxname == null) {
                for (Map.Entry<String, ClusterManager> entry :
                        managers.entrySet()) {
                    if (entry.getValue() != null) {
                        entry.getValue().messageDataReceived(msg);
                    } else {
                        //this happens a lot before the system has started
                        // up
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("clusterSessionListener.noManager", entry.getKey()));
                        }
                    }
                }
            } else {
                //为空的情况下先进行获取上下文名称不为空,再进行回复收到。
                ClusterManager mgr = managers.get(ctxname);
                if (mgr != null) {
                    mgr.messageDataReceived(msg);
                } else {
                    if (log.isWarnEnabled()) {
                        log.warn(sm.getString("clusterSessionListener.noManager", ctxname));
                    }

                    // A no context manager message is replied in order to avoid
                    // timeout of GET_ALL_SESSIONS sync phase.
                    //如果消息类型为全部,则进行发送其他节点
                    if (msg.getEventType() == SessionMessage.EVT_GET_ALL_SESSIONS) {
                        SessionMessage replymsg = new SessionMessageImpl(ctxname,
                                SessionMessage.EVT_ALL_SESSION_NOCONTEXTMANAGER,
                                null, "NO-CONTEXT-MANAGER","NO-CONTEXT-MANAGER-"   ctxname);
                        cluster.send(replymsg, msg.getAddress());
                    }
                }
            }
        }
    }

    /**
     * Accept only SessionMessage
     * 只接收SessionMessage的消息
     *
     * @param msg
     * ClusterMessage
     * @return boolean - returns true to indicate that messageReceived should be
     * invoked. If false is returned, the messageReceived method will
     * not be invoked.
     */
    @Override
    public boolean accept(ClusterMessage msg) {
        return msg instanceof SessionMessage;
    }
}

当用户请求tomcat后,将这个reuqest会传到Engine进行处理,然后会转到JvmRouteBinderValve,对不符合的会话ID进行处理。

注意:JvmRouteBinderValve 也作为集群节点崩溃的时候故障节点的转移; 类图如下:

核心类:

启动方法:org.apache.catalina.ha.session.JvmRouteBinderValve#startInternal

代码语言:javascript复制
@Override
protected synchronized void startInternal() throws LifecycleException {
    //为空则进行获取容器中的获取与此容器相关的集群
    if (cluster == null) {
        Cluster containerCluster = getContainer().getCluster();
        //判断是否为CatalinaCluster类型,如果是则进行赋值
        if (containerCluster instanceof CatalinaCluster) {
            setCluster((CatalinaCluster)containerCluster);
        }
    }

    if (log.isInfoEnabled()) {
        log.info(sm.getString("jvmRoute.valve.started"));
        if (cluster == null) {
            log.info(sm.getString("jvmRoute.noCluster"));
        }
    }
    //调用父类启动方法
    super.startInternal();
}

停止的方法:org.apache.catalina.ha.session.JvmRouteBinderValve#stopInternal

代码语言:javascript复制
@Override
protected synchronized void stopInternal() throws LifecycleException {
    //先停止父类
    super.stopInternal();
    //集群赋空
    cluster = null;
    //托管的会话数赋为0
    numberOfSessions = 0;
    if (log.isInfoEnabled()) {
        log.info(sm.getString("jvmRoute.valve.stopped"));
    }

}

改变sessionId的方法:org.apache.catalina.ha.session.JvmRouteBinderValve#changeRequestSessionID

代码语言:javascript复制
protected void changeRequestSessionID(Request request, String sessionId, String newSessionID) {
    //设置到请求头中,并且设置到cookie中
    request.changeSessionId(newSessionID);

    // set original sessionid at request, to allow application detect the
    // change
    //不为空设置到请求属性中
    if (sessionIdAttribute != null && !sessionIdAttribute.isEmpty()) {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("jvmRoute.set.originalsessionid",sessionIdAttribute,sessionId));
        }
        request.setAttribute(sessionIdAttribute, sessionId);
    }
}

故障转移核心方法:org.apache.catalina.ha.session.JvmRouteBinderValve#handleJvmRoute

代码语言:javascript复制
protected void handleJvmRoute(
        Request request, String sessionId, String localJvmRoute) {
    // get requested jvmRoute.
    String requestJvmRoute = null;
    //判断会话id中是否包含.如果是进行截取
    int index = sessionId.indexOf('.');
    if (index > 0) {
        requestJvmRoute = sessionId.substring(index   1);
    }
    //如果会话id有值则进行生成新的会话id然后进行转移
    if (requestJvmRoute != null && !requestJvmRoute.equals(localJvmRoute)) {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("jvmRoute.failover", requestJvmRoute,
                    localJvmRoute, sessionId));
        }
        Session catalinaSession = null;
        try {
            catalinaSession = getManager(request).findSession(sessionId);
        } catch (IOException e) {
            // Hups!
        }
        String id = sessionId.substring(0, index);
        String newSessionID = id   "."   localJvmRoute;
        // OK - turnover the session and inform other cluster nodes
        if (catalinaSession != null) {
            changeSessionID(request, sessionId, newSessionID,
                    catalinaSession);
            numberOfSessions  ;
        } else {
            try {
                catalinaSession = getManager(request).findSession(newSessionID);
            } catch (IOException e) {
                // Hups!
            }
            if (catalinaSession != null) {
                // session is rewrite at other request, rewrite this also
                changeRequestSessionID(request, sessionId, newSessionID);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("jvmRoute.cannotFindSession",sessionId));
                }
            }
        }
    }
}

这里要注意下这个类:org.apache.catalina.ha.session.DeltaManager 是用于集群会话的管理器,通过这个来保证一个会话在集群中的任务一台机器都有效。实现类图如下。

主要的核心几个方法如下:

org.apache.catalina.ha.session.DeltaManager#createSession(java.lang.String, boolean) 创建session

代码语言:javascript复制
public Session createSession(String sessionId, boolean distribute) {
    //创建session
    DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
    //如果为true发送整个集群
    if (distribute) {
        sendCreateSession(session.getId(), session);
    }
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.createSession.newSession",
                session.getId(), Integer.valueOf(sessions.size())));
    }
    return session;
}

发送整个集群的方法:org.apache.catalina.ha.session.DeltaManager#sendCreateSession

代码语言:javascript复制
protected void sendCreateSession(String sessionId, DeltaSession session) {
    //判断集群用户是否大于0才进行
        if(cluster.getMembers().length > 0 ) {
            //创建集群消息
        SessionMessage msg =
            new SessionMessageImpl(getName(),
                                   SessionMessage.EVT_SESSION_CREATED,
                                   null,
                                   sessionId,
                                   sessionId   "-"   System.currentTimeMillis());
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("deltaManager.sendMessage.newSession", name, sessionId));
        }
        //用会话时间作为创建时间
        msg.setTimestamp(session.getCreationTime());
        //集群事件 1
        counterSend_EVT_SESSION_CREATED  ;
        //发送
        send(msg);
    }
}

序列化代码块:org.apache.catalina.ha.session.DeltaManager#serializeSessionId

代码语言:javascript复制
protected byte[] serializeSessionId(String sessionId) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeUTF(sessionId);
    oos.flush();
    oos.close();
    return bos.toByteArray();
}

反序列化代码块:org.apache.catalina.ha.session.DeltaManager#deserializeSessionId

代码语言:javascript复制
protected String deserializeSessionId(byte[] data) throws IOException {
    ReplicationStream ois = getReplicationStream(data);
    String sessionId = ois.readUTF();
    ois.close();
    return sessionId;
}初始化:org.apache.catalina.ha.session.DeltaManager#startInternal
代码语言:javascript复制
@Override
protected synchronized void startInternal() throws LifecycleException {
    //调用父类初始化
    super.startInternal();

    // Load unloaded sessions, if any
    try {
        //节点不为空
        if (cluster == null) {
            log.error(sm.getString("deltaManager.noCluster", getName()));
            return;
        } else {
            if (log.isInfoEnabled()) {
                String type = "unknown" ;
                if( cluster.getContainer() instanceof Host){
                    type = "Host" ;
                } else if( cluster.getContainer() instanceof Engine){
                    type = "Engine" ;
                }
                log.info(sm.getString("deltaManager.registerCluster",
                        getName(), type, cluster.getClusterName()));
            }
        }
        if (log.isInfoEnabled()) {
            log.info(sm.getString("deltaManager.startClustering", getName()));
        }
        //获取所有节点会话
        getAllClusterSessions();

    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("deltaManager.managerLoad"), t);
    }
    //设置当前状态为启动状态
    setState(LifecycleState.STARTING);
}停止的方法:org.apache.catalina.ha.session.DeltaManager#stopInternal
代码语言:javascript复制
@Override
protected synchronized void stopInternal() throws LifecycleException {

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("deltaManager.stopped", getName()));
    }
    //设置状态为停止
    setState(LifecycleState.STOPPING);

    // Expire all active sessions
    if (log.isInfoEnabled()) {
        log.info(sm.getString("deltaManager.expireSessions", getName()));
    }
    //获取所有会话然后全部释放掉
    Session sessions[] = findSessions();
    for (Session value : sessions) {
        DeltaSession session = (DeltaSession) value;
        if (!session.isValid()) {
            continue;
        }
        try {
            session.expire(true, isExpireSessionsOnShutdown());
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
        }
    }

    // Require a new random number generator if we are restarted
    //停止父类
    super.stopInternal();
}

org.apache.catalina.ha.session.BackupManager:集群备份会话管理器

这个管理器用于备份会话来使用,整体结构如下:

核心的几个方法如下:

初始化:org.apache.catalina.ha.session.BackupManager#startInternal

代码语言:javascript复制
@Override
protected synchronized void startInternal() throws LifecycleException {
    //父类实始化方法
    super.startInternal();

    try {
        //节点为空抛出异常
        if (cluster == null) {
            throw new LifecycleException(sm.getString("backupManager.noCluster", getName()));
        }
        //初始化map列表 
        LazyReplicatedMap<String,Session> map = new LazyReplicatedMap<>(
                this, cluster.getChannel(), rpcTimeout, getMapName(),
                getClassLoaders(), terminateOnStartFailure);
        map.setChannelSendOptions(mapSendOptions);
        map.setAccessTimeout(accessTimeout);
        this.sessions = map;
    } catch ( Exception x ) {
        log.error(sm.getString("backupManager.startUnable", getName()),x);
        throw new LifecycleException(sm.getString("backupManager.startFailed", getName()),x);
    }
    //设置状态为启动
    setState(LifecycleState.STARTING);
}

停用的方法:org.apache.catalina.ha.session.BackupManager#stopInternal

代码语言:javascript复制
@Override
protected synchronized void stopInternal() throws LifecycleException {

    if (log.isDebugEnabled()) {
        log.debug(sm.getString("backupManager.stopped", getName()));
    }
    //设置状态
    setState(LifecycleState.STOPPING);
    //判断类型是不是LazyReplicatedMap 如果是的话进行清空map缓存
    if (sessions instanceof LazyReplicatedMap) {
        LazyReplicatedMap<String,Session> map =
                (LazyReplicatedMap<String,Session>)sessions;
        map.breakdown();
    }
    //父类停用方法
    super.stopInternal();
}

注意啊,tomcat里面大量的启动和停止都是用synchronized同步锁来进行解决线程同步问题,所以要看地方来使用,而不是很多博主吹嘘不能使用,而又不给出具体原因。~~

Tribes消息发送流程:这里借用刘光瑞《Tomcat架构解析》的序列图如下:

通过组的方式进行发送。

org.apache.catalina.tribes.group.GroupChannel#send(org.apache.catalina.tribes.Member[], java.io.Serializable, int, org.apache.catalina.tribes.ErrorHandler)

代码语言:javascript复制
@Override
public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler)
        throws ChannelException {
    //消息为空,抛出异常
    if ( msg == null ) {
        throw new ChannelException(sm.getString("groupChannel.nullMessage"));
    }
    XByteBuffer buffer = null;
    try {
        //发送的目的地为空,直接抛出异常
        if (destination == null || destination.length == 0) {
            throw new ChannelException(sm.getString("groupChannel.noDestination"));
        }
        //组装信息
        ChannelData data = new ChannelData(true);//generates a unique Id
        data.setAddress(getLocalMember(false));
        data.setTimestamp(System.currentTimeMillis());
        byte[] b = null;
        if ( msg instanceof ByteMessage ){
            b = ((ByteMessage)msg).getMessage();
            options = options | SEND_OPTIONS_BYTE_MESSAGE;
        } else {
            b = XByteBuffer.serialize(msg);
            options = options & (~SEND_OPTIONS_BYTE_MESSAGE);
        }
        data.setOptions(options);
        //XByteBuffer buffer = new XByteBuffer(b.length 128,false);
        buffer = BufferPool.getBufferPool().getBuffer(b.length 128, false);
        buffer.append(b,0,b.length);
        data.setMessage(buffer);
        InterceptorPayload payload = null;
        if ( handler != null ) {
            payload = new InterceptorPayload();
            payload.setErrorHandler(handler);
        }
        //发送消息
        getFirstInterceptor().sendMessage(destination, data, payload);
        if ( Logs.MESSAGES.isTraceEnabled() ) {
            Logs.MESSAGES.trace("GroupChannel - Sent msg:"   new UniqueId(data.getUniqueId())  
                    " at "   new java.sql.Timestamp(System.currentTimeMillis())   " to "  
                    Arrays.toNameString(destination));
            Logs.MESSAGES.trace("GroupChannel - Send Message:"  
                    new UniqueId(data.getUniqueId())   " is "   msg);
        }
        //返回id
        return new UniqueId(data.getUniqueId());
    } catch (RuntimeException | IOException e) {
        throw new ChannelException(e);
    } finally {
        if ( buffer != null ) {
            //出栈
            BufferPool.getBufferPool().returnBuffer(buffer);
        }
    }
}

消息到了org.apache.catalina.tribes.membership.McastService#broadcast 会进行组装然后通过tcp方式进行发送。

代码语言:javascript复制
@Override
public void broadcast(ChannelMessage message) throws ChannelException {
    //消息为空或队列不对直接抛出异常
    if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ ) {
        throw new ChannelException(sm.getString("mcastService.noStart"));
    }
    //创建发送数据包
    byte[] data = XByteBuffer.createDataPackage((ChannelData)message);
    if (data.length>McastServiceImpl.MAX_PACKET_SIZE) {
        throw new ChannelException(sm.getString("mcastService.exceed.maxPacketSize",
                Integer.toString(data.length) ,
                Integer.toString(McastServiceImpl.MAX_PACKET_SIZE)));
    }
    //接装
    DatagramPacket packet = new DatagramPacket(data,0,data.length);
    try {
        //发送
        impl.send(false, packet);
    } catch (Exception x) {
        throw new ChannelException(x);
    }
}

通过socket发送消息:org.apache.catalina.tribes.transport.bio.MultipointBioSender#sendMessage

代码语言:javascript复制
@Override
public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
    //组装数据
    byte[] data = XByteBuffer.createDataPackage((ChannelData)msg);
    //组装发送列表
    BioSender[] senders = setupForSend(destination);
    ChannelException cx = null;
    for ( int i=0; i<senders.length; i   ) {
        try {
            //发送消息 带上ack标志
            senders[i].sendMessage(data,(msg.getOptions()&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK);
        } catch (Exception x) {
            if (cx == null) {
                cx = new ChannelException(x);
            }
            cx.addFaultyMember(destination[i],x);
        }
    }
    if (cx!=null ) {
        throw cx;
    }
}

详细交互图:

为什么tomcat需要集群?

源码看完了,但是我们自问一个最本质的问题,为什么tomcat需要集群?当然懂的同学可能可以列举一大堆原因,但是请容我再说明如下,说不定刚好跟你的不一样呢?

可伸缩性:由于集群一致是最少需要两个节点,这样一来,可以在流量高峰的时候动态来拓展,当然根据需要可以在流量少的时候减少节点,也是OK的,但至少保留2个。

高可用性:当集群中某一个节点出问题了,不会导致服务不可用,这样一来可以起到高可用状态,但是现实中,一般一个节点有问题会立即告警,然后由研发人员进行跟进处理;

高性能:在集群的情况下,负载均衡可以分配流量到每台机器,一般会使用轮训的方式进行分配,所以可以一定程度提升系统的性能;

最后

建议阅读的同学可以先把集群搭起来,具体怎么搭下面的参考文章里面有,还有搭好集群后,需要看下消息发送节点的流程,一步一步往下跟,这样学习才会清楚是否真实的实现流程,以上的实现说明,仅做参考,光看真的很难有所深入,说句不好听的就是浪费时间了解一些表面的东西,最终还是需要深入到源码去跟进,不要嫌麻烦,因为有时候跟进一个点或复现一个场景可几小时甚至几周的时间才能复现,当然更建议与了解过或学习过的同学学习或参考现成资料这样效率会较高。本文仅是摘取部分个人觉得比较重要的流程,所以绝对不是全的,如果想了解得很深入或全面建议同学你动手吧~

参考文章:

https://tomcat.apache.org/tomcat-9.0-doc/cluster-howto.html

https://www.cnblogs.com/jdbinfo/p/15570440.html

https://tomcat.apache.org/tomcat-8.0-doc/api/org/apache/catalina/tribes/Channel.html

https://www.jianshu.com/p/aa9f71d653af

https://www.cnblogs.com/laoxia/p/8149711.html

https://blog.csdn.net/change2970955076/article/details/77477465

https://www.codeleading.com/article/8097421850/

0 人点赞