feat(emulator): created parse_message
function
This commit is contained in:
parent
9270b4a09a
commit
0473db2a28
|
@ -39,48 +39,49 @@ mgba.log.silence()
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
async def parse_message(message: dict[str, str]):
|
||||||
|
if "action" in message:
|
||||||
|
data = message["action"]
|
||||||
|
if data in KEYMAP:
|
||||||
|
key = KEYMAP[data]
|
||||||
|
core.set_keys(key)
|
||||||
|
logging.debug(f"pressing: {key}")
|
||||||
|
elif data == "null":
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
logging.error(f"unsupported action: {data}")
|
||||||
|
|
||||||
|
if "admin" in message:
|
||||||
|
data = message["admin"]
|
||||||
|
if data == "save": # voodoo magic incomming
|
||||||
|
state = core.save_raw_state()
|
||||||
|
with open("states/test.state", "wb") as state_file:
|
||||||
|
for byte in state:
|
||||||
|
state_file.write(byte.to_bytes(4, byteorder="big", signed=False))
|
||||||
|
elif data == "load": # black magic incomming
|
||||||
|
state = ffi.new("unsigned char[397312]")
|
||||||
|
with open("states/test.state", "rb") as state_file:
|
||||||
|
for i in range(len(state)):
|
||||||
|
state[i] = int.from_bytes(state_file.read(4), byteorder="big", signed=False)
|
||||||
|
core.load_raw_state(state)
|
||||||
|
else:
|
||||||
|
logging.error(f"unsupported admin: {data}")
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS) as cam:
|
with pyvirtualcam.Camera(width=WIDTH, height=HEIGHT, fps=FPS) as cam:
|
||||||
logging.debug(f"Using virtual camera: {cam.device}")
|
logging.debug(f"Using virtual camera: {cam.device}")
|
||||||
async with websockets.connect(URI) as websocket:
|
async with websockets.connect(URI) as websocket:
|
||||||
await websocket.send('{"auth":"password"}')
|
await websocket.send('{"auth":"password"}')
|
||||||
logging.debug(f"connected to: {websocket}")
|
logging.debug(f"connected to: {websocket}")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if not (core.frame_counter % POLLING_RATE):
|
if not (core.frame_counter % POLLING_RATE):
|
||||||
|
|
||||||
core.clear_keys(*KEYMAP.values())
|
core.clear_keys(*KEYMAP.values())
|
||||||
await websocket.send('{"emu":"get"}')
|
await websocket.send('{"emu":"get"}')
|
||||||
message = await websocket.recv()
|
json_message = await websocket.recv()
|
||||||
data = json.loads(message)
|
message = json.loads(json_message)
|
||||||
|
parse_message(message)
|
||||||
if "action" in data:
|
|
||||||
action = data["action"]
|
|
||||||
if action in KEYMAP:
|
|
||||||
key = KEYMAP[action]
|
|
||||||
core.set_keys(key)
|
|
||||||
logging.debug(f"pressing: {key}")
|
|
||||||
elif action == "null":
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
logging.error(f"unsupported action: {data}")
|
|
||||||
|
|
||||||
if "admin" in data:
|
|
||||||
admin = data["admin"]
|
|
||||||
if admin == "save":
|
|
||||||
state = core.save_raw_state()
|
|
||||||
with open("states/test.state", "wb") as state_file:
|
|
||||||
for byte in state:
|
|
||||||
state_file.write(byte.to_bytes(4, byteorder="big", signed=False))
|
|
||||||
elif admin == "load":
|
|
||||||
state = ffi.new("unsigned char[397312]")
|
|
||||||
with open("states/test.state", "rb") as state_file:
|
|
||||||
for i in range(len(state)):
|
|
||||||
ptdr = state_file.read(4)
|
|
||||||
state[i] = int.from_bytes(ptdr, byteorder="big", signed=False)
|
|
||||||
core.load_raw_state(state)
|
|
||||||
|
|
||||||
else:
|
|
||||||
logging.error(f"unsupported admin: {data}")
|
|
||||||
|
|
||||||
core.run_frame()
|
core.run_frame()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue