feat(emulator): added random keys when no input
This commit is contained in:
parent
bbb1402f4e
commit
457790362a
|
@ -3,6 +3,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import random as rd
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from subprocess import PIPE, Popen # nosec
|
from subprocess import PIPE, Popen # nosec
|
||||||
|
@ -17,6 +18,7 @@ from settings import (
|
||||||
EMULATOR_FPS,
|
EMULATOR_FPS,
|
||||||
EMULATOR_HEIGHT,
|
EMULATOR_HEIGHT,
|
||||||
EMULATOR_POLLING_RATE,
|
EMULATOR_POLLING_RATE,
|
||||||
|
EMULATOR_RAND_RATE,
|
||||||
EMULATOR_ROM_PATH,
|
EMULATOR_ROM_PATH,
|
||||||
EMULATOR_SPF,
|
EMULATOR_SPF,
|
||||||
EMULATOR_WIDTH,
|
EMULATOR_WIDTH,
|
||||||
|
@ -80,7 +82,7 @@ stream = Popen(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def next_action():
|
def next_action(core: mgba.core.Core):
|
||||||
"""Select the next key from the redis database.
|
"""Select the next key from the redis database.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -89,9 +91,11 @@ def next_action():
|
||||||
votes: list[int] = list(map(int, r.mget(KEYS_ID)))
|
votes: list[int] = list(map(int, r.mget(KEYS_ID)))
|
||||||
if any(votes):
|
if any(votes):
|
||||||
r.mset(KEYS_RESET)
|
r.mset(KEYS_RESET)
|
||||||
return votes.index(max(votes))
|
core.set_keys(votes.index(max(votes)))
|
||||||
|
elif EMULATOR_RAND_RATE != 0.0 and rd.random() < EMULATOR_RAND_RATE: # nosec
|
||||||
|
core.set_keys(rd.choice(KEYS_MGBA)) # nosec
|
||||||
else:
|
else:
|
||||||
return -1 # TODO: add random
|
core.clear_keys(*KEYS_MGBA)
|
||||||
|
|
||||||
|
|
||||||
def state_manager(loop: asyncio.AbstractEventLoop):
|
def state_manager(loop: asyncio.AbstractEventLoop):
|
||||||
|
@ -122,10 +126,7 @@ async def emulator():
|
||||||
|
|
||||||
# poll redis for keys
|
# poll redis for keys
|
||||||
if not (core.frame_counter % EMULATOR_POLLING_RATE):
|
if not (core.frame_counter % EMULATOR_POLLING_RATE):
|
||||||
core.clear_keys(*KEYS_MGBA)
|
next_action(core)
|
||||||
next_key = next_action()
|
|
||||||
if next_key != -1:
|
|
||||||
core.set_keys(next_key)
|
|
||||||
|
|
||||||
# mGBA run next frame
|
# mGBA run next frame
|
||||||
core.run_frame()
|
core.run_frame()
|
||||||
|
@ -162,4 +163,4 @@ async def main():
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
||||||
# TODO: write code when ctrl+C -> save redis database ?
|
# TODO: save redis database when SIGINT ?
|
||||||
|
|
|
@ -22,6 +22,7 @@ EMULATOR_SPF: float = 1.0 / EMULATOR_FPS
|
||||||
EMULATOR_INPUT_HZ: int = int(getenv("EMULATOR_INPUT_HZ", 10))
|
EMULATOR_INPUT_HZ: int = int(getenv("EMULATOR_INPUT_HZ", 10))
|
||||||
EMULATOR_POLLING_RATE: int = EMULATOR_FPS // EMULATOR_INPUT_HZ
|
EMULATOR_POLLING_RATE: int = EMULATOR_FPS // EMULATOR_INPUT_HZ
|
||||||
EMULATOR_ROM_PATH: str = getenv("EMULATOR_ROM_PATH", "roms/pokemon.gba")
|
EMULATOR_ROM_PATH: str = getenv("EMULATOR_ROM_PATH", "roms/pokemon.gba")
|
||||||
|
EMULATOR_RAND_RATE: float = float(getenv("EMULATOR_RAND_RATE", 0.0))
|
||||||
|
|
||||||
FFMPEG_WIDTH: int = int(getenv("FFMPEG_WIDTH", EMULATOR_WIDTH))
|
FFMPEG_WIDTH: int = int(getenv("FFMPEG_WIDTH", EMULATOR_WIDTH))
|
||||||
FFMPEG_HEIGHT: int = int(getenv("FFMPEG_HEIGHT", EMULATOR_HEIGHT))
|
FFMPEG_HEIGHT: int = int(getenv("FFMPEG_HEIGHT", EMULATOR_HEIGHT))
|
||||||
|
@ -44,6 +45,6 @@ KEYMAP: dict[str, int] = {
|
||||||
"r": 8,
|
"r": 8,
|
||||||
"l": 9,
|
"l": 9,
|
||||||
}
|
}
|
||||||
KEYS_ID: tuple = tuple(KEYMAP.keys())
|
KEYS_ID: tuple[str, ...] = tuple(KEYMAP.keys())
|
||||||
KEYS_MGBA: tuple = tuple(KEYMAP.values())
|
KEYS_MGBA: tuple[int, ...] = tuple(KEYMAP.values())
|
||||||
KEYS_RESET: dict = dict([(x, 0) for x in KEYS_ID])
|
KEYS_RESET: dict[str, int] = dict([(x, 0) for x in KEYS_ID])
|
||||||
|
|
|
@ -3,6 +3,7 @@ import time
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
import mgba.core
|
||||||
import websockets.server
|
import websockets.server
|
||||||
import websockets.typing
|
import websockets.typing
|
||||||
from mgba._pylib import ffi
|
from mgba._pylib import ffi
|
||||||
|
@ -63,7 +64,7 @@ class Users(set):
|
||||||
logging.debug(f"user unregistered: {user}")
|
logging.debug(f"user unregistered: {user}")
|
||||||
|
|
||||||
|
|
||||||
async def save(core):
|
async def save(core: mgba.core.Core):
|
||||||
state = core.save_raw_state()
|
state = core.save_raw_state()
|
||||||
current_time = time.strftime("%Y-%m-%dT%H:%M:%S")
|
current_time = time.strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
with open(f"states/{current_time}.state", "wb") as state_file:
|
with open(f"states/{current_time}.state", "wb") as state_file:
|
||||||
|
@ -72,9 +73,8 @@ async def save(core):
|
||||||
logging.debug(f"state saved : {current_time}.state")
|
logging.debug(f"state saved : {current_time}.state")
|
||||||
|
|
||||||
|
|
||||||
async def load(core, filename):
|
async def load(core: mgba.core.Core, filename: str):
|
||||||
state = ffi.new("unsigned char[397312]") # pulled 397312 straight from my ass
|
state = ffi.new("unsigned char[397312]") # pulled 397312 straight from my ass, TODO: check mGBA sources ?
|
||||||
# TODO: checker les sources mgba pour savoir d'où sort 397312
|
|
||||||
with open(f"states/{filename}.state", "rb") as state_file:
|
with open(f"states/{filename}.state", "rb") as state_file:
|
||||||
for i in range(len(state)):
|
for i in range(len(state)):
|
||||||
state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)
|
state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)
|
||||||
|
|
Loading…
Reference in a new issue