#!/usr/bin/env python
"""
WebSocket game coordinator for ATC.

Provides list of currently open games for server-browser purposes, and
provides connection management and game state forwarding between players.

syntax: python game_coordinator.py [-ip <address>] [-port <port>]
e.g.:   python game_coordinator.py -ip 127.0.0.1 -port 25565
"""

import asyncio
import json
import secrets
import sys
import random
import time
from string import ascii_uppercase  # optional, for 4-letter id format

import websockets
from websockets.asyncio.server import serve

DEFAULT_PORT = 8181
DEFAULT_IP = ""
MAX_LOBBY_ID_GEN_ATTEMPTS = 20
LOBBY_CONTROL_COMMANDS = ["disconnect", "kick_player", "get_lobby_info", "deny_player", "accept_player",
                          "end_lobby", "set_lobby_locked", "set_lobby_state", "set_paused"]
HOST_LOBBY_REQD_FIELDS = ["lobby_name", "game_type", "username", "private", "password", "max_players"]
HOST_LOBBY_REQD_FIELD_TYPES = [str, str, str, bool, str, int]
DISCONNECT_GRACE_PERIOD = 20  # seconds before a player whose connection was lost is removed
PAUSED_GRACE_PERIOD = 300  # lobby can be set to paused mode to enable players to be disconnected for up to this number of seconds
JOIN_REQUEST_TIME = 8
MAX_JOIN_REQUESTS = 3  # maximum number of times a client can ask the game host to join without receiving a response before the lobby is marked as closed
LOBBY_CLOSING_TIME = 5
SOCKET_GRACE_PERIOD = 2

LOBBIES = {}


# Send active games for server browser purposes
async def send_lobby_list(websocket):
    """Send a ``lobby_list`` message describing every open lobby.

    Lobbies already flagged ``"dead"`` are in the middle of being torn down,
    so they are left out of the listing rather than advertised to browsers.

    Args:
        websocket: connection of the client that requested the list.
    """
    listing = [
        {
            "id": lobby_id,
            "lobby_name": lobby_info["lobby_name"],
            "state": lobby_info["lobby_state"],
            "current_players": len(lobby_info["players"]),
            "max_players": lobby_info["max_players"],
            "private": lobby_info["private"],
        }
        for lobby_id, lobby_info in LOBBIES.items()
        if "dead" not in lobby_info
    ]
    await websocket.send(json.dumps({"type": "lobby_list", "lobbies": listing}))
    # wait before returning (which ends the connection) to allow the packet to be sent
    await asyncio.sleep(SOCKET_GRACE_PERIOD)


# ---------------------------------------------------------------------------
# LOBBY FUNCTIONS
# ---------------------------------------------------------------------------
# message used whenever a host command names a player that cannot be found
_UNKNOWN_PLAYER_MESSAGE = "No player ID given, or given player ID is not in the lobby."


# send a message to a socket that may already be closed
async def _send_json(socket, message):
    """Serialize ``message`` as JSON and send it on ``socket``.

    Used for sockets that belong to *other* players, so that a peer's closed
    connection is not mistaken for the current handler's connection dropping.

    Returns:
        True if the message was handed to the socket, False if it was closed.
    """
    try:
        await socket.send(json.dumps(message))
    except websockets.exceptions.ConnectionClosed:
        return False
    return True


# send an error to the connected client
async def _gc_error(websocket, text):
    """Send a ``gc_error`` message carrying ``text`` to ``websocket``."""
    await websocket.send(json.dumps({"type": "gc_error", "message": text}))


# send a failure response right before the connection is dropped
async def _reject(websocket, response):
    """Send ``response``, then wait so the packet can be sent before the handler returns."""
    await websocket.send(json.dumps(response))
    await asyncio.sleep(SOCKET_GRACE_PERIOD)


# find the player an event refers to
def _lookup_player(lobby, event, key="player_id"):
    """Return the player dict named by ``event[key]``, or None if absent or unknown."""
    target_id = event.get(key)
    if not isinstance(target_id, str):
        return None  # player ids are strings; this also avoids hashing lists/dicts
    return lobby["players"].get(target_id)


# generate lobby id
async def generate_lobby_id():
    """Return a lobby ID that is not currently a key of ``LOBBIES``.

    Tries up to ``MAX_LOBBY_ID_GEN_ATTEMPTS`` random 4-letter IDs and returns
    the first unused one; if all attempts collide, falls back to a longer
    random token.
    """
    for _ in range(MAX_LOBBY_ID_GEN_ATTEMPTS):
        candidate = "".join(random.choices(ascii_uppercase, k=4))  # DEFINE ID FORMAT HERE
        if candidate not in LOBBIES:
            return candidate
    return secrets.token_urlsafe(12)  # fall back to non-repeating lobby ID


# get information of all connected players to lobby
async def get_player_info(lobby):
    """Return a list of public info dicts, one per player not flagged ``"dead"``."""
    return [
        {
            "username": player["username"],
            "player_id": player_id,
            "connection_status": player["connection_status"],
            "is_host": player["is_host"],
            "waiting": "waiting" in player,
        }
        for player_id, player in lobby["players"].items()
        if "dead" not in player  # skip to-be-deleted players
    ]


# send message to all active players within lobby
async def broadcast_active(lobby, message):
    """Send ``message`` to every connected, host-accepted player in ``lobby``."""
    # Iterate over a snapshot: other tasks may add/remove players while a send is awaited.
    for player in list(lobby["players"].values()):
        if player["connection_status"] != 1 or "waiting" in player:
            continue
        if not await _send_json(player["socket"], message):
            # shouldn't happen since disconnected players should update the connection_status variable
            print(f"Couldn't broadcast to player {player['player_id']}: socket closed.")


# create player object
async def create_player(websocket, username, is_host=False, waiting_args=None):
    """Build the dict that represents one connected player.

    Args:
        websocket: the player's connection.
        username: display name supplied by the client.
        is_host: True for the lobby creator; hosts are never in the waiting state.
        waiting_args: optional data forwarded to the host with the join request.
            Defaults to a fresh empty dict for each player.

    Returns:
        The player dict, including freshly generated ``player_id`` and ``rejoin_key``.
    """
    player = {
        "socket": websocket,
        "username": username,
        "is_host": is_host,
        "connection_status": 1,
        "player_id": secrets.token_urlsafe(8),
        "rejoin_key": secrets.token_urlsafe(10),
    }
    if not is_host:
        player["waiting"] = True
        player["waiting_since"] = int(time.time())
        player["waiting_retries"] = 0
        player["waiting_args"] = {} if waiting_args is None else waiting_args
    return player


# finally delete player, and if necessary shut down server or change host
async def remove_player(lobby, player_id):
    """Delete ``player_id`` from ``lobby``; destroy the lobby or transfer host as needed.

    Does nothing if the player has already been removed by another task.
    """
    if not lobby:
        return
    removed = lobby["players"].pop(player_id, None)
    if removed is None:
        return  # already removed elsewhere
    lobby_id = lobby["lobby_id"]
    print(f"Removing player {player_id} from lobby {lobby_id}.")

    lobby_alive = (lobby_id in LOBBIES) and ("dead" not in lobby)
    if not lobby["players"] and lobby_alive:
        print(f"Destroying lobby {lobby_id}: no remaining players.")
        del LOBBIES[lobby_id]
        return
    if not removed["is_host"]:
        return

    # host must be transferred to the first connected, accepted, non-dead player
    print("changing host")
    new_host_id = None
    for candidate_id, candidate in lobby["players"].items():
        if "waiting" in candidate or "dead" in candidate or candidate["connection_status"] != 1:
            continue
        new_host_id = candidate_id
        break
    if not new_host_id:
        if lobby_alive:
            print("No valid remaining host, destroying lobby...")
            await end_lobby(lobby_id)
        return

    lobby["players"][new_host_id]["is_host"] = True
    lobby["host"] = lobby["players"][new_host_id]
    player_info = await get_player_info(lobby)
    message = {"type": "announcement_change_host", "message": "Lobby host has been changed.",
               "player_id": new_host_id, "current_players": player_info}
    await broadcast_active(lobby, message)


# disconnect player, notifying rest of lobby
async def disconnect_player(lobby, player_id, reason='disconnect'):
    """Tell a player they are disconnected, announce it, close their socket and remove them.

    Does nothing if ``player_id`` is no longer in the lobby.
    """
    dc_player = lobby["players"].get(player_id)
    if dc_player is None:
        return
    dc_player["connection_status"] = 0
    if "dead" not in dc_player:
        response = {"type": "disconnected", "message": f"Disconnected. Reason: {reason}", "reason": reason}
        await _send_json(dc_player["socket"], response)  # best effort; socket may be gone
    dc_player["dead"] = True

    player_info = await get_player_info(lobby)
    announcement = {"type": "announcement_player_disconnect", "reason": reason, "player_id": player_id,
                    "username": dc_player["username"], "current_players": player_info}
    await broadcast_active(lobby, announcement)

    await asyncio.sleep(SOCKET_GRACE_PERIOD)
    # Use the reference captured above: the player may have been removed during the sleep.
    await dc_player["socket"].close()
    await remove_player(lobby, player_id)


# called when `for message in websocket` loop ended without a reason determined by the rest of the GC logic
async def connection_lost(lobby, player_id):
    """Mark a player as disconnected, announce it, and remove them if they do not return in time.

    The wait is ``PAUSED_GRACE_PERIOD`` when the lobby is paused at the moment
    the connection is lost, otherwise ``DISCONNECT_GRACE_PERIOD``.
    """
    if player_id in lobby["players"]:
        lobby["players"][player_id]["connection_status"] = 0
    current_players = await get_player_info(lobby)
    announcement = {"type": "announcement_player_connerr", "player_id": player_id,
                    "current_players": current_players}
    await broadcast_active(lobby, announcement)

    await asyncio.sleep(PAUSED_GRACE_PERIOD if lobby["paused"] else DISCONNECT_GRACE_PERIOD)

    player = lobby["players"].get(player_id)
    if player is not None and player["connection_status"] == 0:  # still gone: did not rejoin
        player["dead"] = True
        await disconnect_player(lobby, player_id, 'connection_lost')


# notify all players in lobby that it is ending, then delete it and all sockets attached to it
async def end_lobby(lobby_id):
    """Announce that a lobby is closing, close all of its sockets and unregister it.

    Does nothing if the lobby does not exist or is already being closed.
    """
    lobby = LOBBIES.get(lobby_id)
    if lobby is None or "dead" in lobby:
        return
    lobby["dead"] = True

    message = {"type": "lobby_closing"}
    for player in list(lobby["players"].values()):
        if player["connection_status"] == 1:
            # a closed socket must not abort teardown and leave the lobby registered
            await _send_json(player["socket"], message)

    await asyncio.sleep(LOBBY_CLOSING_TIME)

    for player_id, player in list(lobby["players"].items()):
        player["dead"] = True
        try:
            await player["socket"].close()
        except Exception as err:  # best effort: keep closing the remaining sockets
            print(f"Error while closing socket of player {player_id}: {err}")

    print(f"Destroyed lobby: {lobby['lobby_name']} ({lobby_id})")
    LOBBIES.pop(lobby_id, None)


# notify host that player is requesting to join
async def send_join_request_to_host(lobby, player_id):
    """Forward a waiting player's join request to the lobby host.

    Resets the player's ``waiting_since`` timestamp and increments
    ``waiting_retries`` whether or not the host is currently reachable.
    """
    player = lobby["players"][player_id]
    join_request = {"type": "join_request", "player_id": player_id,
                    "username": player["username"], "args": player["waiting_args"]}
    player["waiting_since"] = int(time.time())
    player["waiting_retries"] += 1
    if lobby["host"]["connection_status"] == 1:
        await _send_json(lobby["host"]["socket"], join_request)


# handle one 'lobby_control' event
async def _handle_lobby_control(websocket, lobby, player, event):
    """Execute a ``lobby_control`` command sent by ``player``.

    The host can kick players, accept/deny players, lock/unlock, pause and end
    the game; all players can disconnect and query lobby status.

    Returns:
        True if the sender's connection is finished (disconnect / end of lobby).
    """
    command = event.get("command")
    if not isinstance(command, str) or command not in LOBBY_CONTROL_COMMANDS:
        await _gc_error(websocket, "Missing or invalid 'lobby_control' command.")
        return False

    if command == "disconnect":
        await disconnect_player(lobby, player["player_id"])
        return True  # end of connection
    if command == "get_lobby_info":
        player_info = await get_player_info(lobby)
        response = {"type": "lobby_status", "state": lobby["lobby_state"],
                    "player_count": len(player_info), "current_players": player_info}
        await websocket.send(json.dumps(response))
        return False

    if not player["is_host"]:
        await _gc_error(websocket, f"Only host has access to command {command}.")
        return False

    # following commands only available to game host
    if command == "accept_player":
        accepted_player = _lookup_player(lobby, event)
        if accepted_player is None:
            await _gc_error(websocket, _UNKNOWN_PLAYER_MESSAGE)
            return False
        if "waiting" not in accepted_player:
            await _gc_error(websocket, "Player already accepted.")
            return False
        for waiting_key in ("waiting", "waiting_args", "waiting_since", "waiting_retries"):
            accepted_player.pop(waiting_key, None)
        notification = {"type": "join_ok", "current_players": await get_player_info(lobby)}
        if "args" in event:
            notification["args"] = event["args"]
        await _send_json(accepted_player["socket"], notification)  # notify waiting player of acceptance
    elif command == "deny_player":
        denied_player = _lookup_player(lobby, event)
        if denied_player is None:
            await _gc_error(websocket, _UNKNOWN_PLAYER_MESSAGE)
            return False
        reason = f"denied ({event['reason']})" if "reason" in event else "denied"
        await disconnect_player(lobby, denied_player["player_id"], reason)
    elif command == "kick_player":
        kicked_player = _lookup_player(lobby, event)
        if kicked_player is None:
            await _gc_error(websocket, _UNKNOWN_PLAYER_MESSAGE)
            return False
        await disconnect_player(lobby, kicked_player["player_id"], "kicked")
    elif command == "set_lobby_locked":
        if not isinstance(event.get("locked"), bool):
            await _gc_error(websocket, "Missing or invalid 'locked' argument.")
            return False
        lobby["locked"] = event["locked"]
        await broadcast_active(lobby, {"type": "announcement_lock_changed", "locked": event["locked"]})
    elif command == "set_paused":
        # NOTE(review): this command was listed as valid but never handled, so lobby["paused"]
        # could never change. The announcement type below is new - confirm with client code.
        if not isinstance(event.get("paused"), bool):
            await _gc_error(websocket, "Missing or invalid 'paused' argument.")
            return False
        lobby["paused"] = event["paused"]
        await broadcast_active(lobby, {"type": "announcement_paused_changed", "paused": event["paused"]})
    elif command == "end_lobby":
        await end_lobby(lobby["lobby_id"])
        return True  # end of lobby, end of loop
    elif command == "set_lobby_state":  # TODO
        if not isinstance(event.get("state"), str):
            await _gc_error(websocket, "Missing or invalid 'state' argument.")
            return False
        lobby["lobby_state"] = event["state"]
        await broadcast_active(lobby, {"type": "announcement_lobby_state_changed", "state": event["state"]})
    return False


# main logic loop for connected clients
async def lobby_loop(websocket, lobby, player):
    """Process messages from one connected player until their socket stops yielding them.

    Args:
        websocket: the player's connection.
        lobby: the lobby dict the player belongs to.
        player: the player dict.

    Returns:
        True if the connection ended for a reason known to the coordinator
        (player disconnected/removed, lobby closed); False if the socket simply
        stopped delivering messages, meaning the connection was lost.

    Raises:
        websockets.exceptions.ConnectionClosed: if sending to this player's own
            socket fails. Closed sockets of *other* players are ignored.
    """
    player_id = player["player_id"]
    lobby_id = lobby["lobby_id"]

    async for raw_message in websocket:
        # The lobby or player may have been removed by another task since the last message.
        lobby = LOBBIES.get(lobby_id)
        if lobby is None:
            return True  # lobby closed
        player = lobby["players"].get(player_id)
        if player is None:
            return True  # player removed
        if "dead" in player:
            continue  # being disconnected: ignore anything further they send

        try:
            event = json.loads(raw_message)
        except (TypeError, ValueError):  # ValueError covers JSON and unicode decode errors
            await _gc_error(websocket, "GC ERROR: Message is not valid JSON.")
            continue
        if not isinstance(event, dict):
            await _gc_error(websocket, "GC ERROR: No message type specified.")
            continue

        # handle messages sent from connected players yet unconfirmed by host
        if "waiting" in player:
            response = {"type": "waiting_notification",
                        "message": "You have not yet been accepted by the host."}
            await websocket.send(json.dumps(response))
            # re-check: the host may have accepted the player while the send was awaited
            if "waiting" in player and (int(time.time()) - player["waiting_since"]) > JOIN_REQUEST_TIME:
                if player["waiting_retries"] > MAX_JOIN_REQUESTS:
                    await end_lobby(lobby_id)  # host is unresponsive
                    return True
                await send_join_request_to_host(lobby, player_id)
            continue  # deny any other requests

        event_type = event.get("type")
        if event_type is None:
            await _gc_error(websocket, "GC ERROR: No message type specified.")
        elif event_type == "lobby_control":
            if await _handle_lobby_control(websocket, lobby, player, event):
                return True
        # message forwarding
        elif event_type == "request":
            source_player = _lookup_player(lobby, event, "source")
            if source_player is None or "request_fields" not in event:
                await _gc_error(websocket, "Missing 'request_fields', or 'source' is not a player in the lobby.")
                continue
            forwarded = {"type": "request", "destination": player_id,
                         "request_fields": event["request_fields"]}
            await _send_json(source_player["socket"], forwarded)
        elif event_type == "request_response":
            destination_player = _lookup_player(lobby, event, "destination")
            if destination_player is None or "requested_data" not in event:
                await _gc_error(websocket, "Missing 'requested_data', or 'destination' is not a player in the lobby.")
                continue
            forwarded = {"type": "request_response", "source": player_id,
                         "requested_data": event["requested_data"]}
            await _send_json(destination_player["socket"], forwarded)
        elif event_type == "broadcast":
            if "command" not in event or "payload" not in event:
                await _gc_error(websocket, "Missing 'command' or 'payload' for broadcast.")
                continue
            outgoing = {"type": "broadcast", "command": event["command"], "payload": event["payload"]}
            include_self = "to_self" in event
            for other_id, other in list(lobby["players"].items()):
                if other["connection_status"] == 1 and (other_id != player_id or include_self):
                    await _send_json(other["socket"], outgoing)

    lobby = LOBBIES.get(lobby_id)
    if lobby is None:
        return True  # lobby closed
    player = lobby["players"].get(player_id)
    if player is None or "dead" in player:
        return True  # player removed
    return False  # socket not closed cordially


# run lobby_loop for a player and deal with a lost connection afterwards
async def _run_player_session(websocket, lobby_id, player, greeting=None):
    """Optionally send ``greeting``, run ``lobby_loop`` and start the grace period on connection loss."""
    player_id = player["player_id"]
    try:
        if greeting is not None:
            await websocket.send(json.dumps(greeting))
        lobby = LOBBIES.get(lobby_id)
        if lobby is None:
            return
        if await lobby_loop(websocket, lobby, player):
            return  # ended for a known reason; nothing to clean up here
    except websockets.exceptions.ConnectionClosed:
        print(f"{player_id} lost connection suddenly!")
    else:
        print(f"{player_id} lost connection.")
    lobby = LOBBIES.get(lobby_id)
    if lobby is not None:  # lobby may have been destroyed in the meantime
        await connection_lost(lobby, player_id)


# re-attach a returning player to their existing slot
async def _rejoin_lobby(websocket, lobby, rejoin_key):
    """Reconnect the disconnected player owning ``rejoin_key``, or report ``rejoin_fail``."""
    for player_id, player in lobby["players"].items():
        if player["rejoin_key"] != rejoin_key or player["connection_status"] != 0 or "dead" in player:
            continue
        player["socket"] = websocket
        player["connection_status"] = 1
        try:
            await websocket.send(json.dumps({"type": "rejoin_ok"}))
        except websockets.exceptions.ConnectionClosed:
            await connection_lost(lobby, player_id)  # dropped again straight away
            return
        print(f"Player ({player_id}) reconnected successfully.")
        notification = {"type": "announcement_player_rejoin",
                        "current_players": await get_player_info(lobby)}
        await broadcast_active(lobby, notification)
        await _run_player_session(websocket, lobby["lobby_id"], player)
        return  # end of socket

    # Could not rejoin
    await _reject(websocket, {"type": "rejoin_fail", "message": "Could not rejoin with given key."})


# join existing lobby
async def join_lobby(websocket, event):
    """Handle a ``join_lobby`` request: validate it, then join or rejoin the lobby.

    On any validation failure a ``join_fail`` (or ``rejoin_fail``) message is
    sent and the function returns, which closes the connection.
    """
    for required_field in ("lobby_id", "game_type", "username"):
        if required_field not in event:
            await _reject(websocket, {"type": "join_fail",
                                      "message": f"Missing required field: '{required_field}'."})
            return  # close connection

    lobby_id = event["lobby_id"]
    lobby = LOBBIES.get(lobby_id) if isinstance(lobby_id, str) else None
    if lobby is None or "dead" in lobby:  # a closing lobby is treated as gone
        await _reject(websocket, {"type": "join_fail", "message": f"Lobby '{lobby_id}' does not exist."})
        return  # close connection

    if str(event["game_type"]) != lobby["game_type"]:
        mismatch = f"Mismatched game types. Expected {lobby['game_type']}, received {event['game_type']}."
        await _reject(websocket, {"type": "join_fail", "message": mismatch})
        return  # close connection

    # A rejoining player already occupies a slot and was already admitted,
    # so the capacity, lock and password checks only apply to new players.
    rejoining = "rejoin_key" in event
    if not rejoining:
        if len(lobby["players"]) >= lobby["max_players"]:
            await _reject(websocket, {"type": "join_fail", "message": "Lobby full."})
            return  # close connection
        if lobby["locked"]:
            await _reject(websocket, {"type": "join_fail",
                                      "message": "Lobby is locked, no new connections are being accepted."})
            return  # close connection
        if lobby["private"] and event.get("password") != lobby["password"]:
            await _reject(websocket, {"type": "join_fail", "message": "Incorrect password for private lobby."})
            return  # close connection
    else:
        await _rejoin_lobby(websocket, lobby, event["rejoin_key"])
        return  # end of socket

    # args are optional, thus they are not required to be specified
    new_player = await create_player(websocket, event["username"], is_host=False,
                                     waiting_args=event.get("args", {}))
    new_player_id = new_player["player_id"]
    lobby["players"][new_player_id] = new_player  # add player to lobby
    await send_join_request_to_host(lobby, new_player_id)  # request join confirmation from host

    player_info = await get_player_info(lobby)
    await broadcast_active(lobby, {"type": "announcement_player_joining", "current_players": player_info})

    response = {"type": "join_request_ok", "player_id": new_player_id,
                "rejoin_key": new_player["rejoin_key"], "current_players": player_info,
                "max_players": lobby["max_players"], "lobby_name": lobby["lobby_name"],
                "private": lobby["private"]}
    await _run_player_session(websocket, lobby_id, new_player, greeting=response)


# host a new lobby
async def host_lobby(websocket, event):
    """Handle a ``host_lobby`` request: validate fields, register the lobby, run the host's session."""
    for required_field, field_type in zip(HOST_LOBBY_REQD_FIELDS, HOST_LOBBY_REQD_FIELD_TYPES):
        if required_field not in event:
            await _reject(websocket, {"type": "host_fail",
                                      "message": f"Missing required field: '{required_field}'."})
            return  # close connection
        try:
            field_type(event[required_field])  # value only has to be convertible to the expected type
        except (TypeError, ValueError, OverflowError):
            await _reject(websocket, {"type": "host_fail",
                                      "message": f"Invalid type for field '{required_field}'."})
            return  # close connection

    host_player = await create_player(websocket, event["username"], is_host=True)
    lobby_id = await generate_lobby_id()
    new_lobby = {
        "lobby_id": lobby_id,
        "lobby_name": event["lobby_name"],
        "game_type": event["game_type"],
        "private": event["private"],
        "password": event["password"],
        "lobby_state": "LOBBY",
        "max_players": int(event["max_players"]),
        "host": host_player,  # accessible in both 'players' and its own property
        "players": {host_player["player_id"]: host_player},
        "locked": False,
        "paused": False,  # paused games have a longer grace period before players with interrupted connections are kicked
    }
    visibility = "private " if event["private"] else ""
    print(f"Created {visibility}lobby: {event['lobby_name']} ({lobby_id})")
    LOBBIES[lobby_id] = new_lobby

    response = {"type": "host_ok", "lobby_id": lobby_id, "player_id": host_player["player_id"],
                "rejoin_key": host_player["rejoin_key"]}
    await _run_player_session(websocket, lobby_id, host_player, greeting=response)


# handle incoming connections
async def new_connection_handler(websocket):
    """Read the first message of a new connection and dispatch on its ``type``."""
    try:
        first_message = await websocket.recv()
        try:
            event = json.loads(first_message)
        except (TypeError, ValueError):
            event = None  # reported below like any other message without a type

        if not isinstance(event, dict) or "type" not in event:
            await _reject(websocket, {"type": "gc_error", "message": "GC ERROR: No message type specified."})
            return

        if event["type"] == "list_open_lobbies":
            print("List of games requested.")
            await send_lobby_list(websocket)
        elif event["type"] == "host_lobby":
            print("Lobby creation requested.")
            await host_lobby(websocket, event)
        elif event["type"] == "join_lobby":
            print("Lobby join requested.")
            await join_lobby(websocket, event)
        else:
            await _reject(websocket, {"type": "gc_error",
                                      "message": "GC ERROR: Invalid message type received."})
    except websockets.exceptions.ConnectionClosed:
        # client went away before or during the handshake replies; nothing to clean up
        return


# start server
async def main(ip, port):
    """Serve ``new_connection_handler`` on ``ip``:``port`` until cancelled."""
    async with serve(new_connection_handler, ip, port) as server:
        print(f"Server starting at {ip}:{port}.")
        await server.serve_forever()


# read -ip / -port from the command line
def _parse_cli_args(args):
    """Return ``(ip, port)`` from ``args``; a flag without a following value is ignored.

    Exits the process with status 1 if the port is not an integer.
    """
    ip, port = DEFAULT_IP, DEFAULT_PORT
    for flag, value in zip(args, args[1:]):
        if flag == "-ip":
            ip = value
        elif flag == "-port":
            try:
                port = int(value)
            except ValueError:
                print("Port must be an integer.")
                sys.exit(1)
    return ip, port


# customize server from CLI arguments
if __name__ == "__main__":
    IP, PORT = _parse_cli_args(sys.argv[1:])
    asyncio.run(main(IP, PORT))