refactor: moved logical structures to separate files

feat(states): added a watchdog to sync state list with redis
This commit is contained in:
Laureηt 2022-02-12 23:25:08 +01:00
parent 3e77086bfc
commit 10bd66d827
No known key found for this signature in database
GPG key ID: D88C6B294FD40994
8 changed files with 251 additions and 138 deletions

38
poetry.lock generated
View file

@ -440,6 +440,17 @@ six = ">=1.9.0,<2"
docs = ["proselint (>=0.10.2)", "sphinx (>=3)", "sphinx-argparse (>=0.2.5)", "sphinx-rtd-theme (>=0.4.3)", "towncrier (>=21.3)"]
testing = ["coverage (>=4)", "coverage-enable-subprocess (>=1)", "flaky (>=3)", "pytest (>=4)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.1)", "pytest-mock (>=2)", "pytest-randomly (>=1)", "pytest-timeout (>=1)", "packaging (>=20.0)"]
[[package]]
name = "watchdog"
version = "2.1.6"
description = "Filesystem events monitoring"
category = "main"
optional = false
python-versions = ">=3.6"
[package.extras]
watchmedo = ["PyYAML (>=3.10)"]
[[package]]
name = "websockets"
version = "10.1"
@ -451,7 +462,7 @@ python-versions = ">=3.7"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "6f9f817034ddae03ae6983e80bc86ed2f53efbc4d7330bcf5e44fe1b35b24364"
content-hash = "e199192c3a65de46d43975d91f086a08e7918a9d3147dea63e7a065bb168504b"
[metadata.files]
asyncio = [
@ -774,6 +785,31 @@ virtualenv = [
{file = "virtualenv-20.13.1-py2.py3-none-any.whl", hash = "sha256:45e1d053cad4cd453181ae877c4ffc053546ae99e7dd049b9ff1d9be7491abf7"},
{file = "virtualenv-20.13.1.tar.gz", hash = "sha256:e0621bcbf4160e4e1030f05065c8834b4e93f4fcc223255db2a823440aca9c14"},
]
watchdog = [
{file = "watchdog-2.1.6-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:9693f35162dc6208d10b10ddf0458cc09ad70c30ba689d9206e02cd836ce28a3"},
{file = "watchdog-2.1.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:aba5c812f8ee8a3ff3be51887ca2d55fb8e268439ed44110d3846e4229eb0e8b"},
{file = "watchdog-2.1.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4ae38bf8ba6f39d5b83f78661273216e7db5b00f08be7592062cb1fc8b8ba542"},
{file = "watchdog-2.1.6-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:ad6f1796e37db2223d2a3f302f586f74c72c630b48a9872c1e7ae8e92e0ab669"},
{file = "watchdog-2.1.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:922a69fa533cb0c793b483becaaa0845f655151e7256ec73630a1b2e9ebcb660"},
{file = "watchdog-2.1.6-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:b2fcf9402fde2672545b139694284dc3b665fd1be660d73eca6805197ef776a3"},
{file = "watchdog-2.1.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:3386b367e950a11b0568062b70cc026c6f645428a698d33d39e013aaeda4cc04"},
{file = "watchdog-2.1.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8f1c00aa35f504197561060ca4c21d3cc079ba29cf6dd2fe61024c70160c990b"},
{file = "watchdog-2.1.6-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:b52b88021b9541a60531142b0a451baca08d28b74a723d0c99b13c8c8d48d604"},
{file = "watchdog-2.1.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8047da932432aa32c515ec1447ea79ce578d0559362ca3605f8e9568f844e3c6"},
{file = "watchdog-2.1.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e92c2d33858c8f560671b448205a268096e17870dcf60a9bb3ac7bfbafb7f5f9"},
{file = "watchdog-2.1.6-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:b7d336912853d7b77f9b2c24eeed6a5065d0a0cc0d3b6a5a45ad6d1d05fb8cd8"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_aarch64.whl", hash = "sha256:cca7741c0fcc765568350cb139e92b7f9f3c9a08c4f32591d18ab0a6ac9e71b6"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_armv7l.whl", hash = "sha256:25fb5240b195d17de949588628fdf93032ebf163524ef08933db0ea1f99bd685"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_i686.whl", hash = "sha256:be9be735f827820a06340dff2ddea1fb7234561fa5e6300a62fe7f54d40546a0"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_ppc64.whl", hash = "sha256:d0d19fb2441947b58fbf91336638c2b9f4cc98e05e1045404d7a4cb7cddc7a65"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:3becdb380d8916c873ad512f1701f8a92ce79ec6978ffde92919fd18d41da7fb"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_s390x.whl", hash = "sha256:ae67501c95606072aafa865b6ed47343ac6484472a2f95490ba151f6347acfc2"},
{file = "watchdog-2.1.6-py3-none-manylinux2014_x86_64.whl", hash = "sha256:e0f30db709c939cabf64a6dc5babb276e6d823fd84464ab916f9b9ba5623ca15"},
{file = "watchdog-2.1.6-py3-none-win32.whl", hash = "sha256:e02794ac791662a5eafc6ffeaf9bcc149035a0e48eb0a9d40a8feb4622605a3d"},
{file = "watchdog-2.1.6-py3-none-win_amd64.whl", hash = "sha256:bd9ba4f332cf57b2c1f698be0728c020399ef3040577cde2939f2e045b39c1e5"},
{file = "watchdog-2.1.6-py3-none-win_ia64.whl", hash = "sha256:a0f1c7edf116a12f7245be06120b1852275f9506a7d90227648b250755a03923"},
{file = "watchdog-2.1.6.tar.gz", hash = "sha256:a36e75df6c767cbf46f61a91c70b3ba71811dfa0aca4a324d9407a06a8b7a2e7"},
]
websockets = [
{file = "websockets-10.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:38db6e2163b021642d0a43200ee2dec8f4980bdbda96db54fde72b283b54cbfc"},
{file = "websockets-10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e1b60fd297adb9fc78375778a5220da7f07bf54d2a33ac781319650413fc6a60"},

View file

@ -14,6 +14,7 @@ cached-property = "^1.5.2"
Pillow = "^8.4.0"
redis = "^3.5.3"
mgba = { path="mgba/src/platform/python" }
watchdog = "^2.1.6"
[tool.poetry.dev-dependencies]
black = "^22.1"

View file

@ -2,88 +2,30 @@
import asyncio
import logging
import os
import random as rd
import threading
import random
import time
from subprocess import PIPE, STDOUT, Popen # nosec
import mgba.core
import mgba.image
import mgba.log
import redis
import utils
from ffmpeg_manager import ffmpeg_stream
from redis_manager import RedisManager
from settings import (
EMULATOR_FPS,
EMULATOR_HEIGHT,
EMULATOR_POLLING_RATE,
EMULATOR_RAND_RATE,
EMULATOR_ROM_PATH,
EMULATOR_SPF,
EMULATOR_WIDTH,
FFMPEG_BITRATE,
FFMPEG_FPS,
FFMPEG_HEIGHT,
FFMPEG_WIDTH,
KEYS_ID,
KEYS_MGBA,
KEYS_RESET,
REDIS_HOST,
REDIS_PORT,
RTMP_STREAM_URI,
)
core: mgba.core.Core = mgba.core.load_path(EMULATOR_ROM_PATH)
screen: mgba.image.Image = mgba.image.Image(EMULATOR_WIDTH, EMULATOR_HEIGHT)
core.set_video_buffer(screen)
core.reset()
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)-8s %(message)s", datefmt="(%F %T)")
# change log levels for some libs
logging.getLogger("asyncio").setLevel(logging.ERROR)
logging.getLogger("asyncio.coroutines").setLevel(logging.ERROR)
mgba.log.silence()
r: redis.Redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
# launch ffmpeg process
stream = Popen(
[
"/usr/bin/ffmpeg",
"-y",
"-f",
"image2pipe",
"-vcodec",
"png",
"-r",
f"{EMULATOR_FPS}",
"-s",
f"{EMULATOR_WIDTH}x{EMULATOR_HEIGHT}",
"-i",
"-",
"-f",
"flv",
"-s",
f"{FFMPEG_WIDTH}x{FFMPEG_HEIGHT}",
"-r",
f"{FFMPEG_FPS}",
"-b:v",
FFMPEG_BITRATE,
"-fflags",
"nobuffer",
"-flags",
"low_delay",
"-strict",
"experimental",
RTMP_STREAM_URI,
],
stdin=PIPE,
stdout=PIPE,
stderr=STDOUT,
)
from state_manager import StateManager
def next_action(core: mgba.core.Core) -> None:
@ -96,33 +38,12 @@ def next_action(core: mgba.core.Core) -> None:
if any(votes):
r.mset(KEYS_RESET)
core.set_keys(votes.index(max(votes)))
elif EMULATOR_RAND_RATE != 0.0 and rd.random() < EMULATOR_RAND_RATE: # nosec
core.set_keys(rd.choice(KEYS_MGBA)) # nosec
elif EMULATOR_RAND_RATE != 0.0 and random.random() < EMULATOR_RAND_RATE:
core.set_keys(random.choice(KEYS_MGBA))
else:
core.clear_keys(*KEYS_MGBA)
def state_manager(loop: asyncio.AbstractEventLoop) -> None:
"""Subscribe and respond to messages received from redis.
Args:
loop (asyncio.AbstractEventLoop): the asyncio event loop.
"""
ps = r.pubsub()
ps.subscribe("admin")
while True:
for message in ps.listen():
if message["type"] == "message":
match message["data"].decode("utf-8").split(":", 1):
case ["save"]:
asyncio.ensure_future(utils.save(core), loop=loop)
case ["load", filename]:
asyncio.ensure_future(utils.load(core, filename), loop=loop)
case _:
logging.debug(f"Command not understood: {message}")
async def emulator() -> None:
"""Start the main loop responsible for handling inputs and sending images to ffmpeg."""
while True:
@ -137,7 +58,9 @@ async def emulator() -> None:
# save frame to PNG image
image = screen.to_pil().convert("RGB")
image.save(stream.stdin, "PNG")
image.save(ffmpeg_stream.stdin, "PNG")
# TODO: get audio
# sleep until next frame, if necessary
sleep_t = last_frame_t - time.time() + EMULATOR_SPF
@ -150,15 +73,13 @@ async def main() -> None:
logging.debug("Emulator started !")
loop = asyncio.get_event_loop()
# setup states in redis
files = os.listdir("states")
states = list(filter(lambda x: x.endswith(".state"), files))
for state in states:
r.sadd("states", state.removesuffix(".state")) # voir si oneline possible
# launch the thread to manage state files
state_manager = StateManager(r)
state_manager.start()
# launch the thread to save/load states/games
thread = threading.Thread(target=state_manager, args=(loop,))
thread.start()
# launch the thread to manage incoming messages from redis
redis_manager = RedisManager(r, loop, core)
redis_manager.start()
# launch the event loop, which the emulator relies on
task_emulator = loop.create_task(emulator())
@ -166,6 +87,27 @@ async def main() -> None:
if __name__ == "__main__":
asyncio.run(main())
# setup mGBA emulator
core: mgba.core.Core = mgba.core.load_path(EMULATOR_ROM_PATH)
screen: mgba.image.Image = mgba.image.Image(EMULATOR_WIDTH, EMULATOR_HEIGHT)
core.set_video_buffer(screen)
core.reset()
# TODO: save redis database when SIGINT ?
# setup logging format
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)-8s %(message)s", datefmt="(%F %T)"
)
# change log levels for some libs
logging.getLogger("asyncio").setLevel(logging.ERROR)
logging.getLogger("asyncio.coroutines").setLevel(logging.ERROR)
logging.getLogger("watchdog.observers").setLevel(logging.ERROR)
mgba.log.silence()
# connect to redis database
r: redis.Redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
# TODO: handle signals (SIGINT, ...)
# start the emulator
asyncio.run(main())

48
src/ffmpeg_manager.py Normal file
View file

@ -0,0 +1,48 @@
import subprocess
from settings import (
EMULATOR_FPS,
EMULATOR_HEIGHT,
EMULATOR_WIDTH,
FFMPEG_BITRATE,
FFMPEG_FPS,
FFMPEG_HEIGHT,
FFMPEG_WIDTH,
RTMP_STREAM_URI,
)
# launch ffmpeg process
ffmpeg_stream = subprocess.Popen(
[
"/usr/bin/ffmpeg",
"-y",
"-f",
"image2pipe",
"-vcodec",
"png",
"-r",
f"{EMULATOR_FPS}",
"-s",
f"{EMULATOR_WIDTH}x{EMULATOR_HEIGHT}",
"-i",
"-",
"-f",
"flv",
"-s",
f"{FFMPEG_WIDTH}x{FFMPEG_HEIGHT}",
"-r",
f"{FFMPEG_FPS}",
"-b:v",
FFMPEG_BITRATE,
"-fflags",
"nobuffer",
"-flags",
"low_delay",
"-strict",
"experimental",
RTMP_STREAM_URI,
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)

51
src/redis_manager.py Normal file
View file

@ -0,0 +1,51 @@
import asyncio
import logging
import threading
import time
import mgba.core
import redis
from mgba._pylib import ffi
async def save(core: mgba.core.Core) -> None:
state = core.save_raw_state()
current_time = time.strftime("%Y-%m-%dT%H:%M:%S")
with open(f"states/{current_time}.state", "wb") as state_file:
for byte in state:
state_file.write(byte.to_bytes(4, byteorder="big", signed=False))
logging.debug(f"state saved : {current_time}.state")
async def load(core: mgba.core.Core, filename: str) -> None:
state = ffi.new("unsigned char[397312]") # pulled 397312 straight from my ass, TODO: check mGBA sources ?
with open(f"states/{filename}.state", "rb") as state_file:
for i in range(len(state)):
state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)
core.load_raw_state(state)
logging.debug(f"state loaded : {filename}")
class RedisManager(threading.Thread):
def __init__(self, redis: redis.Redis, loop: asyncio.AbstractEventLoop, core: mgba.core.Core) -> None:
super().__init__()
self.loop = loop
self.pubsub = redis.pubsub()
self.pubsub.subscribe("admin")
self.core = core
def parse_message(self, message: dict[str, str]) -> None:
if message["type"] == "message":
match message["data"].decode("utf-8").split(":", 1):
case ["save"]:
asyncio.ensure_future(save(self.core), loop=self.loop)
case ["load", filename]:
asyncio.ensure_future(load(self.core, filename), loop=self.loop)
case _:
logging.debug(f"Command not understood: {message}")
def run(self) -> None:
while True:
for message in self.pubsub.listen():
self.parse_message(message)

View file

@ -21,19 +21,6 @@ from settings import (
)
from utils import User, Users
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)-8s %(message)s", datefmt="(%F %T)")
# change log levels for some libs
logging.getLogger("asyncio").setLevel(logging.ERROR)
logging.getLogger("asyncio.coroutines").setLevel(logging.ERROR)
logging.getLogger("websockets.server").setLevel(logging.ERROR)
logging.getLogger("websockets.protocol").setLevel(logging.ERROR)
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
r.mset(KEYS_RESET) # type: ignore
USERS: Users = Users()
async def parse_message(user: User, message: websockets.typing.Data) -> None:
"""Parse the user's message.
@ -42,15 +29,15 @@ async def parse_message(user: User, message: websockets.typing.Data) -> None:
user (User): the sender of the message.
message (str): the key received (through the websocket).
"""
msg = KEYS_ID[int(message)]
if user.last_message + USER_TIMEOUT > time.time():
logging.debug(f"dropping action: {message!r} from {user}")
return None
elif (msg := KEYS_ID[int(message)]) in KEYS_ID:
logging.debug(f"dropping action: {msg} from {user}")
elif msg in KEYS_ID:
r.incr(msg)
user.last_message = time.time()
logging.debug(f"received action: {msg} from {user}")
else:
logging.error(f"unsupported action: {message!r} from {user}")
logging.error(f"unsupported action: {msg} from {user}")
async def handler(websocket: websockets.server.WebSocketServerProtocol, path: str) -> None:
@ -78,9 +65,29 @@ async def handler(websocket: websockets.server.WebSocketServerProtocol, path: st
async def main() -> None:
"""Start the websocket server."""
logging.debug("Server started !")
async with websockets.server.serve(handler, WEBSOCKET_SERVE, WEBSOCKET_PORT): # nosec
async with websockets.server.serve(handler, WEBSOCKET_SERVE, WEBSOCKET_PORT):
await asyncio.Future() # run forever
if __name__ == "__main__":
# setup logging format
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)-8s %(message)s", datefmt="(%F %T)"
)
# change log levels for some libs
logging.getLogger("asyncio").setLevel(logging.ERROR)
logging.getLogger("asyncio.coroutines").setLevel(logging.ERROR)
logging.getLogger("websockets.server").setLevel(logging.ERROR)
logging.getLogger("websockets.protocol").setLevel(logging.ERROR)
# connect to redis database
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
r.mset(KEYS_RESET) # type: ignore
# create a User set
USERS: Users = Users()
# start the websocket server
asyncio.run(main())

49
src/state_manager.py Normal file
View file

@ -0,0 +1,49 @@
import logging
import os
import redis
import watchdog.observers
from watchdog.events import FileCreatedEvent, FileDeletedEvent, FileSystemEventHandler
class StateHandler(FileSystemEventHandler):
def __init__(self, redis: redis.Redis) -> None:
super().__init__()
self.redis = redis
def on_created(self, event: FileCreatedEvent) -> None:
"""Called when a file or directory is created.
Args
event (FileCreatedEvent): Event representing file/directory creation.
"""
filename = event.src_path.split("/")[-1]
filename = filename.removesuffix(".state")
self.redis.sadd("states", filename)
logging.debug(f"new statefile: {filename}")
def on_deleted(self, event: FileDeletedEvent) -> None:
"""Called when a file or directory is deleted.
Args
event (FileDeletedEvent): Event representing file/directory deletion.
"""
filename = event.src_path.split("/")[-1]
filename = filename.removesuffix(".state")
self.redis.srem("state", filename)
logging.debug(f"deleted statefile: {filename}")
class StateManager(watchdog.observers.Observer):
def __init__(self, redis: redis.Redis):
super().__init__()
# setup states in redis
files = os.listdir("./states")
statefiles = list(filter(lambda x: x.endswith(".state"), files))
states = list(map(lambda x: x.removesuffix(".state"), statefiles))
redis.sadd("states", *states)
logging.debug("redis server populated with states")
state_handler = StateHandler(redis)
self.schedule(state_handler, "./states", recursive=False)

View file

@ -1,12 +1,9 @@
import logging
import time
from dataclasses import dataclass
from typing import Any
import mgba.core
import websockets.server
import websockets.typing
from mgba._pylib import ffi
class User:
@ -15,11 +12,11 @@ class User:
websocket: websockets.server.WebSocketServerProtocol
last_message: float
def __init__(self, websocket: Any) -> None:
def __init__(self, websocket: websockets.server.WebSocketServerProtocol) -> None:
"""Construct a User object.
Args:
websocket (Any): the websocket used by the user.
websocket (WebSocketServerProtocol): the websocket used by the user.
"""
self.websocket = websocket
self.last_message = time.time()
@ -62,21 +59,3 @@ class Users(set):
"""
self.remove(user)
logging.debug(f"user unregistered: {user}")
async def save(core: mgba.core.Core) -> None:
state = core.save_raw_state()
current_time = time.strftime("%Y-%m-%dT%H:%M:%S")
with open(f"states/{current_time}.state", "wb") as state_file:
for byte in state:
state_file.write(byte.to_bytes(4, byteorder="big", signed=False))
logging.debug(f"state saved : {current_time}.state")
async def load(core: mgba.core.Core, filename: str) -> None:
state = ffi.new("unsigned char[397312]") # pulled 397312 straight from my ass, TODO: check mGBA sources ?
with open(f"states/{filename}.state", "rb") as state_file:
for i in range(len(state)):
state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)
core.load_raw_state(state)
logging.debug(f"state loaded : {filename}")