事件系统的设计方法

2020-07-06 10:28:52 浏览数 (1)

前言

事件功能已经用了一段时间,在用户登入登出、充值、完成任务等场景下非常方便。趁有时间阅读源码学习:设计的时候比较复杂,但完成后使用起来很简单。这里选取创建角色事件把整个流程走一遍,下面是 Java 代码。

事件实体GameEvent

代码语言:java复制
/**
 * Base class of all events: carries the event name and a dispatch type.
 * NOTE(review): this excerpt is truncated — the closing brace and the accessors
 * used by other snippets (e.g. getName(), getUniqueId()) are not shown here.
 */
public abstract class GameEvent {
    /** Dispatch type. NOTE(review): presumably one of the DispatchType constants — confirm. */
    private int dispatchType;
    /** Key of the event, see {@code EventKey}. */
    public String name;
    /**
     * Creates an event with the given name; dispatchType is not assigned here
     * and keeps its default value.
     * @param name        event name
     */
    public GameEvent(String name) {
        this.name = name;
    }
    /**
     * Creates an event with the given name and dispatch type.
     * @param name            event name
     * @param dispatchType    dispatch type stored on the event
     */
    public GameEvent(String name, int dispatchType) {
        this.name = name;
        this.dispatchType = dispatchType;
    }

角色事件

代码语言:java复制
/**
 * Event that belongs to one actor.
 * It is always created with {@code DispatchType.ACTOR}, and the actor id is also
 * what {@link #getUniqueId()} returns.
 */
public abstract class ActorEvent extends GameEvent {

    /** Id of the actor this event belongs to. */
    private long actorId;

    /**
     * @param name       event name
     * @param actorId    id of the actor
     */
    public ActorEvent(String name, long actorId) {
        super(name, DispatchType.ACTOR);
        this.actorId = actorId;
    }

    /**
     * @return the id of the actor this event belongs to
     */
    public long getActorId() {
        return this.actorId;
    }

    /**
     * @return the actor id, which serves as the unique id of this event
     */
    @Override
    public long getUniqueId() {
        return this.actorId;
    }
}

创建角色事件

代码语言:java复制
/**
 * Actor creation event; its event name is {@code EventKey.ACTOR_CREATE_EVENT}.
 */
public class ActorCreateEvent extends ActorEvent {
    /**
     * @param actorId    id of the actor, passed on to the ActorEvent constructor
     */
    public ActorCreateEvent(long actorId) {
        super(EventKey.ACTOR_CREATE_EVENT, actorId);
    }
}

EventKey 事件类型

代码语言:java复制
/**
 * String keys that identify events.
 * NOTE(review): this excerpt is truncated — the closing brace and any further keys are not shown.
 */
public interface EventKey {
    /** System-level event: used for events triggered on a timer for online actors, see {@code @EventOnline}. */
    public final static String ONLINE_EVENT = "ONLINE_EVENT_%s_%s";
    /** User: actor login event. */
    public final static String ACTOR_LOGIN = "ACTOR_LOGIN";
    /** Actor creation event. */
    public final static String ACTOR_CREATE_EVENT = "ACTOR_CREATE_EVENT";

消息派发类型线程池

代码语言:java复制
/*** 消息派发类型线程池*/
public interface DispatchType {
    /** 运维专用线程 */
    public static int MAINTAIN = 1;
    /** 角色线程池(大部份模块用该线程),角色之间的数据修改需要通过事件方式 */
    public static int ACTOR = 2;
    /** 用户登陆  */
    public static int USER_LOGIN = 3;
    /** 帐号相关的线程 */
    public static int SETTING = 4;
    /** 节点相关的线程 */
    public static int NODE = 5;
    /** 
     * 注意!!!!!!!!!!!
     * 以下为每个进程的线程配置。线程数为1的,一定要谨慎修改!!切记!切记!
     */
    public static List<ThreadInfo> getGameThreadInfo() {
        List<ThreadInfo> list = new ArrayList<>();
        list.add(ThreadInfo.valueOf("maintain", MAINTAIN, 1));
        list.add(ThreadInfo.valueOf("actor", ACTOR, Runtime.getRuntime().availableProcessors() * 2   1));
        list.add(ThreadInfo.valueOf("setting", SETTING, 1));
        list.add(ThreadInfo.valueOf("user_login", USER_LOGIN, 1));
        list.add(ThreadInfo.valueOf("node", NODE, 1));
        return list;
    }
}

事件派发管理

创建角色方法

创建的ActorCreateEvent继承ActorEvent(ActorEvent再继承GameEvent),dispatchType = ACTOR,最后由DispatchHelper派发事件

代码语言:java复制
    @Override
    public TResult<Long> createActor(long uid, int platformId, int channelId, int serverId, String actorName, boolean sex, int avatarId,
            String createIP) {
        Result checkResult = this.checkActorName(actorName);
        if (checkResult.isFail()) {
            return TResult.valueOf(checkResult);
        }
        if (actorDao.getActorId(uid, serverId) > 0) {
            return TResult.valueOf(CREATE_ACTOR_FAIL);
        }
        this.checkActorName(actorName);
        int level = globalConfigService.findGlobalConfig(GlobalConfigKey.ACTOR_INIT_LEVEL).findInt();
        int initSaveDiamond = globalConfigService.findGlobalConfig(GlobalConfigKey.SAVE_DIAMOND_INIT).findInt();
        Actor actor = actorDao.createActor(uid, platformId, channelId, serverId, actorName, sex, avatarId, level, initSaveDiamond);
        int passStoryRewardTime = globalConfigService.findGlobalConfig(GlobalConfigKey.PASS_STORY_TIME).findInt();
        long expireTime = actor.getCreateTime()   passStoryRewardTime * TimeUtils.ONE_HOUR_MILLISECOND;
        actor.setPassStoryRewardExpireTime(expireTime);
        dbQueue.updateQueue(actor);
        List<RewardObject> rewardList = globalConfigService.findGlobalObject(GlobalConfigKey.ACTOR_INIT_REWARD, RewardObjectListConfig.class).getVs();
        RewardHelper.sendRewardList(actor.getActorId(), rewardList, OperationType.ACTOR_CREATE);
        VipConfig vipConfig = VipConfigService.getVipConfig(actor.getVipLevel());
        prerogativeFacade.addPrerogative(actor.getActorId(), vipConfig.getPrerogativeIdList(), OperationType.ACTOR_CREATE);
        DispatchHelper.postEvent(new ActorCreateEvent(actor.getActorId()));
        if (!ActorHelper.isRobot(actor.getActorId())) {
            GameOssLogger.newUser(uid, platformId, channelId, serverId, actor.getActorId());
            GameOssLogger.actorInfo(actor, createIP);
        }
        return TResult.sucess(actor.getActorId());
    }

触发事件方法

触发@Event(name = EventKey.ACTOR_CREATE_EVENT)的创角事件方法

代码语言:java复制
/**
 * Handler for the actor creation event: looks the actor up and, for the
 * WAN_BA / WAN_BA_IOS platforms only, reports the registration through
 * ActorHelper.reportRegchar. Does nothing when the actor lookup fails.
 *
 * @param event    the event carrying the id of the created actor
 */
@Event(name = EventKey.ACTOR_CREATE_EVENT)
    public void onActorCreateEvent(ActorEvent event) {
        TResult<Actor> lookup = this.getActor(event.getActorId());
        if (lookup.isFail()) {
            return;
        }
        Actor created = lookup.item;
        // Only the two WAN_BA platforms are reported; everything else stops here.
        if (created.getPlatformId() != PlatformId.WAN_BA && created.getPlatformId() != PlatformId.WAN_BA_IOS) {
            return;
        }
        ActorHelper.reportRegchar(created.getUid());
    }

还有活动多次事件触发等

代码语言:java复制
// Receives an event and branches on its name.
// NOTE(review): this excerpt is truncated in the middle of the switch; no break is
// visible between the two cases shown — confirm whether fall-through is intended.
@Override
    public void onEvent(GameEvent e) {
        List<ActivityOpenConfig> activityOpenConfigList = ActivityOpenConfigService.getActivityOpenConfigList(getType());
        switch (e.name) {
        case EventKey.GOD_EQUIPMENT_EVENT:
            // God-equipment summon event
            GodEquipmentEvent godEquipmentEvent = e.convert();
        case EventKey.GACHA_ACTIVITY_EVENT:
            // Gacha (lottery draw) event

线程调度DispatchHelper

调度EventContext

代码语言:java复制
/**
 * Static facade for posting events: the component instance registers itself in a
 * static reference after construction, and {@link #postEvent(GameEvent)} forwards
 * to that instance's EventContext.
 */
@Component
public class DispatchHelper {

    /** Holds the instance on which {@link #init()} ran. */
    private static ObjectReference<DispatchHelper> ref = new ObjectReference<>();

    @Autowired
    private DispatchContext dispatchContext;

    @Autowired
    private EventContext eventContext;

    @Autowired
    private RouterContext routerContext;

    /** Publishes this instance so the static methods can reach the injected fields. */
    @PostConstruct
    protected void init() {
        ref.set(this);
    }

    /**
     * Posts an event to the EventContext of the registered instance.
     *
     * @param e    the event to post
     */
    public static void postEvent(GameEvent e) {
        DispatchHelper helper = ref.get();
        helper.eventContext.post(e);
    }
}

事件派发环境

dispatchContext排序事件类型从小到大执行

代码语言:java复制
/**
 * Event context: keeps the handler registrations per event name and buffers posted
 * events in a queue. A single scheduled thread drains the queue every 10 ms, wraps
 * each event together with its handlers into an {@code EventDispatch} and posts it
 * to the {@code DispatchContext}.
 */
public class EventContext {
    @Autowired
    DispatchContext dispatchContext;

    private static final Logger LOGGER = LoggerFactory.getLogger(EventContext.class);

    /** Listeners registered through the OnEventListener interface, keyed by event name. */
    private final Map<String, List<OnEventListener>> onEventMaps = new HashMap<>();
    /** Handler methods registered through the Event annotation, keyed by event name. */
    private final Map<String, List<ASMMethod>> asmEventMaps = new HashMap<>();
    /** Queue of posted events waiting to be dispatched. */
    private static final ConcurrentLinkedQueue<GameEvent> EVENT_QUEUE = new ConcurrentLinkedQueue<>();
    /** Executor that runs the queue-draining task. */
    private NamedScheduleExecutor eventExecutor;

    /**
     * Starts the single-threaded executor that drains the event queue with a
     * fixed delay of 10 ms between runs.
     */
    @PostConstruct
    public void initEventQueue() {
        eventExecutor = new NamedScheduleExecutor(1, "eventQueueThread");
        eventExecutor.scheduleWithFixedDelay(this::drainEventQueue, 10, 10, TimeUnit.MILLISECONDS);
    }

    /**
     * Polls events until the queue is empty and posts a dispatch for each one.
     * An exception ends the current run; remaining events are picked up by the next run.
     */
    private void drainEventQueue() {
        try {
            GameEvent event;
            while ((event = EVENT_QUEUE.poll()) != null) {
                dispatchContext.post(createEventDispatch(event));
            }
        } catch (Exception ex) {
            // Must not escape: a periodic task that throws is never scheduled again.
            // The throwable is passed as the last argument so the stack trace is logged.
            LOGGER.error("event queue thread error", ex);
        }
    }

    /**
     * Creates the dispatch package for an event.
     *
     * @param event    the event to dispatch
     * @return a dispatch holding the event and the handlers registered for its name
     *         (either handler list is null when nothing is registered for that name)
     */
    private EventDispatch createEventDispatch(GameEvent event) {
        List<OnEventListener> eventListenerList = this.onEventMaps.get(event.getName());
        List<ASMMethod> asmEventList = this.asmEventMaps.get(event.getName());
        return new EventDispatch(event, eventListenerList, asmEventList);
    }

    /**
     * Registers a handler method for the given event names. The order id is read from
     * the {@code EventOrder} annotation on {@code clazz} (0 when absent), and each
     * handler list is kept sorted.
     *
     * @param obj           instance the method is invoked on
     * @param clazz         class of the instance
     * @param method        handler method
     * @param eventNames    event names to register for; null or empty names are logged and skipped
     * @return true if the method was registered for at least one event name
     */
    public boolean register(Object obj, Class<?> clazz, Method method, String... eventNames) {
        int eventOrderId = 0;
        EventOrder eventOrder = clazz.getAnnotation(EventOrder.class);
        if (eventOrder != null) {
            eventOrderId = eventOrder.orderId();
        }
        MethodAccess methodAccess = MethodAccess.get(clazz);
        ASMMethod asmMethod = ASMMethod.valueOf(method, methodAccess, obj, eventOrderId);
        boolean register = false;
        for (String eventName : eventNames) {
            if (eventName == null || eventName.isEmpty()) {
                LOGGER.error("event name is null! method={}", method.getName());
                continue;
            }
            List<ASMMethod> valueList = asmEventMaps.computeIfAbsent(eventName, key -> new ArrayList<>());
            valueList.add(asmMethod);
            Collections.sort(valueList);
            register = true;
        }

        return register;
    }

    /**
     * Registers a method if it carries the {@code Event} annotation.
     *
     * @param obj       instance the method is invoked on
     * @param clazz     class of the instance
     * @param method    candidate method; it is made accessible in any case
     * @return true if the method is annotated and was registered for at least one event name
     */
    public boolean register(Object obj, Class<?> clazz, Method method) {
        method.setAccessible(true);
        // Plain event annotation. An annotation element is never null, so no null check
        // is needed here; empty names are rejected by the overload called below.
        Event event = method.getAnnotation(Event.class);
        if (event == null) {
            return false;
        }
        return register(obj, clazz, method, event.name());
    }

    /**
     * Registers a listener for every event name it adds to the set passed to
     * {@code registerEvent}. Null or empty names are skipped; each listener list
     * is kept sorted.
     *
     * @param listener    the listener to register
     */
    public void registeEventListener(OnEventListener listener) {
        Set<String> eventSet = new HashSet<>();
        listener.registerEvent(eventSet);
        if (eventSet.isEmpty()) {
            return;
        }
        for (String name : eventSet) {
            if (name == null || name.isEmpty()) {
                continue;
            }
            List<OnEventListener> valueList = onEventMaps.computeIfAbsent(name, key -> new ArrayList<>());
            valueList.add(listener);
            Collections.sort(valueList);
        }
    }

    /**
     * Queues an event for dispatching; null is ignored.
     *
     * @param event    the event to post
     */
    public void post(GameEvent event) {
        if (event != null) {
            // Events go into this queue first, so that a consumer thread posting a message
            // while the ring buffer is full cannot block on itself and cause a deadlock.
            EVENT_QUEUE.add(event);
        }
    }

    /**
     * @return true if no event is waiting in the queue
     */
    public static boolean isEmpty() {
        return EVENT_QUEUE.isEmpty();
    }

    /**
     * @return the number of events currently waiting in the queue
     */
    public static int getQueueSize() {
        return EVENT_QUEUE.size();
    }
}

事件监听器

添加和接收事件

代码语言:java复制
/**
 * Listener that declares which events it wants and receives them.
 * Listeners are ordered by the {@code orderId} of the {@code EventOrder} annotation
 * on their class (0 when the annotation is absent), ties broken by class name.
 */
public interface OnEventListener extends Comparable<OnEventListener> {

    /**
     * Compares by order id first, then by class name.
     */
    @Override
    default int compareTo(OnEventListener o) {
        EventOrder ownOrder = this.getClass().getAnnotation(EventOrder.class);
        EventOrder otherOrder = o.getClass().getAnnotation(EventOrder.class);
        int ownId = ownOrder == null ? 0 : ownOrder.orderId();
        int otherId = otherOrder == null ? 0 : otherOrder.orderId();

        int byOrderId = Integer.compare(ownId, otherId);
        if (byOrderId != 0) {
            return byOrderId;
        }
        return this.getClass().getName().compareTo(o.getClass().getName());
    }

    /**
     * Collects the event names this listener wants to receive.
     *
     * @param eventSet    set to add the event names to, see {@code EventKey}
     */
    void registerEvent(Set<String> eventSet);

    /**
     * Receives an event.
     *
     * @param event    the event
     */
    void onEvent(GameEvent event);
}

ASM字节码方法

代码语言:java复制
/**
 * A handler method bound to its target instance and invoked through a
 * {@code MethodAccess} index. Instances are ordered by order id, ties broken
 * by the class name of the target instance.
 */
public class ASMMethod implements Comparable<ASMMethod> {

    /** Method access object used for the invocation. */
    private MethodAccess access;

    /** Index of the method inside {@link #access}. */
    private int index;

    /** Instance the method is invoked on. */
    private Object instance;

    /** Execution order id. */
    private int orderId;

    /**
     * Creates an ASMMethod with order id 0.
     *
     * @param method          the method object
     * @param methodAccess    method access object for the method's class
     * @param instance        instance of the class the method belongs to
     * @return the new ASMMethod
     */
    public static ASMMethod valueOf(Method method, MethodAccess methodAccess, Object instance) {
        return valueOf(method, methodAccess, instance, 0);
    }

    /**
     * Creates an ASMMethod with an explicit order id.
     *
     * @param method          the method object
     * @param methodAccess    method access object for the method's class
     * @param instance        instance of the class the method belongs to
     * @param orderId         execution order id
     * @return the new ASMMethod
     */
    public static ASMMethod valueOf(Method method, MethodAccess methodAccess, Object instance, int orderId) {
        ASMMethod created = new ASMMethod();
        created.access = methodAccess;
        created.index = methodAccess.getIndex(method.getName(), method.getParameterTypes());
        created.instance = instance;
        created.orderId = orderId;
        return created;
    }

    /**
     * Invokes the method on the bound instance.
     *
     * @param args    arguments passed to the method
     * @return whatever the underlying invocation returns
     */
    public Object invoke(Object... args) {
        return access.invoke(instance, index, args);
    }

    /**
     * @return the execution order id
     */
    public int getOrderId() {
        return orderId;
    }

    /**
     * @return the class name of the bound instance
     */
    @Override
    public String toString() {
        return instance.getClass().getName();
    }

    /**
     * Orders by order id, then by the class name of the bound instance.
     */
    @Override
    public int compareTo(ASMMethod o) {
        int byOrderId = Integer.compare(this.orderId, o.orderId);
        if (byOrderId != 0) {
            return byOrderId;
        }
        String ownName = this.instance.getClass().getName();
        return ownName.compareTo(o.instance.getClass().getName());
    }
}

事件执行线程

线程命名的执行器封装类

代码语言:java复制
/**
 * Wrapper around a {@code ScheduledExecutorService} that is created with a
 * {@code NamedThreadFactory}; every method delegates to the wrapped executor.
 */
public class NamedScheduleExecutor {
    /** The wrapped scheduled thread pool. */
    private ScheduledExecutorService executor;
    /**
     * @param poolSize    pool size passed to Executors.newScheduledThreadPool
     * @param name        name passed to the NamedThreadFactory
     *                    (presumably used to name the threads — confirm)
     */
    public NamedScheduleExecutor(int poolSize, final String name) {
        executor = Executors.newScheduledThreadPool(poolSize, new NamedThreadFactory(name));
    }
    /**
     * @return the wrapped executor
     */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }
    /**
     * Submits a task to the wrapped executor.
     * @param run    the task
     * @return the Future returned by the executor
     */
    public Future<?> submit(Runnable run) {
        return executor.submit(run);
    }
    /**
     * Schedules a one-shot task after the given delay.
     * @param run         the task
     * @param delay       delay before execution
     * @param timeUnit    unit of delay
     * @return the ScheduledFuture returned by the executor
     */
    public ScheduledFuture<?> schedule(Runnable run, int delay, TimeUnit timeUnit) {
        return executor.schedule(run, delay, timeUnit);
    }
    /**
     * Schedules a periodic task at a fixed rate.
     * @param run             the task
     * @param initialDelay    delay before the first execution
     * @param period          period between successive executions
     * @param unit            unit of initialDelay and period
     * @return the ScheduledFuture returned by the executor
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable run, int initialDelay, int period, TimeUnit unit) {
        return executor.scheduleAtFixedRate(run, initialDelay, period, unit);
    }
    /**
     * Schedules a periodic task with a fixed delay between the end of one
     * execution and the start of the next.
     * @param run             the task
     * @param initialDelay    delay before the first execution
     * @param delay           delay between executions
     * @param unit            unit of initialDelay and delay
     * @return the ScheduledFuture returned by the executor
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable run, int initialDelay, int delay, TimeUnit unit) {
        return executor.scheduleWithFixedDelay(run, initialDelay, delay, unit);
    }
}

0 人点赞