PokéLLMon 源码解析(五)

2024-03-08 09:16:42 浏览数 (2)

.PokeLLMonpoke_envplayerplayer.py

代码语言:javascript复制
"""This module defines a base class for players.
"""

import asyncio
import random
from abc import ABC, abstractmethod
from asyncio import Condition, Event, Queue, Semaphore
from logging import Logger
from time import perf_counter
from typing import Any, Awaitable, Dict, List, Optional, Union

import orjson

from poke_env.concurrency import create_in_poke_loop, handle_threaded_coroutines
from poke_env.data import GenData, to_id_str
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.environment.battle import Battle
from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move import Move
from poke_env.environment.pokemon import Pokemon
from poke_env.exceptions import ShowdownException
from poke_env.player.battle_order import (
    BattleOrder,
    DefaultBattleOrder,
    DoubleBattleOrder,
)
from poke_env.ps_client import PSClient
from poke_env.ps_client.account_configuration import (
    CONFIGURATION_FROM_PLAYER_COUNTER,
    AccountConfiguration,
)
from poke_env.ps_client.server_configuration import (
    LocalhostServerConfiguration,
    ServerConfiguration,
)
from poke_env.teambuilder.constant_teambuilder import ConstantTeambuilder
from poke_env.teambuilder.teambuilder import Teambuilder

class Player(ABC):
    """
    Base class for players.
    """

    # Set of messages to ignore during battle
    MESSAGES_TO_IGNORE = {"", "t:", "expire", "uhtmlchange"}

    # Chance of using default order after an error due to invalid choice
    DEFAULT_CHOICE_CHANCE = 1 / 1000
    # 初始化函数,设置各种参数的默认值
    def __init__(
        self,
        account_configuration: Optional[AccountConfiguration] = None,  # 账户配置,默认为None
        *,
        avatar: Optional[int] = None,  # 头像,默认为None
        battle_format: str = "gen9randombattle",  # 战斗格式,默认为"gen9randombattle"
        log_level: Optional[int] = None,  # 日志级别,默认为None
        max_concurrent_battles: int = 1,  # 最大并发战斗数,默认为1
        save_replays: Union[bool, str] = False,  # 是否保存战斗重放,默认为False
        server_configuration: Optional[ServerConfiguration] = None,  # 服务器配置,默认为None
        start_timer_on_battle_start: bool = False,  # 战斗开始时是否启动计时器,默认为False
        start_listening: bool = True,  # 是否开始监听,默认为True
        ping_interval: Optional[float] = 20.0,  # ping间隔时间,默认为20.0
        ping_timeout: Optional[float] = 20.0,  # ping超时时间,默认为20.0
        team: Optional[Union[str, Teambuilder]] = None,  # 队伍信息,默认为None
    # 辅助奖励计算函数
    def reward_computing_helper(
        self,
        battle: AbstractBattle,  # 战斗对象
        *,
        fainted_value: float = 0.0,  # 损失生命值的价值,默认为0.0
        hp_value: float = 0.0,  # 生命值的价值,默认为0.0
        number_of_pokemons: int = 6,  # 参与战斗的宝可梦数量,默认为6
        starting_value: float = 0.0,  # 初始价值,默认为0.0
        status_value: float = 0.0,  # 状态价值,默认为0.0
        victory_value: float = 1.0,  # 胜利价值,默认为1.0
    # 定义一个方法,计算战斗的奖励值
    ) -> float:

        # 如果战斗不在奖励缓冲区中,则将其初始值添加到缓冲区中
        if battle not in self._reward_buffer:
            self._reward_buffer[battle] = starting_value
        current_value = 0

        # 遍历我方队伍中的每只精灵
        for mon in battle.team.values():
            # 根据当前生命值比例计算当前值
            current_value  = mon.current_hp_fraction * hp_value
            # 如果精灵已经倒下,则减去倒下的值
            if mon.fainted:
                current_value -= fainted_value
            # 如果精灵有异常状态,则减去异常状态的值
            elif mon.status is not None:
                current_value -= status_value

        # 根据我方队伍中精灵数量与总精灵数量的差值计算当前值
        current_value  = (number_of_pokemons - len(battle.team)) * hp_value

        # 遍历对手队伍中的每只精灵
        for mon in battle.opponent_team.values():
            # 根据当前生命值比例计算当前值
            current_value -= mon.current_hp_fraction * hp_value
            # 如果精灵已经倒下,则加上倒下的值
            if mon.fainted:
                current_value  = fainted_value
            # 如果精灵有异常状态,则加上异常状态的值
            elif mon.status is not None:
                current_value  = status_value

        # 根据对手队伍中精灵数量与总精灵数量的差值计算当前值
        current_value -= (number_of_pokemons - len(battle.opponent_team)) * hp_value

        # 如果战斗胜利,则加上胜利的值;如果战斗失败,则减去胜利的值
        if battle.won:
            current_value  = victory_value
        elif battle.lost:
            current_value -= victory_value

        # 计算最终奖励值并更新奖励缓冲区
        to_return = current_value - self._reward_buffer[battle]
        self._reward_buffer[battle] = current_value
        return to_return


    # 定义一个方法,创建账户配置
    def _create_account_configuration(self) -> AccountConfiguration:
        key = type(self).__name__
        # 更新配置计数器
        CONFIGURATION_FROM_PLAYER_COUNTER.update([key])
        username = "%s %d" % (key, CONFIGURATION_FROM_PLAYER_COUNTER[key])
        # 如果用户名长度超过18个字符,则截取前18个字符
        if len(username) > 18:
            username = "%s %d" % (
                key[: 18 - len(username)],
                CONFIGURATION_FROM_PLAYER_COUNTER[key],
            )
        return AccountConfiguration(username, None)

    # 定义一个方法,处理战斗结束的回调
    def _battle_finished_callback(self, battle: AbstractBattle):
        pass
    def update_team(self, team: Union[Teambuilder, str]):
        """Updates the team used by the player.

        :param team: The new team to use.
        :type team: str or Teambuilder
        """
        # 如果传入的 team 是 Teambuilder 类型,则直接赋值给 self._team
        if isinstance(team, Teambuilder):
            self._team = team
        else:
            # 如果传入的 team 不是 Teambuilder 类型,则使用 ConstantTeambuilder 类创建一个新的 team
            self._team = ConstantTeambuilder(team)

    async def _get_battle(self, battle_tag: str) -> AbstractBattle:
        # 去掉 battle_tag 的第一个字符
        battle_tag = battle_tag[1:]
        while True:
            # 如果 battle_tag 在 self._battles 中,则返回对应的 AbstractBattle 对象
            if battle_tag in self._battles:
                return self._battles[battle_tag]
            # 否则等待 _battle_start_condition 条件满足
            async with self._battle_start_condition:
                await self._battle_start_condition.wait()

    async def _handle_battle_request(
        self,
        battle: AbstractBattle,
        from_teampreview_request: bool = False,
        maybe_default_order: bool = False,
    ):
        # 如果 maybe_default_order 为 True 且随机数小于 DEFAULT_CHOICE_CHANCE,则选择默认的移动
        if maybe_default_order and random.random() < self.DEFAULT_CHOICE_CHANCE:
            message = self.choose_default_move().message
        elif battle.teampreview:
            # 如果在队伍预览阶段,且不是来自队伍预览请求,则返回
            if not from_teampreview_request:
                return
            message = self.teampreview(battle)
        else:
            # 选择移动或者等待选择移动的结果
            message = self.choose_move(battle)
            if isinstance(message, Awaitable):
                message = await message
            message = message.message

        # 发送消息到 ps_client,指定 battle.battle_tag
        await self.ps_client.send_message(message, battle.battle_tag)

    async def _handle_challenge_request(self, split_message: List[str]):
        """Handles an individual challenge."""
        # 获取挑战者的用户名
        challenging_player = split_message[2].strip()

        # 如果挑战者不是当前用户,并且消息长度大于等于 6,且消息中指定的格式与 self._format 相同,则将挑战者加入到 _challenge_queue 中
        if challenging_player != self.username:
            if len(split_message) >= 6:
                if split_message[5] == self._format:
                    await self._challenge_queue.put(challenging_player)
    async def _update_challenges(self, split_message: List[str]):
        """Update internal challenge state.

        Add corresponding challenges to internal queue of challenges, where they will be
        processed if relevant.

        :param split_message: Recevied message, split.
        :type split_message: List[str]
        """
        # 记录调试信息,更新挑战信息
        self.logger.debug("Updating challenges with %s", split_message)
        # 从消息中解析挑战信息
        challenges = orjson.loads(split_message[2]).get("challengesFrom", {})
        # 遍历挑战信息,将符合条件的挑战加入挑战队列
        for user, format_ in challenges.items():
            if format_ == self._format:
                await self._challenge_queue.put(user)

    async def accept_challenges(
        self,
        opponent: Optional[Union[str, List[str]]],
        n_challenges: int,
        packed_team: Optional[str] = None,
    ):
        """Let the player wait for challenges from opponent, and accept them.

        If opponent is None, every challenge will be accepted. If opponent if a string,
        all challenges from player with that name will be accepted. If opponent is a
        list all challenges originating from players whose name is in the list will be
        accepted.

        Up to n_challenges challenges will be accepted, after what the function will
        wait for these battles to finish, and then return.

        :param opponent: Players from which challenges will be accepted.
        :type opponent: None, str or list of str
        :param n_challenges: Number of challenges that will be accepted
        :type n_challenges: int
        :packed_team: Team to use. Defaults to generating a team with the agent's teambuilder.
        :type packed_team: string, optional.
        """
        # 如果没有指定队伍,则使用下一个队伍
        if packed_team is None:
            packed_team = self.next_team

        # 导入日志模块,记录警告信息
        import logging
        logging.warning("AAAHHH in accept_challenges")
        # 处理多线程协程,接受挑战
        await handle_threaded_coroutines(
            self._accept_challenges(opponent, n_challenges, packed_team)
        )
    # 异步方法,用于接受挑战
    async def _accept_challenges(
        self,
        opponent: Optional[Union[str, List[str]]],  # 对手的用户名,可以是字符串或字符串列表
        n_challenges: int,  # 挑战的次数
        packed_team: Optional[str],  # 打包的队伍信息
    ):
        import logging  # 导入日志模块
        logging.warning("AAAHHH in _accept_challenges")  # 记录警告日志
        if opponent:  # 如果存在对手
            if isinstance(opponent, list):  # 如果对手是列表
                opponent = [to_id_str(o) for o in opponent]  # 将对手列表中的每个元素转换为字符串
            else:
                opponent = to_id_str(opponent)  # 将对手转换为字符串
        await self.ps_client.logged_in.wait()  # 等待客户端登录完成
        self.logger.debug("Event logged in received in accept_challenge")  # 记录调试日志

        for _ in range(n_challenges):  # 循环处理挑战次数
            while True:  # 无限循环
                username = to_id_str(await self._challenge_queue.get())  # 从挑战队列中获取用户名并转换为字符串
                self.logger.debug(
                    "Consumed %s from challenge queue in accept_challenge", username
                )  # 记录调试日志
                if (
                    (opponent is None)  # 如果对手为空
                    or (opponent == username)  # 或对手等于用户名
                    or (isinstance(opponent, list) and (username in opponent))  # 或对手是列表且用户名在对手列表中
                ):
                    await self.ps_client.accept_challenge(username, packed_team)  # 接受挑战
                    await self._battle_semaphore.acquire()  # 获取战斗信号量
                    break  # 跳出循环
        await self._battle_count_queue.join()  # 等待所有战斗完成

    @abstractmethod
    def choose_move(
        self, battle: AbstractBattle
    ) -> Union[BattleOrder, Awaitable[BattleOrder]]:
        """Abstract method to choose a move in a battle.

        :param battle: The battle.
        :type battle: AbstractBattle
        :return: The move order.
        :rtype: str
        """
        pass

    # 选择默认的战斗指令
    def choose_default_move(self) -> DefaultBattleOrder:
        """Returns showdown's default move order.

        This order will result in the first legal order - according to showdown's
        ordering - being chosen.
        """
        return DefaultBattleOrder()
    # 选择一个随机的单个行动,返回一个 BattleOrder 对象
    def choose_random_singles_move(self, battle: Battle) -> BattleOrder:
        # 创建一个包含可用行动的 BattleOrder 对象列表
        available_orders = [BattleOrder(move) for move in battle.available_moves]
        # 将可用交换的 BattleOrder 对象也添加到列表中
        available_orders.extend(
            [BattleOrder(switch) for switch in battle.available_switches]
        )

        # 如果可以超级进化,将带有 mega 标记的 BattleOrder 对象也添加到列表中
        if battle.can_mega_evolve:
            available_orders.extend(
                [BattleOrder(move, mega=True) for move in battle.available_moves]
            )

        # 如果可以极巨化,将带有 dynamax 标记的 BattleOrder 对象也添加到列表中
        if battle.can_dynamax:
            available_orders.extend(
                [BattleOrder(move, dynamax=True) for move in battle.available_moves]
            )

        # 如果可以使用 tera,将带有 terastallize 标记的 BattleOrder 对象也添加到列表中
        if battle.can_tera:
            available_orders.extend(
                [
                    BattleOrder(move, terastallize=True)
                    for move in battle.available_moves
                ]
            )

        # 如果可以使用 Z 招式且有活跃的精灵,将带有 z_move 标记的 BattleOrder 对象也添加到列表中
        if battle.can_z_move and battle.active_pokemon:
            available_z_moves = set(battle.active_pokemon.available_z_moves)
            available_orders.extend(
                [
                    BattleOrder(move, z_move=True)
                    for move in battle.available_moves
                    if move in available_z_moves
                ]
            )

        # 如果有可用的行动,则随机选择一个返回
        if available_orders:
            return available_orders[int(random.random() * len(available_orders))]
        # 如果没有可用的行动,则选择默认行动
        else:
            return self.choose_default_move()
    # 从给定的战斗中选择一个随机的合法移动
    def choose_random_move(self, battle: AbstractBattle) -> BattleOrder:
        """Returns a random legal move from battle.

        :param battle: The battle in which to move.
        :type battle: AbstractBattle
        :return: Move order
        :rtype: str
        """
        # 如果战斗是单打模式
        if isinstance(battle, Battle):
            # 调用选择单打模式下的随机移动方法
            return self.choose_random_singles_move(battle)
        # 如果战斗是双打模式
        elif isinstance(battle, DoubleBattle):
            # 调用选择双打模式下的随机移动方法
            return self.choose_random_doubles_move(battle)
        else:
            # 抛出数值错误,提示战斗应该是 Battle 或 DoubleBattle 类型
            raise ValueError(
                "battle should be Battle or DoubleBattle. Received %d" % (type(battle))
            )

    # 在天梯上进行游戏
    async def ladder(self, n_games: int):
        """Make the player play games on the ladder.

        n_games defines how many battles will be played.

        :param n_games: Number of battles that will be played
        :type n_games: int
        """
        # 等待玩家登录完成
        await handle_threaded_coroutines(self._ladder(n_games))

    # 在天梯上进行游戏的内部方法
    async def _ladder(self, n_games: int):
        # 等待玩家登录完成
        await self.ps_client.logged_in.wait()
        # 记录开始时间
        start_time = perf_counter()

        # 循环进行指定次数的游戏
        for _ in range(n_games):
            # 异步等待战斗开始条件
            async with self._battle_start_condition:
                # 搜索天梯游戏并等待战斗开始
                await self.ps_client.search_ladder_game(self._format, self.next_team)
                await self._battle_start_condition.wait()
                # 当战斗计数队列已满时
                while self._battle_count_queue.full():
                    # 异步等待战斗结束条件
                    async with self._battle_end_condition:
                        await self._battle_end_condition.wait()
                # 获取战斗信号量
                await self._battle_semaphore.acquire()
        # 等待战斗计数队列完成
        await self._battle_count_queue.join()
        # 记录日志,显示天梯游戏完成所花费的时间
        self.logger.info(
            "Laddering (%d battles) finished in %fs",
            n_games,
            perf_counter() - start_time,
        )
    # 异步方法,使玩家与对手进行指定次数的对战
    async def battle_against(self, opponent: "Player", n_battles: int = 1):
        """Make the player play n_battles against opponent.

        This function is a wrapper around send_challenges and accept challenges.

        :param opponent: The opponent to play against.
        :type opponent: Player
        :param n_battles: The number of games to play. Defaults to 1.
        :type n_battles: int
        """
        # 调用异步方法处理线程化的协程
        await handle_threaded_coroutines(self._battle_against(opponent, n_battles))

    # 异步方法,实际进行玩家与对手的对战
    async def _battle_against(self, opponent: "Player", n_battles: int):
        # 并发执行发送挑战和接受挑战的操作
        await asyncio.gather(
            self.send_challenges(
                to_id_str(opponent.username),
                n_battles,
                to_wait=opponent.ps_client.logged_in,
            ),
            opponent.accept_challenges(
                to_id_str(self.username), n_battles, opponent.next_team
            ),
        )

    # 异步方法,使玩家发送挑战给对手
    async def send_challenges(
        self, opponent: str, n_challenges: int, to_wait: Optional[Event] = None
    ):
        """Make the player send challenges to opponent.

        opponent must be a string, corresponding to the name of the player to challenge.

        n_challenges defines how many challenges will be sent.

        to_wait is an optional event that can be set, in which case it will be waited
        before launching challenges.

        :param opponent: Player username to challenge.
        :type opponent: str
        :param n_challenges: Number of battles that will be started
        :type n_challenges: int
        :param to_wait: Optional event to wait before launching challenges.
        :type to_wait: Event, optional.
        """
        # 调用异步方法处理线程化的协程
        await handle_threaded_coroutines(
            self._send_challenges(opponent, n_challenges, to_wait)
        )

    # 异步方法,实际发送挑战给对手
    async def _send_challenges(
        self, opponent: str, n_challenges: int, to_wait: Optional[Event] = None
    ): 
        # 等待玩家登录完成
        await self.ps_client.logged_in.wait()
        # 在发送挑战时记录事件已登录
        self.logger.info("Event logged in received in send challenge")

        # 如果有需要等待的事件,则等待
        if to_wait is not None:
            await to_wait.wait()

        # 记录开始时间
        start_time = perf_counter()

        # 发起指定数量的挑战
        for _ in range(n_challenges):
            await self.ps_client.challenge(opponent, self._format, self.next_team)
            await self._battle_semaphore.acquire()
        # 等待所有挑战结束
        await self._battle_count_queue.join()
        # 记录挑战完成所用时间
        self.logger.info(
            "Challenges (%d battles) finished in %fs",
            n_challenges,
            perf_counter() - start_time,
        )

    # 生成随机的队伍预览顺序
    def random_teampreview(self, battle: AbstractBattle) -> str:
        """Returns a random valid teampreview order for the given battle.

        :param battle: The battle.
        :type battle: AbstractBattle
        :return: The random teampreview order.
        :rtype: str
        """
        # 生成队伍成员列表
        members = list(range(1, len(battle.team)   1))
        # 随机打乱成员顺序
        random.shuffle(members)
        return "/team "   "".join([str(c) for c in members])

    # 重置玩家的内部战斗追踪器
    def reset_battles(self):
        """Resets the player's inner battle tracker."""
        # 遍历所有战斗,如果有未结束的战斗则抛出异常
        for battle in list(self._battles.values()):
            if not battle.finished:
                raise EnvironmentError(
                    "Can not reset player's battles while they are still running"
                )
        # 重置战斗追踪器
        self._battles = {}
    # 定义一个方法,用于生成给定战斗的队伍预览顺序
    def teampreview(self, battle: AbstractBattle) -> str:
        """Returns a teampreview order for the given battle.

        This order must be of the form /team TEAM, where TEAM is a string defining the
        team chosen by the player. Multiple formats are supported, among which '3461'
        and '3, 4, 6, 1' are correct and indicate leading with pokemon 3, with pokemons
        4, 6 and 1 in the back in single battles or leading with pokemons 3 and 4 with
        pokemons 6 and 1 in the back in double battles.

        Please refer to Pokemon Showdown's protocol documentation for more information.

        :param battle: The battle.
        :type battle: AbstractBattle
        :return: The teampreview order.
        :rtype: str
        """
        # 返回一个随机的队伍预览顺序
        return self.random_teampreview(battle)

    @staticmethod
    def create_order(
        order: Union[Move, Pokemon],
        mega: bool = False,
        z_move: bool = False,
        dynamax: bool = False,
        terastallize: bool = False,
        move_target: int = DoubleBattle.EMPTY_TARGET_POSITION,
    ) -> BattleOrder:
        """Formats an move order corresponding to the provided pokemon or move.

        :param order: Move to make or Pokemon to switch to.
        :type order: Move or Pokemon
        :param mega: Whether to mega evolve the pokemon, if a move is chosen.
        :type mega: bool
        :param z_move: Whether to make a zmove, if a move is chosen.
        :type z_move: bool
        :param dynamax: Whether to dynamax, if a move is chosen.
        :type dynamax: bool
        :param terastallize: Whether to terastallize, if a move is chosen.
        :type terastallize: bool
        :param move_target: Target Pokemon slot of a given move
        :type move_target: int
        :return: Formatted move order
        :rtype: str
        """
        # 格式化一个移动顺序,对应于提供的精灵或招式
        return BattleOrder(
            order,
            mega=mega,
            move_target=move_target,
            z_move=z_move,
            dynamax=dynamax,
            terastallize=terastallize,
        )

    @property
    def battles(self) -> Dict[str, AbstractBattle]:
        # 返回私有属性 _battles
        return self._battles

    @property
    def format(self) -> str:
        # 返回私有属性 _format
        return self._format

    @property
    def format_is_doubles(self) -> bool:
        # 检查比赛格式是否为双打
        format_lowercase = self._format.lower()
        return (
            "vgc" in format_lowercase
            or "double" in format_lowercase
            or "metronome" in format_lowercase
        )

    @property
    def n_finished_battles(self) -> int:
        # 返回已完成的比赛数量
        return len([None for b in self._battles.values() if b.finished])

    @property
    def n_lost_battles(self) -> int:
        # 返回已输掉的比赛数量
        return len([None for b in self._battles.values() if b.lost])

    @property
    def n_tied_battles(self) -> int:
        # 返回打平的比赛数量
        return self.n_finished_battles - self.n_lost_battles - self.n_won_battles

    @property
    def n_won_battles(self) -> int:
        # 返回已赢得的比赛数量
        return len([None for b in self._battles.values() if b.won])

    @property
    # 计算胜率,即已赢得的战斗数除以已完成的战斗数
    def win_rate(self) -> float:
        return self.n_won_battles / self.n_finished_battles

    # 返回日志记录器对象
    @property
    def logger(self) -> Logger:
        return self.ps_client.logger

    # 返回用户名
    @property
    def username(self) -> str:
        return self.ps_client.username

    # 返回下一个队伍的名称,如果存在的话
    @property
    def next_team(self) -> Optional[str]:
        # 如果存在队伍对象,则返回其生成的队伍名称
        if self._team:
            return self._team.yield_team()
        # 否则返回空值
        return None

.PokeLLMonpoke_envplayerrandom_player.py

代码语言:javascript复制
# 定义一个随机玩家基线的模块
"""This module defines a random players baseline
"""

# 从 poke_env.environment 模块中导入 AbstractBattle 类
from poke_env.environment import AbstractBattle
# 从 poke_env.player.battle_order 模块中导入 BattleOrder 类
from poke_env.player.battle_order import BattleOrder
# 从 poke_env.player.player 模块中导入 Player 类
from poke_env.player.player import Player

# 定义一个 RandomPlayer 类,继承自 Player 类
class RandomPlayer(Player):
    # 定义 choose_move 方法,接受一个 AbstractBattle 对象作为参数,返回一个 BattleOrder 对象
    def choose_move(self, battle: AbstractBattle) -> BattleOrder:
        # 调用 Player 类的 choose_random_move 方法选择一个随机的行动
        return self.choose_random_move(battle)

.PokeLLMonpoke_envplayerutils.py

代码语言:javascript复制
"""This module contains utility functions and objects related to Player classes.
"""

# 导入必要的模块和对象
import asyncio
import math
from concurrent.futures import Future
from typing import Dict, List, Optional, Tuple

from poke_env.concurrency import POKE_LOOP
from poke_env.data import to_id_str
from poke_env.player.baselines import MaxBasePowerPlayer, SimpleHeuristicsPlayer
from poke_env.player.player import Player
from poke_env.player.random_player import RandomPlayer

# 定义不同玩家类型的评估等级
_EVALUATION_RATINGS = {
    RandomPlayer: 1,
    MaxBasePowerPlayer: 7.665994,
    SimpleHeuristicsPlayer: 128.757145,
}

# 后台交叉评估函数,返回一个 Future 对象
def background_cross_evaluate(
    players: List[Player], n_challenges: int
) -> "Future[Dict[str, Dict[str, Optional[float]]]]":
    return asyncio.run_coroutine_threadsafe(
        cross_evaluate(players, n_challenges), POKE_LOOP
    )

# 异步交叉评估函数
async def cross_evaluate(
    players: List[Player], n_challenges: int
) -> Dict[str, Dict[str, Optional[float]]]:
    # 初始化结果字典
    results: Dict[str, Dict[str, Optional[float]]] = {
        p_1.username: {p_2.username: None for p_2 in players} for p_1 in players
    }
    # 遍历玩家列表,进行交叉评估
    for i, p_1 in enumerate(players):
        for j, p_2 in enumerate(players):
            if j <= i:
                continue
            # 并发发送挑战和接受挑战
            await asyncio.gather(
                p_1.send_challenges(
                    opponent=to_id_str(p_2.username),
                    n_challenges=n_challenges,
                    to_wait=p_2.ps_client.logged_in,
                ),
                p_2.accept_challenges(
                    opponent=to_id_str(p_1.username),
                    n_challenges=n_challenges,
                    packed_team=p_2.next_team,
                ),
            )
            # 更新结果字典中的胜率信息
            results[p_1.username][p_2.username] = p_1.win_rate
            results[p_2.username][p_1.username] = p_2.win_rate

            # 重置战斗状态
            p_1.reset_battles()
            p_2.reset_battles()
    return results

# 从结果中估计实力函数
def _estimate_strength_from_results(
    number_of_games: int, number_of_wins: int, opponent_rating: float
# 估计玩家实力基于游戏结果和对手评分
def evaluate_player(
    number_of_games: int,  # 游戏评估的数量
    number_of_wins: int,  # 赢得的评估游戏数量
    opponent_rating: float,  # 对手的评分
) -> Tuple[float, Tuple[float, float]]:  # 返回估计的玩家实力和95%置信区间的元组

    n, p = number_of_games, number_of_wins / number_of_games  # 计算游戏数量和胜率
    q = 1 - p  # 计算失败率

    if n * p * q < 9:  # 如果无法应用二项分布的正态近似
        raise ValueError(
            "The results obtained in evaluate_player are too extreme to obtain an "
            "accurate player evaluation. You can try to solve this issue by increasing"
            " the total number of battles. Obtained results: %d victories out of %d"
            " games." % (p * n, n)
        )

    estimate = opponent_rating * p / q  # 估计玩家实力
    error = (
        math.sqrt(n * p * q) / n * 1.96
    )  # 95%置信区间的正态分布

    lower_bound = max(0, p - error)  # 下限
    lower_bound = opponent_rating * lower_bound / (1 - lower_bound)  # 下限的评估

    higher_bound = min(1, p   error)  # 上限

    if higher_bound == 1:  # 如果上限为1
        higher_bound = math.inf  # 上限为正无穷大
    else:
        higher_bound = opponent_rating * higher_bound / (1 - higher_bound)  # 上限的评估

    return estimate, (lower_bound, higher_bound)  # 返回估计值和置信区间的元组


# 后台评估玩家
def background_evaluate_player(
    player: Player,  # 玩家对象
    n_battles: int = 1000,  # 战斗数量默认为1000
    n_placement_battles: int = 30,  # 放置战斗数量默认为30
) -> "Future[Tuple[float, Tuple[float, float]]]":  # 返回Future对象,包含估计值和置信区间的元组

    return asyncio.run_coroutine_threadsafe(
        evaluate_player(player, n_battles, n_placement_battles), POKE_LOOP
    )  # 在POKE_LOOP中异步运行评估玩家函数


# 异步评估玩家
async def evaluate_player(
    player: Player,  # 玩家对象
    n_battles: int = 1000,  # 战斗数量默认为1000
    n_placement_battles: int = 30,  # 放置战斗数量默认为30
# 估算玩家实力的函数
def estimate_player_strength(player: Player, n_battles: int, n_placement_battles: int) -> Tuple[float, Tuple[float, float]]:
    """Estimate player strength.

    This functions calculates an estimate of a player's strength, measured as its
    expected performance against a random opponent in a gen 8 random battle. The
    returned number can be interpreted as follows: a strength of k means that the
    probability of the player winning a gen 8 random battle against a random player is k
    times higher than the probability of the random player winning.

    The function returns a tuple containing the best guess based on the played games
    as well as a tuple describing a 95% confidence interval for that estimated strength.

    The actual evaluation can be performed against any baseline player for which an
    accurate strength estimate is available. This baseline is determined at the start of
    the process, by playing a limited number of placement battles and choosing the
    opponent closest to the player in terms of performance.

    :param player: The player to evaluate.
    :type player: Player
    :param n_battles: The total number of battle to perform, including placement
        battles.
    :type n_battles: int
    :param n_placement_battles: Number of placement battles to perform per baseline
        player.
    :type n_placement_battles: int
    :raises: ValueError if the results are too extreme to be interpreted.
    :raises: AssertionError if the player is not configured to play gen8battles or the
        selected number of games to play it too small.
    :return: A tuple containing the estimated player strength and a 95% confidence
        interval
    :rtype: tuple of float and tuple of floats
    """
    # 检查输入
    assert player.format == "gen8randombattle", (
        "Player %s can not be evaluated as its current format (%s) is not "
        "gen8randombattle." % (player, player.format)
    )
    # 如果放置战斗的数量乘以评估等级的数量大于总战斗数量的一半,则进行警告
    if n_placement_battles * len(_EVALUATION_RATINGS) > n_battles // 2:
        player.logger.warning(
            "Number of placement battles reduced from %d to %d due to limited number of"
            " battles (%d). A more accurate evaluation can be performed by increasing "
            "the total number of players.",
            n_placement_battles,
            n_battles // len(_EVALUATION_RATINGS) // 2,
            n_battles,
        )
        # 将放置战斗数量减少到总战斗数量的一半除以评估等级的数量
        n_placement_battles = n_battles // len(_EVALUATION_RATINGS) // 2

    # 断言放置战斗数量大于0,否则抛出异常
    assert (
        n_placement_battles > 0
    ), "Not enough battles to perform placement battles. Please increase the number of "
    "battles to perform to evaluate the player."

    # 初始化放置战斗
    baselines = [p(max_concurrent_battles=n_battles) for p in _EVALUATION_RATINGS]

    # 对每个基准玩家进行放置战斗
    for p in baselines:
        await p.battle_against(player, n_placement_battles)

    # 选择最佳对手进行评估
    best_opp = min(
        baselines, key=lambda p: (abs(p.win_rate - 0.5), -_EVALUATION_RATINGS[type(p)])
    )

    # 执行主要评估
    remaining_battles = n_battles - len(_EVALUATION_RATINGS) * n_placement_battles
    await best_opp.battle_against(player, remaining_battles)

    # 从结果中估计玩家的实力
    return _estimate_strength_from_results(
        best_opp.n_finished_battles,
        best_opp.n_lost_battles,
        _EVALUATION_RATINGS[type(best_opp)],
    )

.PokeLLMonpoke_envplayer__init__.py

代码语言:javascript复制
# 初始化 poke_env.player 模块
"""
# 导入并引入并发模块 POKE_LOOP
from poke_env.concurrency import POKE_LOOP
# 导入随机玩家、工具类
from poke_env.player import random_player, utils
# 导入基线玩家、简单启发式玩家
from poke_env.player.baselines import MaxBasePowerPlayer, SimpleHeuristicsPlayer
# 导入 GPT 玩家
from poke_env.player.gpt_player import LLMPlayer
# 导入 LLAMA 玩家
from poke_env.player.llama_player import LLAMAPlayer
# 导入战斗指令相关类
from poke_env.player.battle_order import (
    BattleOrder,
    DefaultBattleOrder,
    DoubleBattleOrder,
    ForfeitBattleOrder,
)
# 导入 OpenAI API 相关类
from poke_env.player.openai_api import ActType, ObsType, OpenAIGymEnv
# 导入玩家类
from poke_env.player.player import Player
# 导入随机玩家类
from poke_env.player.random_player import RandomPlayer
# 导入工具类中的函数
from poke_env.player.utils import (
    background_cross_evaluate,
    background_evaluate_player,
    cross_evaluate,
    evaluate_player,
)
# 导入 PS 客户端
from poke_env.ps_client import PSClient

# 导出的模块列表
__all__ = [
    "openai_api",
    "player",
    "random_player",
    "utils",
    "ActType",
    "ObsType",
    "ForfeitBattleOrder",
    "POKE_LOOP",
    "OpenAIGymEnv",
    "PSClient",
    "Player",
    "RandomPlayer",
    "cross_evaluate",
    "background_cross_evaluate",
    "background_evaluate_player",
    "evaluate_player",
    "BattleOrder",
    "DefaultBattleOrder",
    "DoubleBattleOrder",
    "MaxBasePowerPlayer",
    "SimpleHeuristicsPlayer",
]

.PokeLLMonpoke_envps_clientaccount_configuration.py

代码语言:javascript复制
# 该模块包含与玩家配置相关的对象
"""
# 导入必要的模块
from typing import Counter, NamedTuple, Optional

# 创建一个计数器对象,用于统计从玩家获取的配置信息
CONFIGURATION_FROM_PLAYER_COUNTER: Counter[str] = Counter()

# 定义一个命名元组对象,表示玩家配置。包含用户名和密码两个条目
class AccountConfiguration(NamedTuple):
    """Player configuration object. Represented with a tuple with two entries: username and
    password."""

    # 用户名
    username: str
    # 密码(可选)
    password: Optional[str]

.PokeLLMonpoke_envps_clientps_client.py

代码语言:javascript复制
"""
这个模块定义了一个与 Showdown 服务器通信的基类。
"""
# 导入必要的库
import asyncio
import json
import logging
from asyncio import CancelledError, Event, Lock, create_task, sleep
from logging import Logger
from time import perf_counter
from typing import Any, List, Optional, Set

import requests
import websockets.client as ws
from websockets.exceptions import ConnectionClosedOK

from poke_env.concurrency import (
    POKE_LOOP,
    create_in_poke_loop,
    handle_threaded_coroutines,
)
from poke_env.exceptions import ShowdownException
from poke_env.ps_client.account_configuration import AccountConfiguration
from poke_env.ps_client.server_configuration import ServerConfiguration

# 定义 Pokemon Showdown 客户端类
class PSClient:
    """
    Pokemon Showdown 客户端。

    负责与 Showdown 服务器通信。还实现了一些用于基本任务的高级方法,如更改头像和低级消息处理。
    """

    def __init__(
        self,
        account_configuration: AccountConfiguration,
        *,
        avatar: Optional[int] = None,
        log_level: Optional[int] = None,
        server_configuration: ServerConfiguration,
        start_listening: bool = True,
        ping_interval: Optional[float] = 20.0,
        ping_timeout: Optional[float] = 20.0,
        """
        :param account_configuration: Account configuration.
        :type account_configuration: AccountConfiguration
        :param avatar: Player avatar id. Optional.
        :type avatar: int, optional
        :param log_level: The player's logger level.
        :type log_level: int. Defaults to logging's default level.
        :param server_configuration: Server configuration.
        :type server_configuration: ServerConfiguration
        :param start_listening: Whether to start listening to the server. Defaults to
            True.
        :type start_listening: bool
        :param ping_interval: How long between keepalive pings (Important for backend
            websockets). If None, disables keepalive entirely.
        :type ping_interval: float, optional
        :param ping_timeout: How long to wait for a timeout of a specific ping
            (important for backend websockets.
            Increase only if timeouts occur during runtime).
            If None pings will never time out.
        :type ping_timeout: float, optional
        """
        # 初始化活动任务集合
        self._active_tasks: Set[Any] = set()
        # 设置 ping_interval 和 ping_timeout
        self._ping_interval = ping_interval
        self._ping_timeout = ping_timeout

        # 设置服务器配置和账户配置
        self._server_configuration = server_configuration
        self._account_configuration = account_configuration

        # 设置玩家头像
        self._avatar = avatar

        # 创建登录事件和发送锁
        self._logged_in: Event = create_in_poke_loop(Event)
        self._sending_lock = create_in_poke_loop(Lock)

        # 初始化 websocket 和日志记录器
        self.websocket: ws.WebSocketClientProtocol
        self._logger: Logger = self._create_logger(log_level)

        # 如果需要开始监听服务器,则在 POKE_LOOP 线程安全地运行监听协程
        if start_listening:
            self._listening_coroutine = asyncio.run_coroutine_threadsafe(
                self.listen(), POKE_LOOP
            )
    # 异步方法,接受挑战,发送接受挑战的消息给指定用户名
    async def accept_challenge(self, username: str, packed_team: Optional[str]):
        # 断言当前用户已登录
        assert (
            self.logged_in.is_set()
        ), f"Expected player {self.username} to be logged in."
        # 设置队伍
        await self.set_team(packed_team)
        # 发送消息给指定用户名,接受挑战
        await self.send_message("/accept %s" % username)

    # 异步方法,发起挑战,发送挑战消息给指定用户名和格式
    async def challenge(self, username: str, format_: str, packed_team: Optional[str]):
        # 断言当前用户已登录
        assert (
            self.logged_in.is_set()
        ), f"Expected player {self.username} to be logged in."
        # 设置队伍
        await self.set_team(packed_team)
        # 发送挑战消息给指定用户名和格式
        await self.send_message(f"/challenge {username}, {format_}")

    # 创建日志记录器
    def _create_logger(self, log_level: Optional[int]) -> Logger:
        """Creates a logger for the client.

        Returns a Logger displaying asctime and the account's username before messages.

        :param log_level: The logger's level.
        :type log_level: int
        :return: The logger.
        :rtype: Logger
        """
        # 创建以用户名为名称的日志记录器
        logger = logging.getLogger(self.username)

        # 创建流处理器
        stream_handler = logging.StreamHandler()
        # 如果有指定日志级别,设置日志级别
        if log_level is not None:
            logger.setLevel(log_level)

        # 设置日志格式
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        stream_handler.setFormatter(formatter)

        # 添加流处理器到日志记录器
        logger.addHandler(stream_handler)
        return logger

    # 异步方法,停止监听
    async def _stop_listening(self):
        # 关闭 WebSocket 连接
        await self.websocket.close()

    # 异步方法,更改玩家的头像
    async def change_avatar(self, avatar_id: Optional[int]):
        """Changes the player's avatar.

        :param avatar_id: The new avatar id. If None, nothing happens.
        :type avatar_id: int
        """
        # 等待用户登录
        await self.wait_for_login()
        # 如果有指定头像 id,发送更改头像的消息
        if avatar_id is not None:
            await self.send_message(f"/avatar {avatar_id}")
    # 异步方法,用于监听 showdown websocket 并分发消息以进行处理
    async def listen(self):
        # 记录日志,表示开始监听 showdown websocket
        self.logger.info("Starting listening to showdown websocket")
        try:
            # 使用 async with 连接到 websocket
            async with ws.connect(
                self.websocket_url,
                max_queue=None,
                ping_interval=self._ping_interval,
                ping_timeout=self._ping_timeout,
            ) as websocket:
                # 将 websocket 赋值给实例变量
                self.websocket = websocket
                # 遍历 websocket 接收到的消息
                async for message in websocket:
                    # 记录接收到的消息
                    self.logger.info("33[92m33[1m<<<33[0m %s", message)
                    # 创建任务来处理接收到的消息
                    task = create_task(self._handle_message(str(message)))
                    # 将任务添加到活动任务集合中
                    self._active_tasks.add(task)
                    # 添加任务完成时的回调函数,从活动任务集合中移除任务
                    task.add_done_callback(self._active_tasks.discard)

        except ConnectionClosedOK:
            # 记录警告日志,表示 websocket 连接已关闭
            self.logger.warning(
                "Websocket connection with %s closed", self.websocket_url
            )
        except (CancelledError, RuntimeError) as e:
            # 记录严重错误日志,表示监听被中断
            self.logger.critical("Listen interrupted by %s", e)
        except Exception as e:
            # 记录异常日志
            self.logger.exception(e)
    # 异步方法,用于登录玩家,需要传入分割后的消息列表
    async def log_in(self, split_message: List[str]):
        """Log the player with specified username and password.

        Split message contains information sent by the server. This information is
        necessary to log in.

        :param split_message: Message received from the server that triggers logging in.
        :type split_message: List[str]
        """
        # 如果存在账户密码
        if self.account_configuration.password:
            # 发送登录请求,包括用户名、密码和服务器信息
            log_in_request = requests.post(
                self.server_configuration.authentication_url,
                data={
                    "act": "login",
                    "name": self.account_configuration.username,
                    "pass": self.account_configuration.password,
                    "challstr": split_message[2]   "|"   split_message[3],
                },
            )
            # 记录发送认证请求的信息
            self.logger.info("Sending authentication request")
            # 从返回的数据中获取认证信息
            assertion = json.loads(log_in_request.text[1:])["assertion"]
        else:
            # 如果不存在账户密码,则跳过认证请求
            self.logger.info("Bypassing authentication request")
            assertion = ""

        # 发送消息,包括用户名和认证信息
        await self.send_message(f"/trn {self.username},0,{assertion}")

        # 更改头像
        await self.change_avatar(self._avatar)

    # 异步方法,用于搜索排位赛游戏,需要传入比赛格式和打包的队伍信息
    async def search_ladder_game(self, format_: str, packed_team: Optional[str]):
        # 设置队伍信息
        await self.set_team(packed_team)
        # 发送搜索游戏消息,包括比赛格式
        await self.send_message(f"/search {format_}")

    # 异步方法,用于发送消息,可以指定房间和第二条消息
    async def send_message(
        self, message: str, room: str = "", message_2: Optional[str] = None
    ):
        """Sends a message to the specified room.

        `message_2` can be used to send a sequence of length 2.

        :param message: The message to send.
        :type message: str
        :param room: The room to which the message should be sent.
        :type room: str
        :param message_2: Second element of the sequence to be sent. Optional.
        :type message_2: str, optional
        """
        # 如果存在第二个消息,将消息和房间名以及第二个消息用竖线连接起来
        if message_2:
            to_send = "|".join([room, message, message_2])
        else:
            to_send = "|".join([room, message])
        # 发送消息
        await self.websocket.send(to_send)

    async def set_team(self, packed_team: Optional[str]):
        # 如果存在打包的团队信息,发送消息 "/utm {packed_team}"
        if packed_team:
            await self.send_message(f"/utm {packed_team}")
        else:
            # 否则发送消息 "/utm null"
            await self.send_message("/utm null")

    async def stop_listening(self):
        # 停止监听
        await handle_threaded_coroutines(self._stop_listening())

    async def wait_for_login(self, checking_interval: float = 0.001, wait_for: int = 5):
        start = perf_counter()
        # 在指定时间内等待登录
        while perf_counter() - start < wait_for:
            await sleep(checking_interval)
            if self.logged_in:
                return
        # 如果超时仍未登录,则抛出异常
        assert self.logged_in, f"Expected player {self.username} to be logged in."

    @property
    def account_configuration(self) -> AccountConfiguration:
        """The client's account configuration.

        :return: The client's account configuration.
        :rtype: AccountConfiguration
        """
        # 返回客户端的账户配置
        return self._account_configuration

    @property
    def logged_in(self) -> Event:
        """Event object associated with user login.

        :return: The logged-in event
        :rtype: Event
        """
        # 返回与用户登录相关的事件对象
        return self._logged_in

    @property
    def logger(self) -> Logger:
        """Logger associated with the player.

        :return: The logger.
        :rtype: Logger
        """
        # 返回与玩家相关的日志记录器
        return self._logger

    @property
    def server_configuration(self) -> ServerConfiguration:
        """获取客户端的服务器配置信息。

        :return: 客户端的服务器配置信息。
        :rtype: ServerConfiguration
        """
        return self._server_configuration

    @property
    def username(self) -> str:
        """玩家的用户名。

        :return: 玩家的用户名。
        :rtype: str
        """
        return self.account_configuration.username

    @property
    def websocket_url(self) -> str:
        """WebSocket 的 URL。

        它是从服务器 URL 派生而来。

        :return: WebSocket 的 URL。
        :rtype: str
        """
        return f"ws://{self.server_configuration.server_url}/showdown/websocket"

.PokeLLMonpoke_envps_clientserver_configuration.py

代码语言:javascript复制
# 该模块包含与服务器配置相关的对象
from typing import NamedTuple

# 定义一个名为ServerConfiguration的命名元组,表示服务器配置对象,包含两个条目:服务器URL和认证端点URL
class ServerConfiguration(NamedTuple):
    server_url: str  # 服务器URL
    authentication_url: str  # 认证端点URL

# 使用本地主机和smogon的认证端点创建一个名为LocalhostServerConfiguration的ServerConfiguration对象
LocalhostServerConfiguration = ServerConfiguration(
    "localhost:8000", "https://play.pokemonshowdown.com/action.php?"
)

# 使用smogon的服务器和认证端点创建一个名为ShowdownServerConfiguration的ServerConfiguration对象
ShowdownServerConfiguration = ServerConfiguration(
    "sim.smogon.com:8000", "https://play.pokemonshowdown.com/action.php?"
)

.PokeLLMonpoke_envps_client__init__.py

代码语言:javascript复制
# 导入所需的模块和类
from poke_env.ps_client.account_configuration import AccountConfiguration
from poke_env.ps_client.ps_client import PSClient
from poke_env.ps_client.server_configuration import (
    LocalhostServerConfiguration,
    ServerConfiguration,
    ShowdownServerConfiguration,
)

# 定义 __all__ 列表,包含需要导出的模块和类
__all__ = [
    "AccountConfiguration",
    "LocalhostServerConfiguration",
    "PSClient",
    "ServerConfiguration",
    "ShowdownServerConfiguration",
]

.PokeLLMonpoke_envstats.py

代码语言:javascript复制
# 该模块包含与统计相关的实用函数和对象

import math
from typing import List

from poke_env.data import GenData

# 定义将统计名称映射到索引的字典
STATS_TO_IDX = {
    "hp": 0,
    "atk": 1,
    "def": 2,
    "spa": 3,
    "spd": 4,
    "spe": 5,
    "satk": 3,
    "sdef": 4,
}

# 计算原始统计值的函数
def _raw_stat(base: int, ev: int, iv: int, level: int, nature_multiplier: float) -> int:
    """Converts to raw stat
    :param base: the base stat
    :param ev: Stat Effort Value (EV)
    :param iv: Stat Individual Values (IV)
    :param level: pokemon level
    :param nature_multiplier: stat multiplier of the nature (either 0.9, 1 or 1.1)
    :return: the raw stat
    """
    s = math.floor(
        (5   math.floor((math.floor(ev / 4)   iv   2 * base) * level / 100))
        * nature_multiplier
    )
    return int(s)

# 计算原始 HP 值的函数
def _raw_hp(base: int, ev: int, iv: int, level: int) -> int:
    """Converts to raw hp
    :param base: the base stat
    :param ev: HP Effort Value (EV)
    :param iv: HP Individual Value (IV)
    :param level: pokemon level
    :return: the raw hp
    """
    s = math.floor((math.floor(ev / 4)   iv   2 * base) * level / 100)   level   10
    return int(s)

# 计算原始统计值的函数
def compute_raw_stats(
    species: str, evs: List[int], ivs: List[int], level: int, nature: str, data: GenData
) -> List[int]:
    """Converts to raw stats
    :param species: pokemon species
    :param evs: list of pokemon's EVs (size 6)
    :param ivs: list of pokemon's IVs (size 6)
    :param level: pokemon level
    :param nature: pokemon nature
    :return: the raw stats in order [hp, atk, def, spa, spd, spe]
    """

    assert len(evs) == 6
    assert len(ivs) == 6

    base_stats = [0] * 6
    # 从数据中获取种类的基础统计值
    for stat, value in data.pokedex[species]["baseStats"].items():
        base_stats[STATS_TO_IDX[stat]] = value

    nature_multiplier = [1.0] * 6
    # 从数据中获取自然属性的统计值倍增器
    for stat, multiplier in data.natures[nature].items():
        if stat != "num":
            nature_multiplier[STATS_TO_IDX[stat]] = multiplier

    raw_stats = [0] * 6
    # 如果精灵种类是"shedinja",则将生命值设为1
    if species == "shedinja":
        raw_stats[0] = 1
    # 否则,根据基础状态值、努力值、个体值和等级计算生命值
    else:
        raw_stats[0] = _raw_hp(base_stats[0], evs[0], ivs[0], level)

    # 遍历除生命值外的其他五项状态值
    for i in range(1, 6):
        # 根据基础状态值、努力值、个体值、等级和性格系数计算状态值
        raw_stats[i] = _raw_stat(
            base_stats[i], evs[i], ivs[i], level, nature_multiplier[i]
        )

    # 返回计算后的状态值列表
    return raw_stats

.PokeLLMonpoke_envteambuilderconstant_teambuilder.py

代码语言:javascript复制
"""This module defines the ConstantTeambuilder class, which is a subclass of
ShowdownTeamBuilder that yields a constant team.
"""
# 导入Teambuilder类
from poke_env.teambuilder.teambuilder import Teambuilder

# 定义ConstantTeambuilder类,继承自Teambuilder类
class ConstantTeambuilder(Teambuilder):
    # 初始化方法,接受一个team字符串作为参数
    def __init__(self, team: str):
        # 如果team字符串中包含"|",则直接将其赋值给converted_team属性
        if "|" in team:
            self.converted_team = team
        # 如果team字符串中不包含"|",则解析team字符串并将解析后的结果赋值给converted_team属性
        else:
            mons = self.parse_showdown_team(team)
            self.converted_team = self.join_team(mons)

    # 返回converted_team属性的值
    def yield_team(self) -> str:
        return self.converted_team

.PokeLLMonpoke_envteambuilderteambuilder.py

代码语言:javascript复制
"""This module defines the Teambuilder abstract class, which represents objects yielding
Pokemon Showdown teams in the context of communicating with Pokemon Showdown.
"""
# 导入所需的模块
from abc import ABC, abstractmethod
from typing import List

from poke_env.stats import STATS_TO_IDX
from poke_env.teambuilder.teambuilder_pokemon import TeambuilderPokemon

# 定义 Teambuilder 抽象类
class Teambuilder(ABC):
    """Teambuilder objects allow the generation of teams by Player instances.

    They must implement the yield_team method, which must return a valid
    packed-formatted showdown team every time it is called.

    This format is a custom format described in Pokemon's showdown protocol
    documentation:
    https://github.com/smogon/pokemon-showdown/blob/master/PROTOCOL.md#team-format

    This class also implements a helper function to convert teams from the classical
    showdown team text format into the packed-format.
    """

    @abstractmethod
    def yield_team(self) -> str:
        """Returns a packed-format team."""

    @staticmethod
    @staticmethod
    def join_team(team: List[TeambuilderPokemon]) -> str:
        """Converts a list of TeambuilderPokemon objects into the corresponding packed
        showdown team format.

        :param team: The list of TeambuilderPokemon objects that form the team.
        :type team: list of TeambuilderPokemon
        :return: The formatted team string.
        :rtype: str"""
        # 将给定的 TeambuilderPokemon 对象列表转换为对应的打包格式的 showdown 队伍格式
        return "]".join([mon.formatted for mon in team])

0 人点赞