# booplaysgba/src/server.py

import asyncio
import json
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Optional

import websockets
class User:
    """Represent one client connected through a websocket.

    Attributes:
        websocket (Any): the websocket used by the user.
        last_message (float): timestamp (`time.time()`), set at construction.
        has_voted (bool): voting flag, `False` at construction.
    """

    websocket: Any
    last_message: float
    has_voted: bool

    def __init__(self, websocket: Any) -> None:
        """Build a `User` around an open websocket.

        Args:
            websocket (Any): the websocket used by the user.
        """
        self.has_voted = False
        self.last_message = time.time()
        self.websocket = websocket

    async def send(self, data: str):
        """Forward `data` to the user over its websocket.

        Args:
            data (str): message to send.
        """
        socket = self.websocket
        await socket.send(data)

    def __str__(self) -> str:
        """Describe the user by remote address and websocket id.

        Returns:
            str: string representing the user.
        """
        socket = self.websocket
        return f"{socket.remote_address} ({socket.id})"
2021-11-01 22:15:16 +00:00
@dataclass
class Users(set):
    """Store the `User`s connected to the server.

    The set itself holds every connected user. Two optional attributes
    point at the users holding a privileged role.

    Attributes:
        emulator (Optional[User]): the user authenticated as emulator, if any.
        admin (Optional[User]): the user authenticated as admin, if any.
    """

    emulator: Optional["User"] = None
    admin: Optional["User"] = None

    def register(self, user: "User"):
        """Register a user in the set.

        Args:
            user (User): the user to register.
        """
        self.add(user)
        logging.debug(f"user registered: {user}")

    def unregister(self, user: "User"):
        """Unregister a user from the set and release any role it held.

        Args:
            user (User): the user to unregister.

        Raises:
            KeyError: if `user` is not in the set.
        """
        self.remove(user)
        # A departing emulator/admin must not stay referenced: its websocket
        # is closed and the role has to become available again.
        if self.emulator is user:
            self.emulator = None
        if self.admin is user:
            self.admin = None
        logging.debug(f"user unregistered: {user}")

    def clear(self) -> None:
        """Reset the `has_voted` flag of each user in the set.

        Unlike `set.clear`, this does not remove any member.
        """
        for user in self:
            user.has_voted = False
class Votes(dict):
    """Store the votes sent by users.

    A plain `dict` subclass mapping every supported action name to its
    current vote count. Equality and `repr` are those of `dict`.
    """

    # Supported actions, in the order used to break ties in `next_vote`.
    ACTIONS = ("a", "b", "select", "start", "right", "left", "up", "down", "r", "l")

    def __init__(self) -> None:
        """Construct a `Votes` object with every action at zero votes."""
        super().__init__(dict.fromkeys(self.ACTIONS, 0))

    def clear(self) -> None:
        """Reset every counter to zero.

        Unlike `dict.clear`, the keys are kept.
        """
        # Only values are reassigned, so iterating over the dict is safe.
        for key in self:
            self[key] = 0

    def next_vote(self) -> str:
        """Return the most voted action.

        Returns:
            str: the action with the highest count (the first one in
                insertion order on a tie), or "null" when no vote was cast.
        """
        if not any(self.values()):
            return "null"
        return max(self, key=self.get)
logging.basicConfig(level=logging.DEBUG)

# Shared secrets sent by clients in an "auth" message. Both defaults are the
# same placeholder value: override them through the environment in any real
# deployment.
PASSWORD_ADMIN: str = os.environ.get("PASSWORD_ADMIN", "password")
PASSWORD_EMU: str = os.environ.get("PASSWORD_EMU", "password")

# Module-wide state shared by every connection handler.
VOTES: Votes = Votes()
USERS: Users = Users()
2021-11-01 20:13:01 +00:00
async def parse_message(user: User, msg: dict[str, str]):
    """Parse the `user`'s `msg` and act on every recognised key.

    Recognised keys:
        "auth": claim the emulator role, or else the admin role, when the
            role is vacant and the value matches the role's password.
        "action": add one vote for the given action.
        "admin": forward "save"/"load" to the emulator (admin only).
        "emu": on "get", send the most voted action to the emulator and
            reset the votes (emulator only).

    Args:
        user (User): the sender of the message.
        msg (dict[str, str]): the data received (through the websocket).
    """
    # The payload comes from the network: reject anything that is not a
    # JSON object instead of failing on the membership tests below.
    if not isinstance(msg, dict):
        logging.error(f"unsupported message: {msg}")
        return

    if "auth" in msg:
        data = msg["auth"]
        # A role can only be claimed while nobody holds it.
        if USERS.emulator is None and data == PASSWORD_EMU:
            USERS.emulator = user
            logging.debug(f"emulator authenticated: {user}")
        elif USERS.admin is None and data == PASSWORD_ADMIN:
            USERS.admin = user
            logging.debug(f"admin authenticated: {user}")
        else:
            logging.error(f"authentication failed: {user}")

    if "action" in msg:
        data = msg["action"]
        # The isinstance check keeps unhashable values (lists, objects) from
        # raising in the dict lookup.
        if isinstance(data, str) and data in VOTES:
            VOTES[data] += 1
            user.last_message = time.time()
            user.has_voted = True
        else:
            logging.error(f"unsupported action: {data}")

    if "admin" in msg:
        data = msg["admin"]
        if user is not USERS.admin:
            logging.error(f"user is not admin: {user}")
        elif USERS.emulator is None:
            logging.error(f"no emulator connected for admin action: {data}")
        elif data in ("save", "load"):
            await USERS.emulator.send(f'{{"admin":"{data}"}}')
        else:
            logging.error(f"unsupported admin action: {data}")

    if "emu" in msg:
        data = msg["emu"]
        if user is not USERS.emulator:
            logging.error(f"user is not emulator: {user}")
        elif data == "get":
            await user.send(f'{{"action":"{VOTES.next_vote()}"}}')
            VOTES.clear()
            USERS.clear()
        else:
            logging.error(f"unsupported emulator action: {data}")
2021-11-01 20:13:01 +00:00
async def handler(websocket: Any, path: str = ""):
    """Handle the messages sent by a user.

    The user is registered for the lifetime of the connection and is always
    unregistered when the connection ends. Messages that are not a JSON
    object are logged and skipped, so they do not close the connection.

    Args:
        websocket (Any): the websocket used by the user.
        path (str): the path used by the websocket. Unused; optional so the
            handler also works when only the connection is passed.
    """
    # Register before entering the `try` so `user` is always bound in `finally`.
    user = User(websocket)
    USERS.register(user)
    try:
        # Manage received messages
        async for json_message in websocket:
            try:
                message = json.loads(json_message)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError (binary frames).
                logging.error(f"invalid JSON received from: {user}")
                continue
            if not isinstance(message, dict):
                logging.error(f"unsupported message: {message}")
                continue
            await parse_message(user, message)
    finally:
        # Unregister user
        USERS.unregister(user)
async def main(host: str = "localhost", port: int = 6789):
    """Start the websocket server and serve until cancelled.

    Args:
        host (str): interface to bind to.
        port (int): TCP port to listen on.
    """
    async with websockets.serve(handler, host, port):
        await asyncio.Future()  # never completes: run forever
if __name__ == "__main__":
    # Start the server only when the module is executed as a script.
    asyncio.run(main())