booplaysgba/src/server.py

123 lines
3.7 KiB
Python
Raw Normal View History

import asyncio
import json
import logging
import os
import time
from typing import Any
import websockets
from utils import User, Users, Votes
2021-11-01 22:15:16 +00:00
# logging.basicConfig(level=logging.DEBUG)
2021-11-01 22:15:16 +00:00
# Shared secrets checked against the "auth" field of incoming messages.
# They can be overridden through environment variables of the same name so
# real credentials do not have to live in the source tree; the fallbacks are
# the historical placeholder values.
PASSWORD_ADMIN: str = os.environ.get("PASSWORD_ADMIN", "admin_password")
PASSWORD_EMU: str = os.environ.get("PASSWORD_EMU", "emulator_password")
2021-11-01 22:15:16 +00:00
# Vote tally shared by every connection: parse_message increments an entry
# for each supported "action" message and clears the tally once the emulator
# has fetched the next vote.
VOTES: Votes = Votes()
# Registry of connected users; it also records which user (if any) currently
# holds the admin role and which one holds the emulator role.
USERS: Users = Users()
async def send_states(user: User) -> None:
    """Send the list of available save states to a user.

    Lists the ``states`` directory (relative to the current working
    directory), keeps the entries whose name ends with ``.state`` and sends
    them, sorted by name, as a JSON object of the form ``{"state": [...]}``.

    Args:
        user (User): the recipient of the list.
    """
    try:
        files = os.listdir("states")
    except (FileNotFoundError, NotADirectoryError):
        # Answer with an empty list instead of raising into the message
        # handler when the directory does not exist.
        logging.error("states directory not found")
        files = []
    # os.listdir returns entries in arbitrary order; sort for a stable reply.
    states = sorted(name for name in files if name.endswith(".state"))
    await user.send(json.dumps({"state": states}))
async def _handle_auth(user: User, data: str) -> None:
    """Grant the emulator or admin role when ``data`` is the matching password."""
    if USERS.emulator is None and data == PASSWORD_EMU and user != USERS.admin:
        USERS.emulator = user
        logging.debug(f"emulator authenticated: {user}")
        await user.send('{"auth":"success"}')
    elif USERS.admin is None and data == PASSWORD_ADMIN and user != USERS.emulator:
        USERS.admin = user
        logging.debug(f"admin authenticated: {user}")
        await user.send('{"auth":"success"}')
    else:
        # Wrong password, role already taken, or the user already holds the
        # other role. The submitted password is deliberately not logged.
        logging.warning(f"authentication refused: {user}")


def _handle_action(user: User, data: str) -> None:
    """Count a vote for ``data`` when it is one of the supported actions."""
    if data in VOTES:
        VOTES[data] += 1
        user.last_message = time.time()
        user.has_voted = True
    else:
        logging.error(f"unsupported action: {data}")


async def _handle_admin(user: User, data: str) -> None:
    """Forward an admin "save"/"load" request to the emulator."""
    if user != USERS.admin:
        logging.error(f"user is not admin: {user}")
    elif USERS.emulator is None:
        # Reported separately so a legitimate admin is not told they lack
        # the admin role when the real problem is a missing emulator.
        logging.error("emulator is not connected")
    elif data == "save":
        await USERS.emulator.send('{"admin":"save"}')
    elif data == "load":
        await USERS.emulator.send('{"admin":"load"}')
    else:
        logging.error(f"unsupported admin action: {data}")


async def _handle_emu(user: User, data: str) -> None:
    """Answer the emulator's "get" request with the next vote, then reset."""
    if user != USERS.emulator:
        logging.error(f"user is not emulator: {user}")
    elif data == "get":
        await USERS.emulator.send(f'{{"action":"{VOTES.next_vote()}"}}')
        VOTES.clear()
        # NOTE(review): presumably resets per-round voter state; confirm
        # against Users.clear in utils.
        USERS.clear()
    else:
        logging.error(f"unsupported emulator action: {data}")


async def _handle_state(user: User, data: str) -> None:
    """Send the list of save states to the admin on a "get" request."""
    if user != USERS.admin:
        logging.error(f"user is not admin: {user}")
    elif data == "get":
        await send_states(user)
    else:
        logging.error(f"unsupported state action: {data}")


async def parse_message(user: User, message: dict[str, str]) -> None:
    """Parse the user's message.

    Each of the keys "auth", "action", "admin", "emu" and "state" is handled
    independently and in that order, so a single message carrying several
    keys triggers every matching handler.

    Args:
        user (User): the sender of the message.
        message (dict[str, str]): the data received (through the websocket).
    """
    if "auth" in message:
        await _handle_auth(user, message["auth"])
    if "action" in message:
        _handle_action(user, message["action"])
    if "admin" in message:
        await _handle_admin(user, message["admin"])
    if "emu" in message:
        await _handle_emu(user, message["emu"])
    if "state" in message:
        await _handle_state(user, message["state"])
2021-11-01 20:13:01 +00:00
async def handler(websocket: Any, path: str = "") -> None:
    """Handle the messages sent by a user.

    Registers the connection as a user, feeds every received JSON object to
    ``parse_message`` and, once the connection ends, unregisters the user and
    releases the admin or emulator role if the user held one.

    Args:
        websocket (Any): the websocket used by the user.
        path (str): the path used by the websocket. Unused; it has a default
            so the handler can also be called with the connection alone, as
            websockets releases that no longer pass a path do.
    """
    # Register before entering the try block so the cleanup below can never
    # run with `user` unbound if construction or registration fails.
    user = User(websocket)
    USERS.register(user)
    try:
        # Manage received messages
        async for json_message in websocket:
            try:
                message = json.loads(json_message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary frames;
                # one bad message should not end the connection.
                logging.error(f"invalid JSON received from: {user}")
                continue
            if not isinstance(message, dict):
                # parse_message looks keys up in a mapping; skip any other
                # JSON value (list, number, string, ...).
                logging.error(f"unsupported message received from: {user}")
                continue
            await parse_message(user, message)
    finally:
        # Unregister user
        USERS.unregister(user)
        if user == USERS.admin:
            USERS.admin = None
            logging.debug(f"admin disconnected: {user}")
        elif user == USERS.emulator:
            logging.debug(f"emulator disconnected: {user}")
            USERS.emulator = None
async def main(host: str = "localhost", port: int = 6789) -> None:
    """Start the websocket server.

    Args:
        host (str): the interface to bind to. Defaults to "localhost".
        port (int): the TCP port to listen on. Defaults to 6789.
    """
    async with websockets.serve(handler, host, port):
        # This future is never resolved, so the server keeps running until
        # the surrounding task is cancelled.
        await asyncio.Future()  # run forever
if __name__ == "__main__":
    # Only start the server when the file is executed as a script.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit without a
        # traceback.
        pass