Top

sc2.sc2process module

from typing import Any, Optional, List

import sys
import signal
import time
import asyncio
import os.path
import shutil
import tempfile
import subprocess
import portpicker
import aiohttp

import logging
logger = logging.getLogger(__name__)

from .paths import Paths
from .controller import Controller

class kill_switch:
    _to_kill: List[Any] = []

    @classmethod
    def add(cls, value):
        logger.debug("kill_switch: Add switch")
        cls._to_kill.append(value)

    @classmethod
    def kill_all(cls):
        logger.info("kill_switch: Process cleanup")
        for p in cls._to_kill:
            p._clean()

class SC2Process:
    def __init__(self, host: str = "127.0.0.1", port: Optional[int] = None, fullscreen: bool = False,
                 render: bool = False) -> None:
        assert isinstance(host, str)
        assert isinstance(port, int) or port is None

        self._render = render
        self._fullscreen = fullscreen
        self._host = host
        if port is None:
            self._port = portpicker.pick_unused_port()
        else:
            self._port = port
        self._tmp_dir = tempfile.mkdtemp(prefix="SC2_")
        self._process = None
        self._session = None
        self._ws = None

    async def __aenter__(self):
        kill_switch.add(self)

        def signal_handler():
            kill_switch.kill_all()

        signal.signal(signal.SIGINT, signal_handler)

        try:
            self._process = self._launch()
            self._ws = await self._connect()
        except:
            await self._close_connection()
            self._clean()
            raise

        return Controller(self._ws, self)

    async def __aexit__(self, *args):
        kill_switch.kill_all()
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    @property
    def ws_url(self):
        return f"ws://{self._host}:{self._port}/sc2api"

    def _launch(self):
        args = [
            str(Paths.EXECUTABLE),
            "-listen", self._host,
            "-port", str(self._port),
            "-displayMode", "1" if self._fullscreen else "0",
            "-dataDir", str(Paths.BASE),
            "-tempDir", self._tmp_dir,
        ]
        if self._render:
            args.extend(["-eglpath", "libEGL.so"])

        if logger.getEffectiveLevel() <= logging.DEBUG:
            args.append("-verbose")

        return subprocess.Popen(args,
            cwd=(str(Paths.CWD) if Paths.CWD else None),
            #, env=run_config.env
        )

    async def _connect(self):
        for i in range(60):
            if self._process is None:
                # The ._clean() was called, clearing the process
                logger.debug("Process cleanup complete, exit")
                sys.exit()

            await asyncio.sleep(1)
            try:
                self._session = aiohttp.ClientSession()
                ws = await self._session.ws_connect(self.ws_url, timeout=120)
                logger.debug("Websocket connection ready")
                return ws
            except aiohttp.client_exceptions.ClientConnectorError:
                await self._session.close()
                if i > 15:
                    logger.debug("Connection refused (startup not complete (yet))")

        logger.debug("Websocket connection to SC2 process timed out")
        raise TimeoutError("Websocket")

    async def _close_connection(self):
        logger.info("Closing connection...")

        if self._ws is not None:
            await self._ws.close()

        if self._session is not None:
            await self._session.close()

    def _clean(self):
        logger.info("Cleaning up...")

        if self._process is not None:
            if self._process.poll() is None:
                for _ in range(3):
                    self._process.terminate()
                    time.sleep(0.5)
                    if self._process.poll() is not None:
                        break
                else:
                    self._process.kill()
                    self._process.wait()
                    logger.error("KILLED")

        if os.path.exists(self._tmp_dir):
            shutil.rmtree(self._tmp_dir)

        self._process = None
        self._ws = None
        logger.info("Cleanup complete")

Module variables

var logger

Classes

class SC2Process

class SC2Process:
    def __init__(self, host: str = "127.0.0.1", port: Optional[int] = None, fullscreen: bool = False,
                 render: bool = False) -> None:
        assert isinstance(host, str)
        assert isinstance(port, int) or port is None

        self._render = render
        self._fullscreen = fullscreen
        self._host = host
        if port is None:
            self._port = portpicker.pick_unused_port()
        else:
            self._port = port
        self._tmp_dir = tempfile.mkdtemp(prefix="SC2_")
        self._process = None
        self._session = None
        self._ws = None

    async def __aenter__(self):
        kill_switch.add(self)

        def signal_handler():
            kill_switch.kill_all()

        signal.signal(signal.SIGINT, signal_handler)

        try:
            self._process = self._launch()
            self._ws = await self._connect()
        except:
            await self._close_connection()
            self._clean()
            raise

        return Controller(self._ws, self)

    async def __aexit__(self, *args):
        kill_switch.kill_all()
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    @property
    def ws_url(self):
        return f"ws://{self._host}:{self._port}/sc2api"

    def _launch(self):
        args = [
            str(Paths.EXECUTABLE),
            "-listen", self._host,
            "-port", str(self._port),
            "-displayMode", "1" if self._fullscreen else "0",
            "-dataDir", str(Paths.BASE),
            "-tempDir", self._tmp_dir,
        ]
        if self._render:
            args.extend(["-eglpath", "libEGL.so"])

        if logger.getEffectiveLevel() <= logging.DEBUG:
            args.append("-verbose")

        return subprocess.Popen(args,
            cwd=(str(Paths.CWD) if Paths.CWD else None),
            #, env=run_config.env
        )

    async def _connect(self):
        for i in range(60):
            if self._process is None:
                # The ._clean() was called, clearing the process
                logger.debug("Process cleanup complete, exit")
                sys.exit()

            await asyncio.sleep(1)
            try:
                self._session = aiohttp.ClientSession()
                ws = await self._session.ws_connect(self.ws_url, timeout=120)
                logger.debug("Websocket connection ready")
                return ws
            except aiohttp.client_exceptions.ClientConnectorError:
                await self._session.close()
                if i > 15:
                    logger.debug("Connection refused (startup not complete (yet))")

        logger.debug("Websocket connection to SC2 process timed out")
        raise TimeoutError("Websocket")

    async def _close_connection(self):
        logger.info("Closing connection...")

        if self._ws is not None:
            await self._ws.close()

        if self._session is not None:
            await self._session.close()

    def _clean(self):
        logger.info("Cleaning up...")

        if self._process is not None:
            if self._process.poll() is None:
                for _ in range(3):
                    self._process.terminate()
                    time.sleep(0.5)
                    if self._process.poll() is not None:
                        break
                else:
                    self._process.kill()
                    self._process.wait()
                    logger.error("KILLED")

        if os.path.exists(self._tmp_dir):
            shutil.rmtree(self._tmp_dir)

        self._process = None
        self._ws = None
        logger.info("Cleanup complete")

Ancestors (in MRO)

Static methods

def __init__(

self, host='127.0.0.1', port=None, fullscreen=False, render=False)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, host: str = "127.0.0.1", port: Optional[int] = None, fullscreen: bool = False,
             render: bool = False) -> None:
    assert isinstance(host, str)
    assert isinstance(port, int) or port is None
    self._render = render
    self._fullscreen = fullscreen
    self._host = host
    if port is None:
        self._port = portpicker.pick_unused_port()
    else:
        self._port = port
    self._tmp_dir = tempfile.mkdtemp(prefix="SC2_")
    self._process = None
    self._session = None
    self._ws = None

Instance variables

var ws_url

class kill_switch

class kill_switch:
    _to_kill: List[Any] = []

    @classmethod
    def add(cls, value):
        logger.debug("kill_switch: Add switch")
        cls._to_kill.append(value)

    @classmethod
    def kill_all(cls):
        logger.info("kill_switch: Process cleanup")
        for p in cls._to_kill:
            p._clean()

Ancestors (in MRO)

Methods

def add(

cls, value)

@classmethod
def add(cls, value):
    logger.debug("kill_switch: Add switch")
    cls._to_kill.append(value)

def kill_all(

cls)

@classmethod
def kill_all(cls):
    logger.info("kill_switch: Process cleanup")
    for p in cls._to_kill:
        p._clean()