sc2.client module
from s2clientprotocol import (
    sc2api_pb2 as sc_pb,
    common_pb2 as common_pb,
    query_pb2 as query_pb,
    debug_pb2 as debug_pb,
    raw_pb2 as raw_pb,
)

import logging

from sc2.ids.ability_id import AbilityId
from sc2.ids.unit_typeid import UnitTypeId
from sc2.renderer import Renderer

logger = logging.getLogger(__name__)

from .cache import method_cache_forever
from .protocol import Protocol, ProtocolError
from .game_info import GameInfo
from .game_data import GameData, AbilityData
from .data import Status, Result
from .data import Race, ActionResult, ChatChannel
from .action import combine_actions
from .position import Point2, Point3
from .unit import Unit
from .units import Units

from typing import List, Dict, Set, Tuple, Any, Optional, Union  # mypy type checking


class Client(Protocol):
    """Coroutine-based client for one game connection.

    Wraps the protocol requests (join, leave, observe, step, query, chat and
    debug commands) and queues debug draw items until `send_debug` is awaited.
    """

    def __init__(self, ws):
        super().__init__(ws)
        # Sent as `count` with every RequestStep issued by step().
        self.game_step = 8
        self._player_id = None
        # Dict of player id -> Result once a result is known, otherwise None.
        self._game_result = None
        # Debug draw items queued until send_debug() flushes them.
        self._debug_texts = []
        self._debug_lines = []
        self._debug_boxes = []
        self._debug_spheres = []
        # Only created when join_game() receives an rgb_render_config.
        self._renderer = None

    @property
    def in_game(self):
        """True while the protocol status equals `Status.in_game`."""
        return self._status == Status.in_game

    async def join_game(self, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
        """Join the game as a player (`race` given) or as an observer (`race` is None).

        `rgb_render_config`, when given, must be a dict with 'window_size' and
        'minimap_size' entries, each a (width, height) pair. `portconfig`, when
        given, supplies the shared/server/player ports for the request.

        Returns the player id reported by the server.
        """
        ifopts = sc_pb.InterfaceOptions(raw=True, score=True)

        if rgb_render_config:
            assert isinstance(rgb_render_config, dict)
            assert 'window_size' in rgb_render_config and 'minimap_size' in rgb_render_config
            window_size = rgb_render_config['window_size']
            minimap_size = rgb_render_config['minimap_size']
            self._renderer = Renderer(self, window_size, minimap_size)
            map_width, map_height = window_size
            minimap_width, minimap_height = minimap_size

            ifopts.render.resolution.x = map_width
            ifopts.render.resolution.y = map_height
            ifopts.render.minimap_resolution.x = minimap_width
            ifopts.render.minimap_resolution.y = minimap_height

        if race is None:
            assert isinstance(observed_player_id, int)
            # join as observer
            req = sc_pb.RequestJoinGame(
                observed_player_id=observed_player_id,
                options=ifopts
            )
        else:
            assert isinstance(race, Race)
            req = sc_pb.RequestJoinGame(
                race=race.value,
                options=ifopts
            )

        if portconfig:
            req.shared_port = portconfig.shared
            req.server_ports.game_port = portconfig.server[0]
            req.server_ports.base_port = portconfig.server[1]

            for ppc in portconfig.players:
                p = req.client_ports.add()
                p.game_port = ppc[0]
                p.base_port = ppc[1]

        result = await self._execute(join_game=req)
        self._game_result = None
        self._player_id = result.join_game.player_id
        return result.join_game.player_id

    async def leave(self):
        """ You can use 'await self._client.leave()' to surrender midst game. """
        is_resign = self._game_result is None

        if is_resign:
            # For all clients that can leave, result of leaving the game either
            # loss, or the client will ignore the result
            self._game_result = {self._player_id: Result.Defeat}

        try:
            await self._execute(leave_game=sc_pb.RequestLeaveGame())
        except ProtocolError:
            # Only a resignation treats a failed leave request as an error.
            if is_resign:
                raise

    async def save_replay(self, path):
        """Request the replay from the server and write its bytes to `path`."""
        logger.debug("Requesting replay from server")
        result = await self._execute(save_replay=sc_pb.RequestSaveReplay())
        with open(path, "wb") as f:
            f.write(result.save_replay.data)
        logger.info(f"Saved replay to {path}")

    async def observation(self):
        """Request an observation and return the raw response.

        When the game is no longer running (or the response already carries
        player results), the results are stored in `self._game_result` as a
        dict of player id -> Result. If a renderer exists and the observation
        has render data, the observation is forwarded to the renderer.
        """
        result = await self._execute(observation=sc_pb.RequestObservation())

        if not self.in_game or result.observation.player_result:
            # Sometimes game ends one step before results are available
            if not result.observation.player_result:
                result = await self._execute(observation=sc_pb.RequestObservation())
                assert result.observation.player_result

            self._game_result = {
                pr.player_id: Result(pr.result)
                for pr in result.observation.player_result
            }

        # if render_data is available, then RGB rendering was requested
        if self._renderer and result.observation.observation.HasField('render_data'):
            await self._renderer.render(result.observation)

        return result

    async def step(self):
        """ EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second """
        result = await self._execute(step=sc_pb.RequestStep(count=self.game_step))
        return result

    async def get_game_data(self) -> GameData:
        """Request ability, unit type and upgrade data and wrap it in GameData."""
        result = await self._execute(data=sc_pb.RequestData(
            ability_id=True,
            unit_type_id=True,
            upgrade_id=True
        ))
        return GameData(result.data)

    async def get_game_info(self) -> GameInfo:
        """Request the game info and wrap it in GameInfo."""
        result = await self._execute(game_info=sc_pb.RequestGameInfo())
        return GameInfo(result.game_info)

    async def actions(self, actions, game_data, return_successes=False):
        """Send one action or a list of actions.

        For a list, returns the list of ActionResult values (all of them when
        `return_successes` is true, otherwise only those that are not
        `ActionResult.Success`). For a single action, returns the first entry
        of that list, or None when the list is empty.
        `game_data` is accepted for interface compatibility and is not used here.
        """
        if not isinstance(actions, list):
            res = await self.actions([actions], game_data, return_successes)
            if res:
                return res[0]
            else:
                return None
        else:
            actions = combine_actions(actions)

            res = await self._execute(action=sc_pb.RequestAction(
                actions=[sc_pb.Action(action_raw=a) for a in actions]
            ))

            res = [ActionResult(r) for r in res.action.result]
            if return_successes:
                return res
            else:
                return [r for r in res if r != ActionResult.Success]

    async def query_pathing(self, start: Union[Unit, Point2, Point3], end: Union[Point2, Point3]) -> Optional[Union[int, float]]:
        """Query the pathing distance from `start` (a unit or a point) to `end`.

        Returns the distance as a float, or None when the reported distance is
        not positive (path not found).
        """
        assert isinstance(start, (Point2, Unit))
        assert isinstance(end, Point2)
        if isinstance(start, Point2):
            result = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    start_pos=common_pb.Point2D(x=start.x, y=start.y),
                    end_pos=common_pb.Point2D(x=end.x, y=end.y)
                )]
            ))
        else:
            result = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    unit_tag=start.tag,
                    end_pos=common_pb.Point2D(x=end.x, y=end.y)
                )]
            ))
        distance = float(result.query.pathing[0].distance)
        if distance <= 0.0:
            return None
        return distance

    async def query_pathings(self, zipped_list: List[List[Union[Unit, Point2, Point3]]]) -> List[Union[float, int]]:
        """ Usage: await self.query_pathings([[unit1, target1], [unit2, target2]])
        -> returns [distance1, distance2]
        Caution: returns 0 when path not found
        Only the first pair is type-checked, and its start type (point or unit)
        decides how every pair in the list is sent.
        Might merge this function with the function above
        """
        assert zipped_list
        assert isinstance(zipped_list, list)
        assert isinstance(zipped_list[0], list)
        assert len(zipped_list[0]) == 2
        assert isinstance(zipped_list[0][0], (Point2, Unit))
        assert isinstance(zipped_list[0][1], Point2)
        if isinstance(zipped_list[0][0], Point2):
            results = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    start_pos=common_pb.Point2D(x=p1.x, y=p1.y),
                    end_pos=common_pb.Point2D(x=p2.x, y=p2.y)
                ) for p1, p2 in zipped_list]
            ))
        else:
            results = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    unit_tag=p1.tag,
                    end_pos=common_pb.Point2D(x=p2.x, y=p2.y)
                ) for p1, p2 in zipped_list]
            ))
        results = [float(d.distance) for d in results.query.pathing]
        return results

    async def query_building_placement(self, ability: AbilityData, positions: List[Union[Unit, Point2, Point3]], ignore_resources: bool=True) -> List[ActionResult]:
        """Query whether `ability` can be placed at each of `positions`.

        `ability` must be an AbilityData (its `id.value` is sent). Returns one
        ActionResult per placement in the response.
        """
        assert isinstance(ability, AbilityData)
        result = await self._execute(query=query_pb.RequestQuery(
            placements=[query_pb.RequestQueryBuildingPlacement(
                ability_id=ability.id.value,
                target_pos=common_pb.Point2D(x=position.x, y=position.y)
            ) for position in positions],
            ignore_resource_requirements=ignore_resources
        ))
        return [ActionResult(p.result) for p in result.query.placements]

    async def query_available_abilities(self, units: Union[List[Unit], "Units"], ignore_resource_requirements: bool=False) -> List[List[AbilityId]]:
        """ Query abilities of multiple units

        Returns one list of AbilityId per queried unit. When a single Unit is
        passed instead of a list, only that unit's list is returned.
        """
        if not isinstance(units, list):
            # Deprecated, accepting a single unit may be removed in the future, query a list of units instead
            assert isinstance(units, Unit)
            units = [units]
            input_was_a_list = False
        else:
            input_was_a_list = True
            assert units
        result = await self._execute(query=query_pb.RequestQuery(
            abilities=[query_pb.RequestQueryAvailableAbilities(
                unit_tag=unit.tag) for unit in units],
            ignore_resource_requirements=ignore_resource_requirements)
        )
        abilities_per_unit = [[AbilityId(a.ability_id) for a in b.abilities] for b in result.query.abilities]
        # Fix for bots that only query a single unit
        if not input_was_a_list:
            return abilities_per_unit[0]
        return abilities_per_unit

    async def chat_send(self, message: str, team_only: bool):
        """ Writes a message to the chat """
        ch = ChatChannel.Team if team_only else ChatChannel.Broadcast
        await self._execute(action=sc_pb.RequestAction(
            actions=[sc_pb.Action(action_chat=sc_pb.ActionChat(
                channel=ch.value,
                message=message
            ))]
        ))

    async def debug_create_unit(self, unit_spawn_commands: List[List[Union[UnitTypeId, int, Point2, Point3]]]):
        """ Usage example (will spawn 1 marine in the center of the map for player ID 1):
        await self._client.debug_create_unit([[UnitTypeId.MARINE, 1, self._game_info.map_center, 1]])

        Each command is [unit_type, amount_of_units, position, owner_id]; every
        command is validated before the request is sent.
        """
        assert isinstance(unit_spawn_commands, list)
        assert unit_spawn_commands
        for command in unit_spawn_commands:
            assert isinstance(command, (list, tuple))
            assert len(command) == 4
            unit_type, amount_of_units, position, owner_id = command
            assert isinstance(unit_type, UnitTypeId)
            assert amount_of_units > 0  # careful, in realtime=True this function may create more units
            assert isinstance(position, (Point2, Point3))
            assert 1 <= owner_id <= 2

        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(create_unit=debug_pb.DebugCreateUnit(
                unit_type=unit_type.value,
                owner=owner_id,
                pos=common_pb.Point2D(x=position.x, y=position.y),
                quantity=amount_of_units
            )) for unit_type, amount_of_units, position, owner_id in unit_spawn_commands]
        ))

    async def debug_kill_unit(self, unit_tags: Union[Units, List[int], Set[int]]):
        """Send a debug kill command for the given unit tags (or the tags of a Units collection)."""
        if isinstance(unit_tags, Units):
            unit_tags = unit_tags.tags
        assert unit_tags

        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(
                tag=unit_tags
            ))]
        ))

    async def move_camera(self, position: Union[Unit, Point2, Point3]):
        """ Moves camera to the target position """
        assert isinstance(position, (Unit, Point2, Point3))
        if isinstance(position, Unit):
            position = position.position
        await self._execute(action=sc_pb.RequestAction(
            actions=[sc_pb.Action(
                action_raw=raw_pb.ActionRaw(
                    camera_move=raw_pb.ActionRawCameraMove(
                        center_world_space=common_pb.Point(x=position.x, y=position.y)
                    )
                )
            )]
        ))

    async def move_camera_spatial(self, position: Union[Point2, Point3]):
        """ Moves camera to the target position using the spatial action interface """
        from s2clientprotocol import spatial_pb2 as spatial_pb
        assert isinstance(position, (Point2, Point3))
        action = sc_pb.Action(
            action_render=spatial_pb.ActionSpatial(
                camera_move=spatial_pb.ActionSpatialCameraMove(
                    center_minimap=common_pb.PointI(x=position.x, y=position.y)
                )
            )
        )
        await self._execute(action=sc_pb.RequestAction(actions=[action]))

    async def debug_text(self, texts: Union[str, list], positions: Union[list, set], color=(0, 255, 0), size_px=16):
        """ Deprecated, may be removed soon

        Sends the texts immediately (no send_debug needed). A single string is
        repeated for every position; a single position is wrapped in a list.
        """
        if isinstance(positions, (set, list)):
            if not positions:
                return

            if isinstance(texts, str):
                texts = [texts] * len(positions)
            assert len(texts) == len(positions)

            await self._execute(debug=sc_pb.RequestDebug(
                debug=[debug_pb.DebugCommand(draw=debug_pb.DebugDraw(
                    text=[debug_pb.DebugText(
                        text=t,
                        color=debug_pb.Color(r=color[0], g=color[1], b=color[2]),
                        world_pos=common_pb.Point(x=p.x, y=p.y, z=getattr(p, "z", 10)),
                        size=size_px
                    ) for t, p in zip(texts, positions)]
                ))]
            ))
        else:
            # Forward size_px too, otherwise a custom size is silently reset to the default.
            await self.debug_text([texts], [positions], color, size_px)

    def debug_text_simple(self, text: str):
        """ Draws a text in the top left corner of the screen (up to a max of 6 messages it seems). Don't forget to add 'await self._client.send_debug'. """
        self._debug_texts.append(self.to_debug_message(text))

    def debug_text_screen(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
        """ Draws a text on the screen with coordinates 0 <= x, y <= 1. Don't forget to add 'await self._client.send_debug'. """
        assert len(pos) >= 2
        assert 0 <= pos[0] <= 1
        assert 0 <= pos[1] <= 1
        pos = Point2((pos[0], pos[1]))
        self._debug_texts.append(self.to_debug_message(text, color, pos, size))

    def debug_text_2d(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
        """Alias of debug_text_screen."""
        return self.debug_text_screen(text, pos, color, size)

    def debug_text_world(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
        """ Draws a text at Point3 position. Don't forget to add 'await self._client.send_debug'.
        To grab a unit's 3d position, use unit.position3d
        Usually the Z value of a Point3 is between 8 and 14 (except for flying units)
        """
        if isinstance(pos, Unit):
            # A Unit is accepted by the signature; use its 3d position so the
            # text is placed in world space rather than screen space.
            pos = pos.position3d
        elif isinstance(pos, Point2) and not isinstance(pos, Point3):  # a Point3 is also a Point2
            pos = Point3((pos.x, pos.y, 0))
        self._debug_texts.append(self.to_debug_message(text, color, pos, size))

    def debug_text_3d(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
        """Alias of debug_text_world."""
        return self.debug_text_world(text, pos, color, size)

    def debug_line_out(self, p0: Union[Unit, Point2, Point3], p1: Union[Unit, Point2, Point3], color=None):
        """ Draws a line from p0 to p1. Don't forget to add 'await self._client.send_debug'. """
        self._debug_lines.append(debug_pb.DebugLine(
            line=debug_pb.Line(p0=self.to_debug_point(p0), p1=self.to_debug_point(p1)),
            color=self.to_debug_color(color)))

    def debug_box_out(self, p_min: Union[Unit, Point2, Point3], p_max: Union[Unit, Point2, Point3], color=None):
        """ Draws a box with p_min and p_max as corners. Don't forget to add 'await self._client.send_debug'. """
        self._debug_boxes.append(debug_pb.DebugBox(
            min=self.to_debug_point(p_min),
            max=self.to_debug_point(p_max),
            color=self.to_debug_color(color)
        ))

    def debug_sphere_out(self, p: Union[Unit, Point2, Point3], r: Union[int, float], color=None):
        """ Draws a sphere at point p with radius r. Don't forget to add 'await self._client.send_debug'. """
        self._debug_spheres.append(debug_pb.DebugSphere(
            p=self.to_debug_point(p),
            r=r,
            color=self.to_debug_color(color)
        ))

    async def send_debug(self):
        """ Sends the debug draw execution. Put this after your debug creation functions. """
        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(draw=debug_pb.DebugDraw(
                text=self._debug_texts if self._debug_texts else None,
                lines=self._debug_lines if self._debug_lines else None,
                boxes=self._debug_boxes if self._debug_boxes else None,
                spheres=self._debug_spheres if self._debug_spheres else None
            ))]))
        # The queues are emptied only after the request went through.
        self._debug_texts.clear()
        self._debug_lines.clear()
        self._debug_boxes.clear()
        self._debug_spheres.clear()

    def to_debug_color(self, color):
        """ Helper function for color conversion

        Accepts None (white), an object with r/g/b or x/y/z attributes, or a
        plain 3-element tuple/list. Channels missing on an attribute object
        default to 255. When no channel exceeds 1 the values are scaled by 255.
        """
        if color is None:
            return debug_pb.Color(r=255, g=255, b=255)

        channel_attrs = ("r", "g", "b", "x", "y", "z")
        is_plain_triple = (
            isinstance(color, (tuple, list))
            and len(color) == 3
            and not any(hasattr(color, attr) for attr in channel_attrs)
        )
        if is_plain_triple:
            # Plain (r, g, b) sequences have no channel attributes and would
            # otherwise silently fall back to white.
            r, g, b = color
        else:
            r = getattr(color, "r", getattr(color, "x", 255))
            g = getattr(color, "g", getattr(color, "y", 255))
            b = getattr(color, "b", getattr(color, "z", 255))
        if max(r, g, b) <= 1:
            r *= 255
            g *= 255
            b *= 255

        return debug_pb.Color(r=int(r), g=int(g), b=int(b))

    def to_debug_point(self, point: Union[Unit, Point2, Point3]) -> common_pb.Point:
        """ Helper function for point conversion """
        if isinstance(point, Unit):
            point = point.position3d
        # Points without a z attribute get z=0.
        return common_pb.Point(x=point.x, y=point.y, z=getattr(point, "z", 0))

    def to_debug_message(self, text: str, color=None, pos: Optional[Union[Point2, Point3]]=None, size: int=8) -> debug_pb.DebugText:
        """ Helper function to create debug texts

        A Point3 becomes `world_pos`; any other non-None position becomes
        `virtual_pos`; with `pos=None` neither field is set.
        """
        color = self.to_debug_color(color)
        world_pos = None
        virtual_pos = None
        if isinstance(pos, Point3):
            world_pos = self.to_debug_point(pos)
        elif pos is not None:
            # Guarding None keeps debug_text_simple from dereferencing None.x.
            virtual_pos = self.to_debug_point(pos)

        return debug_pb.DebugText(
            color=color,
            text=text,
            virtual_pos=virtual_pos,
            world_pos=world_pos,
            size=size
        )
Module variables
var logger
Classes
class Client
class Client(Protocol):
    """Coroutine-based client for one game connection.

    Wraps the protocol requests (join, leave, observe, step, query, chat and
    debug commands) and queues debug draw items until `send_debug` is awaited.
    """

    def __init__(self, ws):
        super().__init__(ws)
        # Sent as `count` with every RequestStep issued by step().
        self.game_step = 8
        self._player_id = None
        # Dict of player id -> Result once a result is known, otherwise None.
        self._game_result = None
        # Debug draw items queued until send_debug() flushes them.
        self._debug_texts = []
        self._debug_lines = []
        self._debug_boxes = []
        self._debug_spheres = []
        # Only created when join_game() receives an rgb_render_config.
        self._renderer = None

    @property
    def in_game(self):
        """True while the protocol status equals `Status.in_game`."""
        return self._status == Status.in_game

    async def join_game(self, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
        """Join the game as a player (`race` given) or as an observer (`race` is None).

        `rgb_render_config`, when given, must be a dict with 'window_size' and
        'minimap_size' entries, each a (width, height) pair. `portconfig`, when
        given, supplies the shared/server/player ports for the request.

        Returns the player id reported by the server.
        """
        ifopts = sc_pb.InterfaceOptions(raw=True, score=True)

        if rgb_render_config:
            assert isinstance(rgb_render_config, dict)
            assert 'window_size' in rgb_render_config and 'minimap_size' in rgb_render_config
            window_size = rgb_render_config['window_size']
            minimap_size = rgb_render_config['minimap_size']
            self._renderer = Renderer(self, window_size, minimap_size)
            map_width, map_height = window_size
            minimap_width, minimap_height = minimap_size

            ifopts.render.resolution.x = map_width
            ifopts.render.resolution.y = map_height
            ifopts.render.minimap_resolution.x = minimap_width
            ifopts.render.minimap_resolution.y = minimap_height

        if race is None:
            assert isinstance(observed_player_id, int)
            # join as observer
            req = sc_pb.RequestJoinGame(
                observed_player_id=observed_player_id,
                options=ifopts
            )
        else:
            assert isinstance(race, Race)
            req = sc_pb.RequestJoinGame(
                race=race.value,
                options=ifopts
            )

        if portconfig:
            req.shared_port = portconfig.shared
            req.server_ports.game_port = portconfig.server[0]
            req.server_ports.base_port = portconfig.server[1]

            for ppc in portconfig.players:
                p = req.client_ports.add()
                p.game_port = ppc[0]
                p.base_port = ppc[1]

        result = await self._execute(join_game=req)
        self._game_result = None
        self._player_id = result.join_game.player_id
        return result.join_game.player_id

    async def leave(self):
        """ You can use 'await self._client.leave()' to surrender midst game. """
        is_resign = self._game_result is None

        if is_resign:
            # For all clients that can leave, result of leaving the game either
            # loss, or the client will ignore the result
            self._game_result = {self._player_id: Result.Defeat}

        try:
            await self._execute(leave_game=sc_pb.RequestLeaveGame())
        except ProtocolError:
            # Only a resignation treats a failed leave request as an error.
            if is_resign:
                raise

    async def save_replay(self, path):
        """Request the replay from the server and write its bytes to `path`."""
        logger.debug("Requesting replay from server")
        result = await self._execute(save_replay=sc_pb.RequestSaveReplay())
        with open(path, "wb") as f:
            f.write(result.save_replay.data)
        logger.info(f"Saved replay to {path}")

    async def observation(self):
        """Request an observation and return the raw response.

        When the game is no longer running (or the response already carries
        player results), the results are stored in `self._game_result` as a
        dict of player id -> Result. If a renderer exists and the observation
        has render data, the observation is forwarded to the renderer.
        """
        result = await self._execute(observation=sc_pb.RequestObservation())

        if not self.in_game or result.observation.player_result:
            # Sometimes game ends one step before results are available
            if not result.observation.player_result:
                result = await self._execute(observation=sc_pb.RequestObservation())
                assert result.observation.player_result

            self._game_result = {
                pr.player_id: Result(pr.result)
                for pr in result.observation.player_result
            }

        # if render_data is available, then RGB rendering was requested
        if self._renderer and result.observation.observation.HasField('render_data'):
            await self._renderer.render(result.observation)

        return result

    async def step(self):
        """ EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second """
        result = await self._execute(step=sc_pb.RequestStep(count=self.game_step))
        return result

    async def get_game_data(self) -> GameData:
        """Request ability, unit type and upgrade data and wrap it in GameData."""
        result = await self._execute(data=sc_pb.RequestData(
            ability_id=True,
            unit_type_id=True,
            upgrade_id=True
        ))
        return GameData(result.data)

    async def get_game_info(self) -> GameInfo:
        """Request the game info and wrap it in GameInfo."""
        result = await self._execute(game_info=sc_pb.RequestGameInfo())
        return GameInfo(result.game_info)

    async def actions(self, actions, game_data, return_successes=False):
        """Send one action or a list of actions.

        For a list, returns the list of ActionResult values (all of them when
        `return_successes` is true, otherwise only those that are not
        `ActionResult.Success`). For a single action, returns the first entry
        of that list, or None when the list is empty.
        `game_data` is accepted for interface compatibility and is not used here.
        """
        if not isinstance(actions, list):
            res = await self.actions([actions], game_data, return_successes)
            if res:
                return res[0]
            else:
                return None
        else:
            actions = combine_actions(actions)

            res = await self._execute(action=sc_pb.RequestAction(
                actions=[sc_pb.Action(action_raw=a) for a in actions]
            ))

            res = [ActionResult(r) for r in res.action.result]
            if return_successes:
                return res
            else:
                return [r for r in res if r != ActionResult.Success]

    async def query_pathing(self, start: Union[Unit, Point2, Point3], end: Union[Point2, Point3]) -> Optional[Union[int, float]]:
        """Query the pathing distance from `start` (a unit or a point) to `end`.

        Returns the distance as a float, or None when the reported distance is
        not positive (path not found).
        """
        assert isinstance(start, (Point2, Unit))
        assert isinstance(end, Point2)
        if isinstance(start, Point2):
            result = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    start_pos=common_pb.Point2D(x=start.x, y=start.y),
                    end_pos=common_pb.Point2D(x=end.x, y=end.y)
                )]
            ))
        else:
            result = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    unit_tag=start.tag,
                    end_pos=common_pb.Point2D(x=end.x, y=end.y)
                )]
            ))
        distance = float(result.query.pathing[0].distance)
        if distance <= 0.0:
            return None
        return distance

    async def query_pathings(self, zipped_list: List[List[Union[Unit, Point2, Point3]]]) -> List[Union[float, int]]:
        """ Usage: await self.query_pathings([[unit1, target1], [unit2, target2]])
        -> returns [distance1, distance2]
        Caution: returns 0 when path not found
        Only the first pair is type-checked, and its start type (point or unit)
        decides how every pair in the list is sent.
        Might merge this function with the function above
        """
        assert zipped_list
        assert isinstance(zipped_list, list)
        assert isinstance(zipped_list[0], list)
        assert len(zipped_list[0]) == 2
        assert isinstance(zipped_list[0][0], (Point2, Unit))
        assert isinstance(zipped_list[0][1], Point2)
        if isinstance(zipped_list[0][0], Point2):
            results = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    start_pos=common_pb.Point2D(x=p1.x, y=p1.y),
                    end_pos=common_pb.Point2D(x=p2.x, y=p2.y)
                ) for p1, p2 in zipped_list]
            ))
        else:
            results = await self._execute(query=query_pb.RequestQuery(
                pathing=[query_pb.RequestQueryPathing(
                    unit_tag=p1.tag,
                    end_pos=common_pb.Point2D(x=p2.x, y=p2.y)
                ) for p1, p2 in zipped_list]
            ))
        results = [float(d.distance) for d in results.query.pathing]
        return results

    async def query_building_placement(self, ability: AbilityData, positions: List[Union[Unit, Point2, Point3]], ignore_resources: bool=True) -> List[ActionResult]:
        """Query whether `ability` can be placed at each of `positions`.

        `ability` must be an AbilityData (its `id.value` is sent). Returns one
        ActionResult per placement in the response.
        """
        assert isinstance(ability, AbilityData)
        result = await self._execute(query=query_pb.RequestQuery(
            placements=[query_pb.RequestQueryBuildingPlacement(
                ability_id=ability.id.value,
                target_pos=common_pb.Point2D(x=position.x, y=position.y)
            ) for position in positions],
            ignore_resource_requirements=ignore_resources
        ))
        return [ActionResult(p.result) for p in result.query.placements]

    async def query_available_abilities(self, units: Union[List[Unit], "Units"], ignore_resource_requirements: bool=False) -> List[List[AbilityId]]:
        """ Query abilities of multiple units

        Returns one list of AbilityId per queried unit. When a single Unit is
        passed instead of a list, only that unit's list is returned.
        """
        if not isinstance(units, list):
            # Deprecated, accepting a single unit may be removed in the future, query a list of units instead
            assert isinstance(units, Unit)
            units = [units]
            input_was_a_list = False
        else:
            input_was_a_list = True
            assert units
        result = await self._execute(query=query_pb.RequestQuery(
            abilities=[query_pb.RequestQueryAvailableAbilities(
                unit_tag=unit.tag) for unit in units],
            ignore_resource_requirements=ignore_resource_requirements)
        )
        abilities_per_unit = [[AbilityId(a.ability_id) for a in b.abilities] for b in result.query.abilities]
        # Fix for bots that only query a single unit
        if not input_was_a_list:
            return abilities_per_unit[0]
        return abilities_per_unit

    async def chat_send(self, message: str, team_only: bool):
        """ Writes a message to the chat """
        ch = ChatChannel.Team if team_only else ChatChannel.Broadcast
        await self._execute(action=sc_pb.RequestAction(
            actions=[sc_pb.Action(action_chat=sc_pb.ActionChat(
                channel=ch.value,
                message=message
            ))]
        ))

    async def debug_create_unit(self, unit_spawn_commands: List[List[Union[UnitTypeId, int, Point2, Point3]]]):
        """ Usage example (will spawn 1 marine in the center of the map for player ID 1):
        await self._client.debug_create_unit([[UnitTypeId.MARINE, 1, self._game_info.map_center, 1]])

        Each command is [unit_type, amount_of_units, position, owner_id]; every
        command is validated before the request is sent.
        """
        assert isinstance(unit_spawn_commands, list)
        assert unit_spawn_commands
        for command in unit_spawn_commands:
            assert isinstance(command, (list, tuple))
            assert len(command) == 4
            unit_type, amount_of_units, position, owner_id = command
            assert isinstance(unit_type, UnitTypeId)
            assert amount_of_units > 0  # careful, in realtime=True this function may create more units
            assert isinstance(position, (Point2, Point3))
            assert 1 <= owner_id <= 2

        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(create_unit=debug_pb.DebugCreateUnit(
                unit_type=unit_type.value,
                owner=owner_id,
                pos=common_pb.Point2D(x=position.x, y=position.y),
                quantity=amount_of_units
            )) for unit_type, amount_of_units, position, owner_id in unit_spawn_commands]
        ))

    async def debug_kill_unit(self, unit_tags: Union[Units, List[int], Set[int]]):
        """Send a debug kill command for the given unit tags (or the tags of a Units collection)."""
        if isinstance(unit_tags, Units):
            unit_tags = unit_tags.tags
        assert unit_tags

        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(
                tag=unit_tags
            ))]
        ))

    async def move_camera(self, position: Union[Unit, Point2, Point3]):
        """ Moves camera to the target position """
        assert isinstance(position, (Unit, Point2, Point3))
        if isinstance(position, Unit):
            position = position.position
        await self._execute(action=sc_pb.RequestAction(
            actions=[sc_pb.Action(
                action_raw=raw_pb.ActionRaw(
                    camera_move=raw_pb.ActionRawCameraMove(
                        center_world_space=common_pb.Point(x=position.x, y=position.y)
                    )
                )
            )]
        ))

    async def move_camera_spatial(self, position: Union[Point2, Point3]):
        """ Moves camera to the target position using the spatial action interface """
        from s2clientprotocol import spatial_pb2 as spatial_pb
        assert isinstance(position, (Point2, Point3))
        action = sc_pb.Action(
            action_render=spatial_pb.ActionSpatial(
                camera_move=spatial_pb.ActionSpatialCameraMove(
                    center_minimap=common_pb.PointI(x=position.x, y=position.y)
                )
            )
        )
        await self._execute(action=sc_pb.RequestAction(actions=[action]))

    async def debug_text(self, texts: Union[str, list], positions: Union[list, set], color=(0, 255, 0), size_px=16):
        """ Deprecated, may be removed soon

        Sends the texts immediately (no send_debug needed). A single string is
        repeated for every position; a single position is wrapped in a list.
        """
        if isinstance(positions, (set, list)):
            if not positions:
                return

            if isinstance(texts, str):
                texts = [texts] * len(positions)
            assert len(texts) == len(positions)

            await self._execute(debug=sc_pb.RequestDebug(
                debug=[debug_pb.DebugCommand(draw=debug_pb.DebugDraw(
                    text=[debug_pb.DebugText(
                        text=t,
                        color=debug_pb.Color(r=color[0], g=color[1], b=color[2]),
                        world_pos=common_pb.Point(x=p.x, y=p.y, z=getattr(p, "z", 10)),
                        size=size_px
                    ) for t, p in zip(texts, positions)]
                ))]
            ))
        else:
            # Forward size_px too, otherwise a custom size is silently reset to the default.
            await self.debug_text([texts], [positions], color, size_px)

    def debug_text_simple(self, text: str):
        """ Draws a text in the top left corner of the screen (up to a max of 6 messages it seems). Don't forget to add 'await self._client.send_debug'. """
        self._debug_texts.append(self.to_debug_message(text))

    def debug_text_screen(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
        """ Draws a text on the screen with coordinates 0 <= x, y <= 1. Don't forget to add 'await self._client.send_debug'. """
        assert len(pos) >= 2
        assert 0 <= pos[0] <= 1
        assert 0 <= pos[1] <= 1
        pos = Point2((pos[0], pos[1]))
        self._debug_texts.append(self.to_debug_message(text, color, pos, size))

    def debug_text_2d(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
        """Alias of debug_text_screen."""
        return self.debug_text_screen(text, pos, color, size)

    def debug_text_world(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
        """ Draws a text at Point3 position. Don't forget to add 'await self._client.send_debug'.
        To grab a unit's 3d position, use unit.position3d
        Usually the Z value of a Point3 is between 8 and 14 (except for flying units)
        """
        if isinstance(pos, Unit):
            # A Unit is accepted by the signature; use its 3d position so the
            # text is placed in world space rather than screen space.
            pos = pos.position3d
        elif isinstance(pos, Point2) and not isinstance(pos, Point3):  # a Point3 is also a Point2
            pos = Point3((pos.x, pos.y, 0))
        self._debug_texts.append(self.to_debug_message(text, color, pos, size))

    def debug_text_3d(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
        """Alias of debug_text_world."""
        return self.debug_text_world(text, pos, color, size)

    def debug_line_out(self, p0: Union[Unit, Point2, Point3], p1: Union[Unit, Point2, Point3], color=None):
        """ Draws a line from p0 to p1. Don't forget to add 'await self._client.send_debug'. """
        self._debug_lines.append(debug_pb.DebugLine(
            line=debug_pb.Line(p0=self.to_debug_point(p0), p1=self.to_debug_point(p1)),
            color=self.to_debug_color(color)))

    def debug_box_out(self, p_min: Union[Unit, Point2, Point3], p_max: Union[Unit, Point2, Point3], color=None):
        """ Draws a box with p_min and p_max as corners. Don't forget to add 'await self._client.send_debug'. """
        self._debug_boxes.append(debug_pb.DebugBox(
            min=self.to_debug_point(p_min),
            max=self.to_debug_point(p_max),
            color=self.to_debug_color(color)
        ))

    def debug_sphere_out(self, p: Union[Unit, Point2, Point3], r: Union[int, float], color=None):
        """ Draws a sphere at point p with radius r. Don't forget to add 'await self._client.send_debug'. """
        self._debug_spheres.append(debug_pb.DebugSphere(
            p=self.to_debug_point(p),
            r=r,
            color=self.to_debug_color(color)
        ))

    async def send_debug(self):
        """ Sends the debug draw execution. Put this after your debug creation functions. """
        await self._execute(debug=sc_pb.RequestDebug(
            debug=[debug_pb.DebugCommand(draw=debug_pb.DebugDraw(
                text=self._debug_texts if self._debug_texts else None,
                lines=self._debug_lines if self._debug_lines else None,
                boxes=self._debug_boxes if self._debug_boxes else None,
                spheres=self._debug_spheres if self._debug_spheres else None
            ))]))
        # The queues are emptied only after the request went through.
        self._debug_texts.clear()
        self._debug_lines.clear()
        self._debug_boxes.clear()
        self._debug_spheres.clear()

    def to_debug_color(self, color):
        """ Helper function for color conversion

        Accepts None (white), an object with r/g/b or x/y/z attributes, or a
        plain 3-element tuple/list. Channels missing on an attribute object
        default to 255. When no channel exceeds 1 the values are scaled by 255.
        """
        if color is None:
            return debug_pb.Color(r=255, g=255, b=255)

        channel_attrs = ("r", "g", "b", "x", "y", "z")
        is_plain_triple = (
            isinstance(color, (tuple, list))
            and len(color) == 3
            and not any(hasattr(color, attr) for attr in channel_attrs)
        )
        if is_plain_triple:
            # Plain (r, g, b) sequences have no channel attributes and would
            # otherwise silently fall back to white.
            r, g, b = color
        else:
            r = getattr(color, "r", getattr(color, "x", 255))
            g = getattr(color, "g", getattr(color, "y", 255))
            b = getattr(color, "b", getattr(color, "z", 255))
        if max(r, g, b) <= 1:
            r *= 255
            g *= 255
            b *= 255

        return debug_pb.Color(r=int(r), g=int(g), b=int(b))

    def to_debug_point(self, point: Union[Unit, Point2, Point3]) -> common_pb.Point:
        """ Helper function for point conversion """
        if isinstance(point, Unit):
            point = point.position3d
        # Points without a z attribute get z=0.
        return common_pb.Point(x=point.x, y=point.y, z=getattr(point, "z", 0))

    def to_debug_message(self, text: str, color=None, pos: Optional[Union[Point2, Point3]]=None, size: int=8) -> debug_pb.DebugText:
        """ Helper function to create debug texts

        A Point3 becomes `world_pos`; any other non-None position becomes
        `virtual_pos`; with `pos=None` neither field is set.
        """
        color = self.to_debug_color(color)
        world_pos = None
        virtual_pos = None
        if isinstance(pos, Point3):
            world_pos = self.to_debug_point(pos)
        elif pos is not None:
            # Guarding None keeps debug_text_simple from dereferencing None.x.
            virtual_pos = self.to_debug_point(pos)

        return debug_pb.DebugText(
            color=color,
            text=text,
            virtual_pos=virtual_pos,
            world_pos=world_pos,
            size=size
        )
Ancestors (in MRO)
- Client
- sc2.protocol.Protocol
- builtins.object
Static methods
def __init__(
self, ws)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, ws):
    """Initialise the client on top of the websocket `ws`.

    Sets the default step count, clears player/result/renderer state and
    creates the empty queues that hold pending debug draw items.
    """
    super().__init__(ws)

    # Step count used for each step request.
    self.game_step = 8

    # Nothing is known about the player, the result or a renderer yet.
    self._player_id = None
    self._game_result = None
    self._renderer = None

    # One independent queue per kind of debug draw item.
    self._debug_texts = []
    self._debug_lines = []
    self._debug_boxes = []
    self._debug_spheres = []
def actions(
self, actions, game_data, return_successes=False)
async def actions(self, actions, game_data, return_successes=False):
    """Send one action or a list of actions.

    For a list, returns the ActionResult values of the response: all of
    them when `return_successes` is true, otherwise only those that are not
    `ActionResult.Success`. For a single action, returns the first entry of
    that list, or None when the list is empty.
    `game_data` is only passed along on the recursive call.
    """
    if isinstance(actions, list):
        combined = combine_actions(actions)
        request = sc_pb.RequestAction(
            actions=[sc_pb.Action(action_raw=raw) for raw in combined]
        )
        response = await self._execute(action=request)

        outcomes = [ActionResult(code) for code in response.action.result]
        if return_successes:
            return outcomes
        return [outcome for outcome in outcomes if outcome != ActionResult.Success]

    # Single action: run it through the list path and unwrap the first result.
    wrapped = await self.actions([actions], game_data, return_successes)
    return wrapped[0] if wrapped else None
def chat_send(
self, message, team_only)
Writes a message to the chat
async def chat_send(self, message: str, team_only: bool):
    """ Writes a message to the chat

    `team_only` selects `ChatChannel.Team`; otherwise the message goes to
    `ChatChannel.Broadcast`. Returns None; the response is not inspected.
    """
    channel = ChatChannel.Team if team_only else ChatChannel.Broadcast
    chat_action = sc_pb.Action(action_chat=sc_pb.ActionChat(
        channel=channel.value,
        message=message
    ))
    # The response was previously bound to an unused local; it is simply awaited now.
    await self._execute(action=sc_pb.RequestAction(actions=[chat_action]))
def debug_box_out(
self, p_min, p_max, color=None)
Draws a box with p_min and p_max as corners. Don't forget to add 'await self._client.send_debug'.
def debug_box_out(self, p_min: Union[Unit, Point2, Point3], p_max: Union[Unit, Point2, Point3], color=None):
    """ Queues a box spanned by the corners p_min and p_max. Nothing is drawn until 'await self._client.send_debug' is called. """
    lower_corner = self.to_debug_point(p_min)
    upper_corner = self.to_debug_point(p_max)
    box_color = self.to_debug_color(color)
    box = debug_pb.DebugBox(min=lower_corner, max=upper_corner, color=box_color)
    self._debug_boxes.append(box)
def debug_create_unit(
self, unit_spawn_commands)
Usage example (will spawn 1 marine in the center of the map for player ID 1): await self._client.debug_create_unit([[UnitTypeId.MARINE, 1, self._game_info.map_center, 1]])
async def debug_create_unit(self, unit_spawn_commands: List[List[Union[UnitTypeId, int, Point2, Point3]]]):
    """ Spawns units through the debug interface.

    Each command is a list [unit_type, amount_of_units, position, owner_id].
    Usage example (will spawn 1 marine in the center of the map for player ID 1):
    await self._client.debug_create_unit([[UnitTypeId.MARINE, 1, self._game_info.map_center, 1]])

    Only the first command is validated by the assertions below.
    """
    assert isinstance(unit_spawn_commands, list)
    assert unit_spawn_commands
    first_command = unit_spawn_commands[0]
    assert isinstance(first_command, list)
    assert len(first_command) == 4
    assert isinstance(first_command[0], UnitTypeId)
    assert first_command[1] > 0  # careful, in realtime=True this function may create more units
    assert isinstance(first_command[2], (Point2, Point3))
    assert 1 <= first_command[3] <= 2

    spawn_commands = [
        debug_pb.DebugCommand(
            create_unit=debug_pb.DebugCreateUnit(
                unit_type=unit_type.value,
                owner=owner_id,
                pos=common_pb.Point2D(x=position.x, y=position.y),
                quantity=amount_of_units,
            )
        )
        for unit_type, amount_of_units, position, owner_id in unit_spawn_commands
    ]
    await self._execute(debug=sc_pb.RequestDebug(debug=spawn_commands))
def debug_kill_unit(
self, unit_tags)
async def debug_kill_unit(self, unit_tags: Union[Units, List[int], Set[int]]):
    """ Kills the units with the given tags through the debug interface.

    A Units collection is reduced to its `.tags`; the resulting tags must be non-empty.
    """
    tags = unit_tags.tags if isinstance(unit_tags, Units) else unit_tags
    assert tags
    kill_command = debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(tag=tags))
    await self._execute(debug=sc_pb.RequestDebug(debug=[kill_command]))
def debug_line_out(
self, p0, p1, color=None)
Draws a line from p0 to p1. Don't forget to add 'await self._client.send_debug'.
def debug_line_out(self, p0: Union[Unit, Point2, Point3], p1: Union[Unit, Point2, Point3], color=None):
    """ Queues a line from p0 to p1.

    Nothing is drawn until 'await self._client.send_debug' is called.
    """
    segment = debug_pb.Line(p0=self.to_debug_point(p0), p1=self.to_debug_point(p1))
    line = debug_pb.DebugLine(line=segment, color=self.to_debug_color(color))
    self._debug_lines.append(line)
def debug_sphere_out(
self, p, r, color=None)
Draws a sphere at point p with radius r. Don't forget to add 'await self._client.send_debug'.
def debug_sphere_out(self, p: Union[Unit, Point2, Point3], r: Union[int, float], color=None):
    """ Queues a sphere centred at point p with radius r.

    Nothing is drawn until 'await self._client.send_debug' is called.
    """
    sphere = debug_pb.DebugSphere(
        p=self.to_debug_point(p),
        r=r,
        color=self.to_debug_color(color),
    )
    self._debug_spheres.append(sphere)
def debug_text(
self, texts, positions, color=(0, 255, 0), size_px=16)
Deprecated, may be removed soon
async def debug_text(self, texts: Union[str, list], positions: Union[list, set], color=(0, 255, 0), size_px=16):
    """ Deprecated, may be removed soon.

    Draws `texts` at world `positions` immediately (no send_debug needed).

    :param texts: one string (repeated for every position) or one string per position
    :param positions: a list/set of points, or a single point
    :param color: (r, g, b) sequence indexed as color[0..2]
    :param size_px: text size forwarded to DebugText
    """
    if not isinstance(positions, (set, list)):
        # Single position: wrap both arguments and go through the list path.
        # size_px is forwarded so the caller's size is not reset to the default.
        await self.debug_text([texts], [positions], color, size_px)
        return

    if not positions:
        return

    if isinstance(texts, str):
        texts = [texts] * len(positions)
    assert len(texts) == len(positions)

    debug_texts = [
        debug_pb.DebugText(
            text=t,
            color=debug_pb.Color(r=color[0], g=color[1], b=color[2]),
            # Points without a z attribute are drawn at z=10.
            world_pos=common_pb.Point(x=p.x, y=p.y, z=getattr(p, "z", 10)),
            size=size_px,
        )
        for t, p in zip(texts, positions)
    ]
    await self._execute(debug=sc_pb.RequestDebug(
        debug=[debug_pb.DebugCommand(draw=debug_pb.DebugDraw(text=debug_texts))]
    ))
def debug_text_2d(
self, text, pos, color=None, size=8)
def debug_text_2d(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
    """ Alias that forwards all arguments to debug_text_screen. """
    return self.debug_text_screen(text, pos, color, size)
def debug_text_3d(
self, text, pos, color=None, size=8)
def debug_text_3d(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
    """ Alias that forwards all arguments to debug_text_world. """
    return self.debug_text_world(text, pos, color, size)
def debug_text_screen(
self, text, pos, color=None, size=8)
Draws a text on the screen with coordinates 0 <= x, y <= 1. Don't forget to add 'await self._client.send_debug'.
def debug_text_screen(self, text: str, pos: Union[Point2, Point3, tuple, list], color=None, size: int=8):
    """ Queues a text at screen coordinates, where 0 <= x, y <= 1.

    Only the first two components of `pos` are used.
    Nothing is drawn until 'await self._client.send_debug' is called.
    """
    assert len(pos) >= 2
    screen_x = pos[0]
    assert 0 <= screen_x <= 1
    screen_y = pos[1]
    assert 0 <= screen_y <= 1
    screen_pos = Point2((screen_x, screen_y))
    message = self.to_debug_message(text, color, screen_pos, size)
    self._debug_texts.append(message)
def debug_text_simple(
self, text)
Draws a text in the top left corner of the screen (up to a max of 6 messages it seems). Don't forget to add 'await self._client.send_debug'.
def debug_text_simple(self, text: str):
    """ Queues a text without an explicit position.

    It is drawn in the top left corner of the screen (up to a max of 6 messages it seems).
    Nothing is drawn until 'await self._client.send_debug' is called.
    """
    message = self.to_debug_message(text)
    self._debug_texts.append(message)
def debug_text_world(
self, text, pos, color=None, size=8)
Draws a text at Point3 position. Don't forget to add 'await self._client.send_debug'. To grab a unit's 3d position, use unit.position3d Usually the Z value of a Point3 is between 8 and 14 (except for flying units)
def debug_text_world(self, text: str, pos: Union[Unit, Point2, Point3], color=None, size: int=8):
    """ Draws a text at a Point3 world position. Don't forget to add 'await self._client.send_debug'.

    :param text: text to draw
    :param pos: a Unit (its position3d is used), a Point3, or a Point2 (drawn at z=0)
    :param color: anything accepted by to_debug_color
    :param size: text size

    Usually the Z value of a Point3 is between 8 and 14 (except for flying units).
    """
    if isinstance(pos, Unit):
        # to_debug_message only treats Point3 as a world position, so a Unit
        # left unconverted would be sent as a screen position.
        # NOTE(review): assumes Unit.position3d is a Point3 - confirm.
        pos = pos.position3d
    elif isinstance(pos, Point2) and not isinstance(pos, Point3):
        # a Point3 is also a Point2
        pos = Point3((pos.x, pos.y, 0))
    self._debug_texts.append(self.to_debug_message(text, color, pos, size))
def get_game_data(
self)
async def get_game_data(self) -> GameData:
    """ Requests ability, unit type and upgrade data and wraps the response in GameData. """
    request = sc_pb.RequestData(ability_id=True, unit_type_id=True, upgrade_id=True)
    response = await self._execute(data=request)
    return GameData(response.data)
def get_game_info(
self)
async def get_game_info(self) -> GameInfo:
    """ Requests the game info and wraps the response in GameInfo. """
    response = await self._execute(game_info=sc_pb.RequestGameInfo())
    return GameInfo(response.game_info)
def join_game(
self, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None)
async def join_game(self, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
    """ Joins a game as a player (race given) or as an observer (race is None).

    :param race: Race to play; None joins as an observer of `observed_player_id`
    :param observed_player_id: int player id, required when `race` is None
    :param portconfig: optional port configuration copied into the request
    :param rgb_render_config: optional dict with 'window_size' and 'minimap_size';
        enables the Renderer and the render interface
    :return: the player id from the join response
    """
    interface_options = sc_pb.InterfaceOptions(raw=True, score=True)

    if rgb_render_config:
        assert isinstance(rgb_render_config, dict)
        assert 'window_size' in rgb_render_config and 'minimap_size' in rgb_render_config
        window_size = rgb_render_config['window_size']
        minimap_size = rgb_render_config['minimap_size']
        self._renderer = Renderer(self, window_size, minimap_size)
        window_w, window_h = window_size
        minimap_w, minimap_h = minimap_size
        render_options = interface_options.render
        render_options.resolution.x = window_w
        render_options.resolution.y = window_h
        render_options.minimap_resolution.x = minimap_w
        render_options.minimap_resolution.y = minimap_h

    if race is not None:
        assert isinstance(race, Race)
        request = sc_pb.RequestJoinGame(race=race.value, options=interface_options)
    else:
        assert isinstance(observed_player_id, int)
        # join as observer
        request = sc_pb.RequestJoinGame(
            observed_player_id=observed_player_id,
            options=interface_options,
        )

    if portconfig:
        request.shared_port = portconfig.shared
        request.server_ports.game_port = portconfig.server[0]
        request.server_ports.base_port = portconfig.server[1]
        for player_ports in portconfig.players:
            client_entry = request.client_ports.add()
            client_entry.game_port = player_ports[0]
            client_entry.base_port = player_ports[1]

    response = await self._execute(join_game=request)
    # Clear any result left on this client before storing the new player id.
    self._game_result = None
    self._player_id = response.join_game.player_id
    return response.join_game.player_id
def leave(
self)
You can use 'await self._client.leave()' to surrender mid-game.
async def leave(self):
    """ Leaves the current game; 'await self._client.leave()' can be used to surrender mid-game.

    If no game result is known yet, leaving counts as resigning and a Defeat
    is recorded for this player. A ProtocolError from the leave request is
    re-raised only in that case; otherwise it is ignored.
    """
    resigning = self._game_result is None

    if resigning:
        # For all clients that can leave, result of leaving the game either
        # loss, or the client will ignore the result
        self._game_result = {self._player_id: Result.Defeat}

    try:
        await self._execute(leave_game=sc_pb.RequestLeaveGame())
    except ProtocolError:
        if not resigning:
            return
        raise
def move_camera(
self, position)
Moves camera to the target position
async def move_camera(self, position: Union[Unit, Point2, Point3]):
    """ Moves the camera to the target position.

    A Unit is replaced by its `.position`; only x and y are sent.
    """
    assert isinstance(position, (Unit, Point2, Point3))
    target = position.position if isinstance(position, Unit) else position

    camera_move = raw_pb.ActionRawCameraMove(
        center_world_space=common_pb.Point(x=target.x, y=target.y)
    )
    camera_action = sc_pb.Action(action_raw=raw_pb.ActionRaw(camera_move=camera_move))
    await self._execute(action=sc_pb.RequestAction(actions=[camera_action]))
def move_camera_spatial(
self, position)
Moves camera to the target position using the spatial action interface
async def move_camera_spatial(self, position: Union[Point2, Point3]):
    """ Moves the camera to the target position using the spatial action interface.

    The position is sent as `center_minimap` of an ActionSpatialCameraMove.
    """
    from s2clientprotocol import spatial_pb2 as spatial_pb

    assert isinstance(position, (Point2, Point3))
    minimap_target = common_pb.PointI(x=position.x, y=position.y)
    camera_move = spatial_pb.ActionSpatialCameraMove(center_minimap=minimap_target)
    spatial_action = sc_pb.Action(
        action_render=spatial_pb.ActionSpatial(camera_move=camera_move)
    )
    await self._execute(action=sc_pb.RequestAction(actions=[spatial_action]))
def observation(
self)
async def observation(self):
    """ Requests an observation, records the game result if present, and returns the response.

    When the client is no longer in game, or the response carries player
    results, the results are stored in `self._game_result` as a mapping of
    player id to Result. If a renderer is configured and the observation has
    render data, it is rendered before returning.
    """
    response = await self._execute(observation=sc_pb.RequestObservation())

    if not self.in_game or response.observation.player_result:
        # Sometimes game ends one step before results are available
        if not response.observation.player_result:
            response = await self._execute(observation=sc_pb.RequestObservation())
            assert response.observation.player_result

        self._game_result = {
            entry.player_id: Result(entry.result)
            for entry in response.observation.player_result
        }

    # if render_data is available, then RGB rendering was requested
    renderer = self._renderer
    if renderer and response.observation.observation.HasField('render_data'):
        await renderer.render(response.observation)

    return response
def ping(
self)
async def ping(self):
    """ Sends a ping request and returns the raw response. """
    return await self._execute(ping=sc_pb.RequestPing())
def query_available_abilities(
self, units, ignore_resource_requirements=False)
Query abilities of multiple units
async def query_available_abilities(self, units: Union[List[Unit], "Units"], ignore_resource_requirements: bool=False) -> List[List[AbilityId]]:
    """ Queries the available abilities of multiple units.

    Returns one list of AbilityId per queried unit. Passing a single Unit is
    deprecated (may be removed in the future; query a list of units instead);
    in that case only that unit's ability list is returned.
    """
    single_unit = not isinstance(units, list)
    if single_unit:
        assert isinstance(units, Unit)
        units = [units]
    assert units

    ability_requests = [
        query_pb.RequestQueryAvailableAbilities(unit_tag=unit.tag) for unit in units
    ]
    response = await self._execute(query=query_pb.RequestQuery(
        abilities=ability_requests,
        ignore_resource_requirements=ignore_resource_requirements,
    ))

    abilities_per_unit = [
        [AbilityId(ability.ability_id) for ability in entry.abilities]
        for entry in response.query.abilities
    ]
    # Fix for bots that only query a single unit
    if single_unit:
        return abilities_per_unit[0]
    return abilities_per_unit
def query_building_placement(
self, ability, positions, ignore_resources=True)
async def query_building_placement(self, ability: AbilityData, positions: List[Union[Unit, Point2, Point3]], ignore_resources: bool=True) -> List[ActionResult]:
    """ Queries whether `ability` can be placed at each of `positions`.

    :param ability: an AbilityData (asserted below); its `id.value` is sent as the ability id
    :param positions: objects exposing `.x` and `.y`
    :param ignore_resources: forwarded as `ignore_resource_requirements`
    :return: one ActionResult per placement entry in the response
    """
    # The body requires AbilityData, so the annotation says so as well.
    assert isinstance(ability, AbilityData)
    placement_requests = [
        query_pb.RequestQueryBuildingPlacement(
            ability_id=ability.id.value,
            target_pos=common_pb.Point2D(x=position.x, y=position.y),
        )
        for position in positions
    ]
    result = await self._execute(query=query_pb.RequestQuery(
        placements=placement_requests,
        ignore_resource_requirements=ignore_resources,
    ))
    return [ActionResult(p.result) for p in result.query.placements]
def query_pathing(
self, start, end)
Caution: returns None when no path is found (the reported distance is 0 or less)
async def query_pathing(self, start: Union[Unit, Point2, Point3], end: Union[Point2, Point3]) -> Optional[Union[int, float]]:
    """ Queries the pathing distance from `start` to `end`.

    :param start: a Point2 (sent as start position) or a Unit (sent by unit tag)
    :param end: a Point2 target position
    :return: the distance as a float, or None when the reported distance is
        0 or less (i.e. no path was found)

    Caution: returns None, not 0, when no path is found.
    """
    assert isinstance(start, (Point2, Unit))
    assert isinstance(end, Point2)

    end_pos = common_pb.Point2D(x=end.x, y=end.y)
    if isinstance(start, Point2):
        path_request = query_pb.RequestQueryPathing(
            start_pos=common_pb.Point2D(x=start.x, y=start.y),
            end_pos=end_pos,
        )
    else:
        path_request = query_pb.RequestQueryPathing(unit_tag=start.tag, end_pos=end_pos)

    result = await self._execute(query=query_pb.RequestQuery(pathing=[path_request]))
    distance = float(result.query.pathing[0].distance)
    if distance <= 0.0:
        return None
    return distance
def query_pathings(
self, zipped_list)
Usage: await self.query_pathings([[unit1, target2], [unit2, target2]]) -> returns [distance1, distance2] Caution: returns 0 when path not found Might merge this function with the function above
async def query_pathings(self, zipped_list: List[List[Union[Unit, Point2, Point3]]]) -> List[Union[float, int]]:
    """ Queries several pathing distances in one request.

    Usage: await self.query_pathings([[unit1, target1], [unit2, target2]])
    -> returns [distance1, distance2]
    Caution: returns 0 when path not found
    Only the first pair is validated, and its start type (Point2 or Unit)
    decides how every pair is sent.
    Might merge this function with the function above
    """
    assert zipped_list
    assert isinstance(zipped_list, list)
    first_pair = zipped_list[0]
    assert isinstance(first_pair, list)
    assert len(first_pair) == 2
    assert isinstance(first_pair[0], (Point2, Unit))
    assert isinstance(first_pair[1], Point2)

    if isinstance(first_pair[0], Point2):
        path_requests = [
            query_pb.RequestQueryPathing(
                start_pos=common_pb.Point2D(x=origin.x, y=origin.y),
                end_pos=common_pb.Point2D(x=target.x, y=target.y),
            )
            for origin, target in zipped_list
        ]
    else:
        path_requests = [
            query_pb.RequestQueryPathing(
                unit_tag=origin.tag,
                end_pos=common_pb.Point2D(x=target.x, y=target.y),
            )
            for origin, target in zipped_list
        ]

    response = await self._execute(query=query_pb.RequestQuery(pathing=path_requests))
    return [float(entry.distance) for entry in response.query.pathing]
def quit(
self)
async def quit(self):
    """ Sends a quit request. """
    quit_request = sc_pb.RequestQuit()
    await self._execute(quit=quit_request)
def save_replay(
self, path)
async def save_replay(self, path):
    """ Requests the replay and writes its bytes to `path`. """
    logger.debug(f"Requesting replay from server")
    response = await self._execute(save_replay=sc_pb.RequestSaveReplay())
    with open(path, "wb") as replay_file:
        replay_file.write(response.save_replay.data)
    logger.info(f"Saved replay to {path}")
def send_debug(
self)
Sends the debug draw execution. Put this after your debug creation functions.
async def send_debug(self):
    """ Sends all queued debug draws in one request, then clears the queues.

    Put this after your debug creation functions.
    """
    # Empty queues are passed as None rather than as empty lists.
    draw = debug_pb.DebugDraw(
        text=self._debug_texts or None,
        lines=self._debug_lines or None,
        boxes=self._debug_boxes or None,
        spheres=self._debug_spheres or None,
    )
    await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(draw=draw)]))
    for queue in (self._debug_texts, self._debug_lines, self._debug_boxes, self._debug_spheres):
        queue.clear()
def step(
self)
EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second
async def step(self):
    """ Sends a step request with `self.game_step` as its count and returns the response.

    EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second
    """
    step_request = sc_pb.RequestStep(count=self.game_step)
    return await self._execute(step=step_request)
def to_debug_color(
self, color)
Helper function for color conversion
def to_debug_color(self, color):
    """ Helper function for color conversion.

    :param color: None (white), an object exposing r/g/b or x/y/z attributes,
        or a plain 3-element tuple/list of (r, g, b) values
    :return: a debug_pb.Color with integer channels

    If the largest channel is <= 1 the values are treated as fractions and
    scaled by 255. Missing attributes default to 255.
    """
    if color is None:
        return debug_pb.Color(r=255, g=255, b=255)

    has_named_channels = hasattr(color, "r") or hasattr(color, "x")
    if isinstance(color, (tuple, list)) and len(color) == 3 and not has_named_channels:
        # A plain sequence has no channel attributes; without this branch it
        # would fall through to the 255 defaults and always come out white.
        r, g, b = color
    else:
        r = getattr(color, "r", getattr(color, "x", 255))
        g = getattr(color, "g", getattr(color, "y", 255))
        b = getattr(color, "b", getattr(color, "z", 255))

    if max(r, g, b) <= 1:
        r *= 255
        g *= 255
        b *= 255
    return debug_pb.Color(r=int(r), g=int(g), b=int(b))
def to_debug_message(
self, text, color=None, pos=None, size=8)
Helper function to create debug texts
def to_debug_message(self, text: str, color=None, pos: Optional[Union[Point2, Point3]]=None, size: int=8) -> debug_pb.DebugText:
    """ Helper function to create debug texts.

    :param text: text to draw
    :param color: anything accepted by to_debug_color
    :param pos: a Point3 is sent as `world_pos`; any other non-None position is
        sent as `virtual_pos`; None leaves both unset
    :param size: text size
    :return: the assembled debug_pb.DebugText
    """
    color = self.to_debug_color(color)
    pt3d = None
    virtual_pos = None
    # pos=None (as used by debug_text_simple) must not reach to_debug_point,
    # which reads pos.x / pos.y and would fail on None.
    if pos is not None:
        if isinstance(pos, Point3):
            pt3d = self.to_debug_point(pos)
        else:
            virtual_pos = self.to_debug_point(pos)
    return debug_pb.DebugText(
        color=color,
        text=text,
        virtual_pos=virtual_pos,
        world_pos=pt3d,
        size=size
    )
def to_debug_point(
self, point)
Helper function for point conversion
def to_debug_point(self, point: Union[Unit, Point2, Point3]) -> common_pb.Point:
    """ Helper function for point conversion.

    A Unit is replaced by its `position3d`; a point without a `z` attribute gets z=0.
    """
    source = point.position3d if isinstance(point, Unit) else point
    height = getattr(source, "z", 0)
    return common_pb.Point(x=source.x, y=source.y, z=height)
Instance variables
var game_step
var in_game