diff --git a/src/server.py b/src/server.py index e521135..e2a119f 100644 --- a/src/server.py +++ b/src/server.py @@ -2,39 +2,28 @@ import asyncio import json import logging import time +from dataclasses import dataclass from typing import Any import websockets -logging.basicConfig(level=logging.DEBUG) - class User: """Store infos related to a connected user.""" - websocket: Any = None - last_message: float = time.time() - has_voted: bool = False + websocket: Any + last_message: float + has_voted: bool - def register(self, websocket: Any): - """Register a user in the `USERS` set. + def __init__(self, websocket: Any) -> None: + """Construct a User object. Args: websocket (Any): the websocket used by the user. """ self.websocket = websocket - USERS.add(self) - logging.debug( - f"user registered: {self}", - ) - - def unregister(self): - """Unregister a user from the `USERS` set.""" - # self.websocket.close() - USERS.remove(self) - logging.debug( - f"user unregistered: {self}", - ) + self.last_message = time.time() + self.has_voted = False async def send(self, data: str): """Send data through the user's websocket. @@ -42,7 +31,7 @@ class User: Args: data (str): message to send. """ - self.websocket.send(data) + await self.websocket.send(data) def __str__(self) -> str: """Convert user to string. @@ -53,43 +42,77 @@ class User: return f"{self.websocket.remote_address} ({self.websocket.id})" -USERS: set[User] = set() -EMULATOR: User -ADMIN: User +@dataclass +class Users(set): + """Store `User`s connected to the server.""" + + emulator: User = User(None) + admin: User = User(None) + + def register(self, user: User): + """Register a user in the set. + + Args: + user (User): the user to register. + """ + self.add(user) + logging.debug(f"user registered: {user}") + + def unregister(self, user: User): + """Unregister a user in the set. + + Args: + user (User): the user to unregister. + """ + self.remove(user) + logging.debug(f"user unregistered: {self}") + + def clear(self) -> None: + """Clear the `has_voted` of each user in the set.""" + for user in self: + user.has_voted = False + + +@dataclass +class Votes(dict): + """Store the votes sent by users.""" + + def __init__(self) -> None: + """Construct a `Votes` object.""" + super(Votes, self) + self["a"] = 0 + self["b"] = 0 + self["select"] = 0 + self["start"] = 0 + self["right"] = 0 + self["left"] = 0 + self["up"] = 0 + self["down"] = 0 + self["r"] = 0 + self["l"] = 0 + + def clear(self) -> None: + """Clear the `VOTES` dict.""" + for key in self.keys(): + self[key] = 0 + + def next_vote(self): + """Return the most voted action in the last frame. + + Returns: + str: the most voted action. + """ + if any(self.values()): + return max(self, key=self.get) + else: + return "null" + + +logging.basicConfig(level=logging.DEBUG) + PASSWORD: str = "password" - -VOTES: dict[str, int] = { - "a": 0, - "b": 0, - "select": 0, - "start": 0, - "right": 0, - "left": 0, - "up": 0, - "down": 0, - "r": 0, - "l": 0, -} - - -def clear_votes(): - """Clear the `VOTES` dict.""" - for key in VOTES.keys(): - VOTES[key] = 0 - for user in USERS: - user.has_voted = False - - -def next_move(): - """Return the most voted action in the last frame. - - Returns: - str: the most voted action. - """ - if any(VOTES.values()): - return max(VOTES, key=VOTES.get) - else: - return "null" +VOTES: Votes = Votes() +USERS: Users = Users() async def parse_message(user: User, msg: dict[str, str]): @@ -99,18 +122,14 @@ async def parse_message(user: User, msg: dict[str, str]): user (User): the sender of the message. msg (dict[str, str]): the message received through websocket. """ - # Special users - global EMULATOR - global ADMIN - if "auth" in msg: data = msg["auth"] - if not EMULATOR and data == PASSWORD: - EMULATOR = user + if data == PASSWORD: + USERS.emulator = user logging.debug(f"emulator authenticated: {user}") - elif not ADMIN and data == PASSWORD: - ADMIN = user - logging.debug(f"admin authenticated: {user}") + # elif not USERS.admin and data == PASSWORD: + # USERS.admin = user + # logging.debug(f"admin authenticated: {user}") if "action" in msg: data = msg["action"] @@ -123,47 +142,47 @@ async def parse_message(user: User, msg: dict[str, str]): if "admin" in msg: data = msg["admin"] - if user == ADMIN: + if user == USERS.admin: if data == "save": - await EMULATOR.send('{"admin":"save"}') + await USERS.emulator.send('{"admin":"save"}') elif data == "load": - await EMULATOR.send('{"admin":"load"}') + await USERS.emulator.send('{"admin":"load"}') else: logging.error(f"unsupported admin action: {data}") else: - logging.error(f"user is not ADMIN: {user}") + logging.error(f"user is not admin: {user}") if "emu" in msg: data = msg["emu"] - if user == EMULATOR: + if user == USERS.emulator: if data == "get": - move = next_move() - await EMULATOR.send(f'{{"action":"{move}"}}') - clear_votes() + await USERS.emulator.send(f'{{"action":"{VOTES.next_vote()}"}}') + VOTES.clear() + USERS.clear() else: logging.error(f"unsupported emulator action: {data}") else: - logging.error(f"user is not EMULATOR: {user}") + logging.error(f"user is not emulator: {user}") async def handler(websocket: Any, path: str): """Handle the messages sent by a user. Args: - websocket (Any): the websocket used by the user - path (str): the path used by the websocket ? + websocket (Any): the websocket used by the user. + path (str): the path used by the websocket. (?) """ try: # Register user - user = User() - user.register(websocket) + user = User(websocket) + USERS.register(user) # Manage received messages async for json_message in websocket: message: dict[str, str] = json.loads(json_message) await parse_message(user, message) finally: # Unregister user - user.unregister() + USERS.unregister(user) async def main():