diff --git a/src/emulator.py b/src/emulator.py index 1129f35..60a97fa 100644 --- a/src/emulator.py +++ b/src/emulator.py @@ -1,3 +1,5 @@ +"""Emulator server, responsible for handling user inputs and outputting video & sound.""" + import asyncio import logging import os @@ -30,8 +32,8 @@ from settings import ( RTMP_STREAM_URI, ) -core: mgba.core = mgba.core.load_path(EMULATOR_ROM_PATH) -screen: mgba.image = mgba.image.Image(EMULATOR_WIDTH, EMULATOR_HEIGHT) +core: mgba.core.Core = mgba.core.load_path(EMULATOR_ROM_PATH) +screen: mgba.image.Image = mgba.image.Image(EMULATOR_WIDTH, EMULATOR_HEIGHT) core.set_video_buffer(screen) core.reset() @@ -41,20 +43,7 @@ mgba.log.silence() r: redis.Redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0) -def next_action(): - """Select the next key from the redis database. - - Returns: - int: key used by mgba - """ - votes: list[int] = list(map(int, r.mget(KEYS_ID))) - if any(votes): - r.mset(KEYS_RESET) - return votes.index(max(votes)) - else: - return -1 - - +# Launch ffmpeg process stream = Popen( [ "/usr/bin/ffmpeg", @@ -91,13 +80,37 @@ stream = Popen( ) +def next_action(): + """Select the next key from the redis database. + + Returns: + int: key used by mgba. + """ + votes: list[int] = list(map(int, r.mget(KEYS_ID))) + if any(votes): + r.mset(KEYS_RESET) + return votes.index(max(votes)) + else: + return -1 + + def state_manager(loop: asyncio.AbstractEventLoop): + """Subscribe and respond to messages received from redis. + + Args: + loop (asyncio.AbstractEventLoop): the asyncio event loop. + """ ps = r.pubsub() ps.subscribe("admin") + while True: for message in ps.listen(): if message["type"] == "message": data = message["data"].decode("utf-8") + + # TODO: voir si plus clean possible ? + # TODO: dev dans un docker ? + if data == "save": asyncio.ensure_future(utils.save(core), loop=loop) elif data.startswith("load:"): @@ -105,27 +118,32 @@ def state_manager(loop: asyncio.AbstractEventLoop): async def emulator(): + """Start the main loop responsible for handling inputs and sending images to ffmpeg.""" while True: last_frame_t = time.time() + # poll redis for keys if not (core.frame_counter % EMULATOR_POLLING_RATE): core.clear_keys(*KEYS_MGBA) next_key = next_action() if next_key != -1: core.set_keys(next_key) + # mGBA run next frame core.run_frame() + # save frame to PNG image image = screen.to_pil().convert("RGB") image.save(stream.stdin, "PNG") + # sleep until next frame, if necessary sleep_t = last_frame_t - time.time() + EMULATOR_SPF if sleep_t > 0: await asyncio.sleep(sleep_t) async def main(): - + """Start the emulator.""" loop = asyncio.get_event_loop() # setup states in redis @@ -145,3 +163,5 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) + + # TODO: write code when ctrl+C -> save redis database ? diff --git a/src/server.py b/src/server.py index 97d2fdd..801bb10 100644 --- a/src/server.py +++ b/src/server.py @@ -1,16 +1,18 @@ +"""Websocket server, responsible for proxying user inputs.""" + import asyncio -import json import logging import time -from typing import Union import redis import websockets +import websockets.exceptions +import websockets.server +import websockets.typing from settings import ( KEYS_ID, KEYS_RESET, - PASSWORD_ADMIN, REDIS_HOST, REDIS_PORT, USER_TIMEOUT, @@ -27,71 +29,46 @@ r.mset(KEYS_RESET) USERS: Users = Users() -async def parse_message(user: User, message: dict[str, str]) -> None: +async def parse_message(user: User, message: websockets.typing.Data) -> None: """Parse the user's message. Args: user (User): the sender of the message. - message (dict[str, str]): the data received (through the websocket). + message (str): the key received (through the websocket). """ - if "auth" in message: - data = message["auth"] - if USERS.admin is None and data == PASSWORD_ADMIN: - USERS.admin = user - logging.debug(f"admin authenticated: {user}") - - response: dict[str, Union[str, list[str]]] = dict() - response["auth"] = "success" - states = r.smembers("states") - stringlist = [x.decode("utf-8") for x in states] - response["states"] = sorted(stringlist) - await user.send(json.dumps(response)) - - if "admin" in message: - if user == USERS.admin: - data = message["admin"] - if data == "save": - r.publish("admin", "save") - elif data.startswith("load:"): - r.publish("admin", data) - else: - logging.error(f"unsupported admin action: {data}") - else: - logging.error(f"user is not admin: {user}") - - if "action" in message: - data = message["action"] - - if user.last_message + USER_TIMEOUT > time.time(): - logging.debug(f"dropping action: {data}") - return None - elif data in KEYS_ID: - r.incr(data) - user.last_message = time.time() - else: - logging.error(f"unsupported action: {data}") + if user.last_message + USER_TIMEOUT > time.time(): + logging.debug(f"dropping action: {message!r} from {user}") + return None + elif message in KEYS_ID: + r.incr(message) + user.last_message = time.time() + logging.debug(f"received action: {message!r} from {user}") + else: + logging.error(f"unsupported action: {message!r} from {user}") -async def handler(websocket, path: str): +async def handler(websocket: websockets.server.WebSocketServerProtocol, path: str): """Handle the messages sent by a user. Args: websocket: the websocket used by the user. - path (str): the path used by the websocket. (?) + path (str): the path used by the websocket. """ - try: - # Register user - user = User(websocket) - USERS.register(user) - # Manage received messages - async for json_message in websocket: - message: dict[str, str] = json.loads(json_message) + # Register user + user = User(websocket) + USERS.register(user) + logging.debug(f"registered user {user}") + + try: # Manage received messages + async for message in user.websocket: await parse_message(user, message) + except websockets.exceptions.ConnectionClosed: + logging.error(f"connection with user {user} is already closed") + except RuntimeError: + logging.error(f"two coroutines called recv() concurrently, user={user}") finally: - # Unregister user - if user == USERS.admin: - USERS.admin = None USERS.unregister(user) + logging.debug(f"unregistered user {user}") async def main(): diff --git a/src/utils.py b/src/utils.py index cbaf1b1..7b35fe6 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,15 +1,17 @@ import logging import time from dataclasses import dataclass -from typing import Any, Optional +from typing import Any +import websockets.server +import websockets.typing from mgba._pylib import ffi class User: """Store infos related to a connected user.""" - websocket: Any + websocket: websockets.server.WebSocketServerProtocol last_message: float def __init__(self, websocket: Any) -> None: @@ -42,8 +44,6 @@ class User: class Users(set): """Store `User`s connected to the server.""" - admin: Optional[User] = None - def register(self, user: User): """Register a user in the set. @@ -73,7 +73,8 @@ async def save(core): async def load(core, filename): - state = ffi.new("unsigned char[397312]") # pulled 397312 from my ass + state = ffi.new("unsigned char[397312]") # pulled 397312 straight from my ass + # TODO: checker les sources mgba pour savoir d'où sort 397312 with open(f"states/{filename}.state", "rb") as state_file: for i in range(len(state)): state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)