From 6b6626e2bc51447c846928d166213907f85ff162 Mon Sep 17 00:00:00 2001 From: Greg Hendrickson Date: Tue, 27 Jan 2026 21:24:30 +0000 Subject: [PATCH] Implement leaderboard with PostgreSQL persistence Features: - Player database with ELO tracking - Game recording - Leaderboard view (top 10 players) - ELO calculation after each game - Player stats (wins/losses/draws/winrate) Database schema: - players: id, username, elo, games_played, wins, losses, draws - games: id, players, result, moves, elo_changes, duration Menu now shows option 4 for Leaderboard --- src/shellmate/db/__init__.py | 6 + src/shellmate/db/database.py | 302 +++++++++++++++++++++++++++++++++++ src/shellmate/db/models.py | 84 ++++++++++ src/shellmate/ssh/server.py | 166 ++++++++++++++++++- 4 files changed, 557 insertions(+), 1 deletion(-) create mode 100644 src/shellmate/db/__init__.py create mode 100644 src/shellmate/db/database.py create mode 100644 src/shellmate/db/models.py diff --git a/src/shellmate/db/__init__.py b/src/shellmate/db/__init__.py new file mode 100644 index 0000000..bc80a09 --- /dev/null +++ b/src/shellmate/db/__init__.py @@ -0,0 +1,6 @@ +"""Database module for ShellMate.""" + +from shellmate.db.database import Database +from shellmate.db.models import GameRecord, PlayerRecord + +__all__ = ["Database", "PlayerRecord", "GameRecord"] diff --git a/src/shellmate/db/database.py b/src/shellmate/db/database.py new file mode 100644 index 0000000..ce04c96 --- /dev/null +++ b/src/shellmate/db/database.py @@ -0,0 +1,302 @@ +"""Database connection and queries for ShellMate.""" + +import logging +import os +from typing import Optional + +import asyncpg + +from shellmate.db.models import GameRecord, LeaderboardEntry, PlayerRecord + +logger = logging.getLogger(__name__) + +# SQL Schema +SCHEMA = """ +CREATE TABLE IF NOT EXISTS players ( + id SERIAL PRIMARY KEY, + username VARCHAR(50) UNIQUE NOT NULL, + elo INTEGER DEFAULT 1200, + games_played INTEGER DEFAULT 0, + wins INTEGER DEFAULT 0, + losses INTEGER DEFAULT 0, + draws INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_seen TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS games ( + id SERIAL PRIMARY KEY, + white_player_id INTEGER REFERENCES players(id), + black_player_id INTEGER REFERENCES players(id), + result VARCHAR(10) NOT NULL, + moves TEXT, + elo_change_white INTEGER DEFAULT 0, + elo_change_black INTEGER DEFAULT 0, + played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + duration_seconds INTEGER DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_players_elo ON players(elo DESC); +CREATE INDEX IF NOT EXISTS idx_players_username ON players(username); +CREATE INDEX IF NOT EXISTS idx_games_played_at ON games(played_at DESC); +""" + + +class Database: + """Database connection manager.""" + + _instance: Optional["Database"] = None + _pool: asyncpg.Pool | None = None + + def __init__(self): + self.dsn = os.getenv( + "SHELLMATE_DATABASE_URL", + os.getenv( + "DATABASE_URL", + "postgresql://shellmate:shellmate@localhost:5432/shellmate" + ) + ) + + @classmethod + async def get_instance(cls) -> "Database": + """Get or create singleton database instance.""" + if cls._instance is None: + cls._instance = cls() + await cls._instance.connect() + return cls._instance + + async def connect(self) -> None: + """Connect to database and initialize schema.""" + try: + self._pool = await asyncpg.create_pool( + self.dsn, + min_size=2, + max_size=10, + command_timeout=60, + ) + async with self._pool.acquire() as conn: + await conn.execute(SCHEMA) + logger.info("Database connected and schema initialized") + except Exception as e: + logger.error(f"Database connection failed: {e}") + self._pool = None + + async def close(self) -> None: + """Close database connection.""" + if self._pool: + await self._pool.close() + self._pool = None + + @property + def connected(self) -> bool: + """Check if database is connected.""" + return self._pool is not None + + # Player operations + + async def get_or_create_player(self, username: str) -> PlayerRecord | None: + """Get existing player or create new one.""" + if not self._pool: + return None + + async with self._pool.acquire() as conn: + # Try to get existing + row = await conn.fetchrow( + "SELECT * FROM players WHERE username = $1", + username + ) + if row: + return PlayerRecord.from_row(dict(row)) + + # Create new player + row = await conn.fetchrow( + """ + INSERT INTO players (username, elo, created_at, last_seen) + VALUES ($1, 1200, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + RETURNING * + """, + username + ) + return PlayerRecord.from_row(dict(row)) if row else None + + async def get_player(self, username: str) -> PlayerRecord | None: + """Get player by username.""" + if not self._pool: + return None + + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT * FROM players WHERE username = $1", + username + ) + return PlayerRecord.from_row(dict(row)) if row else None + + async def update_player_stats( + self, + player_id: int, + elo_change: int, + won: bool, + draw: bool = False + ) -> None: + """Update player stats after a game.""" + if not self._pool: + return + + async with self._pool.acquire() as conn: + if draw: + await conn.execute( + """ + UPDATE players SET + elo = elo + $2, + games_played = games_played + 1, + draws = draws + 1, + last_seen = CURRENT_TIMESTAMP + WHERE id = $1 + """, + player_id, elo_change + ) + elif won: + await conn.execute( + """ + UPDATE players SET + elo = elo + $2, + games_played = games_played + 1, + wins = wins + 1, + last_seen = CURRENT_TIMESTAMP + WHERE id = $1 + """, + player_id, elo_change + ) + else: + await conn.execute( + """ + UPDATE players SET + elo = elo + $2, + games_played = games_played + 1, + losses = losses + 1, + last_seen = CURRENT_TIMESTAMP + WHERE id = $1 + """, + player_id, elo_change + ) + + # Leaderboard + + async def get_leaderboard(self, limit: int = 10) -> list[LeaderboardEntry]: + """Get top players by ELO.""" + if not self._pool: + return [] + + async with self._pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT username, elo, games_played, wins, losses, draws + FROM players + WHERE games_played > 0 + ORDER BY elo DESC + LIMIT $1 + """, + limit + ) + + entries = [] + for rank, row in enumerate(rows, 1): + games = row["games_played"] + winrate = (row["wins"] / games * 100) if games > 0 else 0 + entries.append(LeaderboardEntry( + rank=rank, + username=row["username"], + elo=row["elo"], + games_played=games, + wins=row["wins"], + losses=row["losses"], + winrate=round(winrate, 1), + )) + return entries + + async def get_player_rank(self, username: str) -> int | None: + """Get player's rank on leaderboard.""" + if not self._pool: + return None + + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT COUNT(*) + 1 as rank + FROM players p1 + WHERE p1.elo > (SELECT elo FROM players WHERE username = $1) + AND p1.games_played > 0 + """, + username + ) + return row["rank"] if row else None + + # Game recording + + async def record_game( + self, + white_id: int, + black_id: int | None, + result: str, + moves: str = "", + elo_change_white: int = 0, + elo_change_black: int = 0, + duration_seconds: int = 0 + ) -> int | None: + """Record a completed game.""" + if not self._pool: + return None + + async with self._pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO games ( + white_player_id, black_player_id, result, moves, + elo_change_white, elo_change_black, duration_seconds + ) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id + """, + white_id, black_id, result, moves, + elo_change_white, elo_change_black, duration_seconds + ) + return row["id"] if row else None + + async def get_player_games( + self, player_id: int, limit: int = 10 + ) -> list[GameRecord]: + """Get recent games for a player.""" + if not self._pool: + return [] + + async with self._pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT * FROM games + WHERE white_player_id = $1 OR black_player_id = $1 + ORDER BY played_at DESC + LIMIT $2 + """, + player_id, limit + ) + return [GameRecord.from_row(dict(row)) for row in rows] + + # Stats + + async def get_total_players(self) -> int: + """Get total number of players.""" + if not self._pool: + return 0 + + async with self._pool.acquire() as conn: + row = await conn.fetchrow("SELECT COUNT(*) as count FROM players") + return row["count"] if row else 0 + + async def get_total_games(self) -> int: + """Get total number of games played.""" + if not self._pool: + return 0 + + async with self._pool.acquire() as conn: + row = await conn.fetchrow("SELECT COUNT(*) as count FROM games") + return row["count"] if row else 0 diff --git a/src/shellmate/db/models.py b/src/shellmate/db/models.py new file mode 100644 index 0000000..ba2bf37 --- /dev/null +++ b/src/shellmate/db/models.py @@ -0,0 +1,84 @@ +"""Database models for ShellMate.""" + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class PlayerRecord: + """Player record from database.""" + + id: int + username: str + elo: int + games_played: int + wins: int + losses: int + draws: int + created_at: datetime + last_seen: datetime | None + + @property + def winrate(self) -> float: + """Calculate win rate percentage.""" + if self.games_played == 0: + return 0.0 + return (self.wins / self.games_played) * 100 + + @classmethod + def from_row(cls, row: dict) -> "PlayerRecord": + """Create from database row.""" + return cls( + id=row["id"], + username=row["username"], + elo=row["elo"], + games_played=row["games_played"], + wins=row["wins"], + losses=row["losses"], + draws=row["draws"], + created_at=row["created_at"], + last_seen=row.get("last_seen"), + ) + + +@dataclass +class GameRecord: + """Game record from database.""" + + id: int + white_player_id: int + black_player_id: int | None # None for AI + result: str # 'white', 'black', 'draw' + moves: str # PGN or UCI moves + elo_change_white: int + elo_change_black: int + played_at: datetime + duration_seconds: int + + @classmethod + def from_row(cls, row: dict) -> "GameRecord": + """Create from database row.""" + return cls( + id=row["id"], + white_player_id=row["white_player_id"], + black_player_id=row.get("black_player_id"), + result=row["result"], + moves=row.get("moves", ""), + elo_change_white=row.get("elo_change_white", 0), + elo_change_black=row.get("elo_change_black", 0), + played_at=row["played_at"], + duration_seconds=row.get("duration_seconds", 0), + ) + + +@dataclass +class LeaderboardEntry: + """Leaderboard entry for display.""" + + rank: int + username: str + elo: int + games_played: int + wins: int + losses: int + winrate: float diff --git a/src/shellmate/ssh/server.py b/src/shellmate/ssh/server.py index 1b7e04f..78f01ce 100644 --- a/src/shellmate/ssh/server.py +++ b/src/shellmate/ssh/server.py @@ -187,6 +187,7 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode menu_table.add_row("[bright_white on blue] 1 [/] Play vs AI [dim]♔ vs ♚[/]") menu_table.add_row("[bright_white on magenta] 2 [/] Play vs Human [dim]♔ vs ♔[/]") menu_table.add_row("[bright_white on green] 3 [/] Learn & Practice [dim]📖[/]") + menu_table.add_row("[bright_white on yellow] 4 [/] Leaderboard [dim]🏆[/]") menu_table.add_row("[bright_white on red] q [/] Quit [dim]👋[/]") menu_table.add_row("") menu_table.add_row(Text("Press a key to select...", style="dim italic")) @@ -237,6 +238,9 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode session.write("\r\n\033[33mTutorials coming soon! Try playing vs AI.\033[0m\r\n") await asyncio.sleep(2) render_menu() + elif char == '4': + await show_leaderboard(process, session, username) + render_menu() except asyncio.CancelledError: break except Exception as e: @@ -244,6 +248,113 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode continue # Don't break on errors, try to continue +async def show_leaderboard(process, session: TerminalSession, username: str) -> None: + """Display the leaderboard.""" + from rich.align import Align + from rich.box import ROUNDED + from rich.console import Console + from rich.table import Table + from rich.text import Text + + class ProcessWriter: + def __init__(self, sess): + self._session = sess + + def write(self, data): + self._session.write(data) + + def flush(self): + pass + + session.clear() + writer = ProcessWriter(session) + console = Console( + file=writer, width=session.width, height=session.height, + force_terminal=True, color_system="truecolor" + ) + + # Try to get leaderboard from database + try: + from shellmate.db import Database + db = await Database.get_instance() + entries = await db.get_leaderboard(10) + player_rank = await db.get_player_rank(username) + total_players = await db.get_total_players() + total_games = await db.get_total_games() + except Exception as e: + logger.warning(f"Database not available: {e}") + entries = [] + player_rank = None + total_players = 0 + total_games = 0 + + console.print() + console.print(Align.center(Text("🏆 LEADERBOARD 🏆", style="bold yellow"))) + console.print(Align.center(Text("━" * 30, style="yellow"))) + console.print() + + if entries: + # Create leaderboard table + table = Table( + show_header=True, + header_style="bold cyan", + box=ROUNDED, + border_style="dim", + ) + table.add_column("#", justify="right", style="dim", width=4) + table.add_column("Player", justify="left", width=15) + table.add_column("ELO", justify="right", style="green", width=6) + table.add_column("W/L/D", justify="center", width=10) + table.add_column("Win%", justify="right", width=6) + + for entry in entries: + rank_style = "" + if entry.rank == 1: + rank_style = "bold yellow" + elif entry.rank == 2: + rank_style = "bold white" + elif entry.rank == 3: + rank_style = "bold red" + + is_you = entry.username == username + name = f"► {entry.username}" if is_you else entry.username + name_style = "bold cyan" if is_you else "" + + table.add_row( + Text(str(entry.rank), style=rank_style), + Text(name, style=name_style), + str(entry.elo), + f"{entry.wins}/{entry.losses}/{entry.games_played - entry.wins - entry.losses}", + f"{entry.winrate:.0f}%", + ) + + console.print(Align.center(table)) + console.print() + + # Stats + if player_rank: + console.print(Align.center( + Text(f"Your rank: #{player_rank} of {total_players} players", style="cyan") + )) + console.print(Align.center( + Text(f"Total games played: {total_games}", style="dim") + )) + else: + console.print(Align.center(Text("No games played yet!", style="dim"))) + console.print() + console.print(Align.center(Text("Be the first on the leaderboard!", style="cyan"))) + console.print(Align.center(Text("Play a game to get ranked.", style="dim"))) + + console.print() + console.print(Align.center(Text("Press any key to return...", style="dim italic"))) + + # Wait for keypress + try: + await process.stdin.read(1) + except Exception: + pass + + async def run_chess_game(process, session: TerminalSession, username: str, opponent: str) -> None: """Run a beautiful chess game session with Stockfish AI.""" import chess @@ -551,6 +662,8 @@ async def run_chess_game(process, session: TerminalSession, username: str, oppon continue if board.is_game_over(): + from rich.text import Text + session.clear() writer = ProcessWriter(session) console = Console( @@ -558,29 +671,80 @@ async def run_chess_game(process, session: TerminalSession, username: str, oppon force_terminal=True ) + # Determine result + player_won = False + is_draw = False + result_text = "" + console.print() if board.is_checkmate(): winner = "Black ♚" if board.turn == chess.WHITE else "White ♔" + player_won = board.turn == chess.BLACK # Player is white vs AI console.print(Align.center(Panel( f"[bold green]🏆 CHECKMATE! 🏆\n\n{winner} wins![/bold green]", box=ROUNDED, border_style="green", width=40 ))) + result_text = "white" if board.turn == chess.BLACK else "black" elif board.is_stalemate(): + is_draw = True console.print(Align.center(Panel( "[yellow]Stalemate!\n\nThe game is a draw.[/yellow]", box=ROUNDED, border_style="yellow", width=40 ))) + result_text = "draw" else: + is_draw = True console.print(Align.center(Panel( - "[yellow]Game Over\n\nDraw by repetition or insufficient material.[/yellow]", + "[yellow]Game Over\n\nDraw.[/yellow]", box=ROUNDED, border_style="yellow", width=40 ))) + result_text = "draw" + + # Record to database + try: + from shellmate.db import Database + db = await Database.get_instance() + if db.connected: + player = await db.get_or_create_player(username) + if player: + # Calculate ELO change (vs AI rated 1500) + ai_elo = 1500 + if is_draw: + result_val = 0.5 + elif player_won: + result_val = 1.0 + else: + result_val = 0.0 + + expected = 1 / (1 + 10 ** ((ai_elo - player.elo) / 400)) + elo_change = int(32 * (result_val - expected)) + + await db.update_player_stats( + player.id, elo_change, player_won, is_draw + ) + await db.record_game( + white_player_id=player.id, + black_player_id=None, # AI + result=result_text, + moves=" ".join(move_history), + elo_change_white=elo_change, + ) + + # Show ELO change + elo_text = f"+{elo_change}" if elo_change >= 0 else str(elo_change) + new_elo = player.elo + elo_change + console.print() + console.print(Align.center( + Text(f"ELO: {new_elo} ({elo_text})", style="cyan") + )) + except Exception as e: + logger.warning(f"Could not record game: {e}") await asyncio.sleep(3)