Implement leaderboard with PostgreSQL persistence

Features:
- Player database with ELO tracking
- Game recording
- Leaderboard view (top 10 players)
- ELO calculation after each game
- Player stats (wins/losses/draws/winrate)

Database schema:
- players: id, username, elo, games_played, wins, losses, draws
- games: id, players, result, moves, elo_changes, duration

Menu now shows option 4 for Leaderboard
This commit is contained in:
Greg Hendrickson
2026-01-27 21:24:30 +00:00
parent 4506238b2b
commit 6b6626e2bc
4 changed files with 557 additions and 1 deletions

View File

@@ -0,0 +1,6 @@
"""Database module for ShellMate."""
from shellmate.db.database import Database
from shellmate.db.models import GameRecord, PlayerRecord
__all__ = ["Database", "PlayerRecord", "GameRecord"]

View File

@@ -0,0 +1,302 @@
"""Database connection and queries for ShellMate."""
import logging
import os
from typing import Optional
import asyncpg
from shellmate.db.models import GameRecord, LeaderboardEntry, PlayerRecord
logger = logging.getLogger(__name__)
# SQL Schema
SCHEMA = """
CREATE TABLE IF NOT EXISTS players (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
elo INTEGER DEFAULT 1200,
games_played INTEGER DEFAULT 0,
wins INTEGER DEFAULT 0,
losses INTEGER DEFAULT 0,
draws INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP
);
CREATE TABLE IF NOT EXISTS games (
id SERIAL PRIMARY KEY,
white_player_id INTEGER REFERENCES players(id),
black_player_id INTEGER REFERENCES players(id),
result VARCHAR(10) NOT NULL,
moves TEXT,
elo_change_white INTEGER DEFAULT 0,
elo_change_black INTEGER DEFAULT 0,
played_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
duration_seconds INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_players_elo ON players(elo DESC);
CREATE INDEX IF NOT EXISTS idx_players_username ON players(username);
CREATE INDEX IF NOT EXISTS idx_games_played_at ON games(played_at DESC);
"""
class Database:
"""Database connection manager."""
_instance: Optional["Database"] = None
_pool: asyncpg.Pool | None = None
def __init__(self):
self.dsn = os.getenv(
"SHELLMATE_DATABASE_URL",
os.getenv(
"DATABASE_URL",
"postgresql://shellmate:shellmate@localhost:5432/shellmate"
)
)
@classmethod
async def get_instance(cls) -> "Database":
"""Get or create singleton database instance."""
if cls._instance is None:
cls._instance = cls()
await cls._instance.connect()
return cls._instance
async def connect(self) -> None:
"""Connect to database and initialize schema."""
try:
self._pool = await asyncpg.create_pool(
self.dsn,
min_size=2,
max_size=10,
command_timeout=60,
)
async with self._pool.acquire() as conn:
await conn.execute(SCHEMA)
logger.info("Database connected and schema initialized")
except Exception as e:
logger.error(f"Database connection failed: {e}")
self._pool = None
async def close(self) -> None:
"""Close database connection."""
if self._pool:
await self._pool.close()
self._pool = None
@property
def connected(self) -> bool:
"""Check if database is connected."""
return self._pool is not None
# Player operations
async def get_or_create_player(self, username: str) -> PlayerRecord | None:
"""Get existing player or create new one."""
if not self._pool:
return None
async with self._pool.acquire() as conn:
# Try to get existing
row = await conn.fetchrow(
"SELECT * FROM players WHERE username = $1",
username
)
if row:
return PlayerRecord.from_row(dict(row))
# Create new player
row = await conn.fetchrow(
"""
INSERT INTO players (username, elo, created_at, last_seen)
VALUES ($1, 1200, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
RETURNING *
""",
username
)
return PlayerRecord.from_row(dict(row)) if row else None
async def get_player(self, username: str) -> PlayerRecord | None:
"""Get player by username."""
if not self._pool:
return None
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM players WHERE username = $1",
username
)
return PlayerRecord.from_row(dict(row)) if row else None
async def update_player_stats(
self,
player_id: int,
elo_change: int,
won: bool,
draw: bool = False
) -> None:
"""Update player stats after a game."""
if not self._pool:
return
async with self._pool.acquire() as conn:
if draw:
await conn.execute(
"""
UPDATE players SET
elo = elo + $2,
games_played = games_played + 1,
draws = draws + 1,
last_seen = CURRENT_TIMESTAMP
WHERE id = $1
""",
player_id, elo_change
)
elif won:
await conn.execute(
"""
UPDATE players SET
elo = elo + $2,
games_played = games_played + 1,
wins = wins + 1,
last_seen = CURRENT_TIMESTAMP
WHERE id = $1
""",
player_id, elo_change
)
else:
await conn.execute(
"""
UPDATE players SET
elo = elo + $2,
games_played = games_played + 1,
losses = losses + 1,
last_seen = CURRENT_TIMESTAMP
WHERE id = $1
""",
player_id, elo_change
)
# Leaderboard
async def get_leaderboard(self, limit: int = 10) -> list[LeaderboardEntry]:
"""Get top players by ELO."""
if not self._pool:
return []
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT username, elo, games_played, wins, losses, draws
FROM players
WHERE games_played > 0
ORDER BY elo DESC
LIMIT $1
""",
limit
)
entries = []
for rank, row in enumerate(rows, 1):
games = row["games_played"]
winrate = (row["wins"] / games * 100) if games > 0 else 0
entries.append(LeaderboardEntry(
rank=rank,
username=row["username"],
elo=row["elo"],
games_played=games,
wins=row["wins"],
losses=row["losses"],
winrate=round(winrate, 1),
))
return entries
async def get_player_rank(self, username: str) -> int | None:
"""Get player's rank on leaderboard."""
if not self._pool:
return None
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT COUNT(*) + 1 as rank
FROM players p1
WHERE p1.elo > (SELECT elo FROM players WHERE username = $1)
AND p1.games_played > 0
""",
username
)
return row["rank"] if row else None
# Game recording
async def record_game(
self,
white_id: int,
black_id: int | None,
result: str,
moves: str = "",
elo_change_white: int = 0,
elo_change_black: int = 0,
duration_seconds: int = 0
) -> int | None:
"""Record a completed game."""
if not self._pool:
return None
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO games (
white_player_id, black_player_id, result, moves,
elo_change_white, elo_change_black, duration_seconds
)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id
""",
white_id, black_id, result, moves,
elo_change_white, elo_change_black, duration_seconds
)
return row["id"] if row else None
async def get_player_games(
self, player_id: int, limit: int = 10
) -> list[GameRecord]:
"""Get recent games for a player."""
if not self._pool:
return []
async with self._pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT * FROM games
WHERE white_player_id = $1 OR black_player_id = $1
ORDER BY played_at DESC
LIMIT $2
""",
player_id, limit
)
return [GameRecord.from_row(dict(row)) for row in rows]
# Stats
async def get_total_players(self) -> int:
"""Get total number of players."""
if not self._pool:
return 0
async with self._pool.acquire() as conn:
row = await conn.fetchrow("SELECT COUNT(*) as count FROM players")
return row["count"] if row else 0
async def get_total_games(self) -> int:
"""Get total number of games played."""
if not self._pool:
return 0
async with self._pool.acquire() as conn:
row = await conn.fetchrow("SELECT COUNT(*) as count FROM games")
return row["count"] if row else 0

View File

@@ -0,0 +1,84 @@
"""Database models for ShellMate."""
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PlayerRecord:
"""Player record from database."""
id: int
username: str
elo: int
games_played: int
wins: int
losses: int
draws: int
created_at: datetime
last_seen: datetime | None
@property
def winrate(self) -> float:
"""Calculate win rate percentage."""
if self.games_played == 0:
return 0.0
return (self.wins / self.games_played) * 100
@classmethod
def from_row(cls, row: dict) -> "PlayerRecord":
"""Create from database row."""
return cls(
id=row["id"],
username=row["username"],
elo=row["elo"],
games_played=row["games_played"],
wins=row["wins"],
losses=row["losses"],
draws=row["draws"],
created_at=row["created_at"],
last_seen=row.get("last_seen"),
)
@dataclass
class GameRecord:
"""Game record from database."""
id: int
white_player_id: int
black_player_id: int | None # None for AI
result: str # 'white', 'black', 'draw'
moves: str # PGN or UCI moves
elo_change_white: int
elo_change_black: int
played_at: datetime
duration_seconds: int
@classmethod
def from_row(cls, row: dict) -> "GameRecord":
"""Create from database row."""
return cls(
id=row["id"],
white_player_id=row["white_player_id"],
black_player_id=row.get("black_player_id"),
result=row["result"],
moves=row.get("moves", ""),
elo_change_white=row.get("elo_change_white", 0),
elo_change_black=row.get("elo_change_black", 0),
played_at=row["played_at"],
duration_seconds=row.get("duration_seconds", 0),
)
@dataclass
class LeaderboardEntry:
"""Leaderboard entry for display."""
rank: int
username: str
elo: int
games_played: int
wins: int
losses: int
winrate: float

View File

@@ -187,6 +187,7 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode
menu_table.add_row("[bright_white on blue] 1 [/] Play vs AI [dim]♔ vs ♚[/]") menu_table.add_row("[bright_white on blue] 1 [/] Play vs AI [dim]♔ vs ♚[/]")
menu_table.add_row("[bright_white on magenta] 2 [/] Play vs Human [dim]♔ vs ♔[/]") menu_table.add_row("[bright_white on magenta] 2 [/] Play vs Human [dim]♔ vs ♔[/]")
menu_table.add_row("[bright_white on green] 3 [/] Learn & Practice [dim]📖[/]") menu_table.add_row("[bright_white on green] 3 [/] Learn & Practice [dim]📖[/]")
menu_table.add_row("[bright_white on yellow] 4 [/] Leaderboard [dim]🏆[/]")
menu_table.add_row("[bright_white on red] q [/] Quit [dim]👋[/]") menu_table.add_row("[bright_white on red] q [/] Quit [dim]👋[/]")
menu_table.add_row("") menu_table.add_row("")
menu_table.add_row(Text("Press a key to select...", style="dim italic")) menu_table.add_row(Text("Press a key to select...", style="dim italic"))
@@ -237,6 +238,9 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode
session.write("\r\n\033[33mTutorials coming soon! Try playing vs AI.\033[0m\r\n") session.write("\r\n\033[33mTutorials coming soon! Try playing vs AI.\033[0m\r\n")
await asyncio.sleep(2) await asyncio.sleep(2)
render_menu() render_menu()
elif char == '4':
await show_leaderboard(process, session, username)
render_menu()
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception as e: except Exception as e:
@@ -244,6 +248,113 @@ async def run_simple_menu(process, session: TerminalSession, username: str, mode
continue # Don't break on errors, try to continue continue # Don't break on errors, try to continue
async def show_leaderboard(process, session: TerminalSession, username: str) -> None:
"""Display the leaderboard."""
from rich.align import Align
from rich.box import ROUNDED
from rich.console import Console
from rich.table import Table
from rich.text import Text
class ProcessWriter:
def __init__(self, sess):
self._session = sess
def write(self, data):
self._session.write(data)
def flush(self):
pass
session.clear()
writer = ProcessWriter(session)
console = Console(
file=writer, width=session.width, height=session.height,
force_terminal=True, color_system="truecolor"
)
# Try to get leaderboard from database
try:
from shellmate.db import Database
db = await Database.get_instance()
entries = await db.get_leaderboard(10)
player_rank = await db.get_player_rank(username)
total_players = await db.get_total_players()
total_games = await db.get_total_games()
except Exception as e:
logger.warning(f"Database not available: {e}")
entries = []
player_rank = None
total_players = 0
total_games = 0
console.print()
console.print(Align.center(Text("🏆 LEADERBOARD 🏆", style="bold yellow")))
console.print(Align.center(Text("" * 30, style="yellow")))
console.print()
if entries:
# Create leaderboard table
table = Table(
show_header=True,
header_style="bold cyan",
box=ROUNDED,
border_style="dim",
)
table.add_column("#", justify="right", style="dim", width=4)
table.add_column("Player", justify="left", width=15)
table.add_column("ELO", justify="right", style="green", width=6)
table.add_column("W/L/D", justify="center", width=10)
table.add_column("Win%", justify="right", width=6)
for entry in entries:
rank_style = ""
if entry.rank == 1:
rank_style = "bold yellow"
elif entry.rank == 2:
rank_style = "bold white"
elif entry.rank == 3:
rank_style = "bold red"
is_you = entry.username == username
name = f"{entry.username}" if is_you else entry.username
name_style = "bold cyan" if is_you else ""
table.add_row(
Text(str(entry.rank), style=rank_style),
Text(name, style=name_style),
str(entry.elo),
f"{entry.wins}/{entry.losses}/{entry.games_played - entry.wins - entry.losses}",
f"{entry.winrate:.0f}%",
)
console.print(Align.center(table))
console.print()
# Stats
if player_rank:
console.print(Align.center(
Text(f"Your rank: #{player_rank} of {total_players} players", style="cyan")
))
console.print(Align.center(
Text(f"Total games played: {total_games}", style="dim")
))
else:
console.print(Align.center(Text("No games played yet!", style="dim")))
console.print()
console.print(Align.center(Text("Be the first on the leaderboard!", style="cyan")))
console.print(Align.center(Text("Play a game to get ranked.", style="dim")))
console.print()
console.print(Align.center(Text("Press any key to return...", style="dim italic")))
# Wait for keypress
try:
await process.stdin.read(1)
except Exception:
pass
async def run_chess_game(process, session: TerminalSession, username: str, opponent: str) -> None: async def run_chess_game(process, session: TerminalSession, username: str, opponent: str) -> None:
"""Run a beautiful chess game session with Stockfish AI.""" """Run a beautiful chess game session with Stockfish AI."""
import chess import chess
@@ -551,6 +662,8 @@ async def run_chess_game(process, session: TerminalSession, username: str, oppon
continue continue
if board.is_game_over(): if board.is_game_over():
from rich.text import Text
session.clear() session.clear()
writer = ProcessWriter(session) writer = ProcessWriter(session)
console = Console( console = Console(
@@ -558,29 +671,80 @@ async def run_chess_game(process, session: TerminalSession, username: str, oppon
force_terminal=True force_terminal=True
) )
# Determine result
player_won = False
is_draw = False
result_text = ""
console.print() console.print()
if board.is_checkmate(): if board.is_checkmate():
winner = "Black ♚" if board.turn == chess.WHITE else "White ♔" winner = "Black ♚" if board.turn == chess.WHITE else "White ♔"
player_won = board.turn == chess.BLACK # Player is white vs AI
console.print(Align.center(Panel( console.print(Align.center(Panel(
f"[bold green]🏆 CHECKMATE! 🏆\n\n{winner} wins![/bold green]", f"[bold green]🏆 CHECKMATE! 🏆\n\n{winner} wins![/bold green]",
box=ROUNDED, box=ROUNDED,
border_style="green", border_style="green",
width=40 width=40
))) )))
result_text = "white" if board.turn == chess.BLACK else "black"
elif board.is_stalemate(): elif board.is_stalemate():
is_draw = True
console.print(Align.center(Panel( console.print(Align.center(Panel(
"[yellow]Stalemate!\n\nThe game is a draw.[/yellow]", "[yellow]Stalemate!\n\nThe game is a draw.[/yellow]",
box=ROUNDED, box=ROUNDED,
border_style="yellow", border_style="yellow",
width=40 width=40
))) )))
result_text = "draw"
else: else:
is_draw = True
console.print(Align.center(Panel( console.print(Align.center(Panel(
"[yellow]Game Over\n\nDraw by repetition or insufficient material.[/yellow]", "[yellow]Game Over\n\nDraw.[/yellow]",
box=ROUNDED, box=ROUNDED,
border_style="yellow", border_style="yellow",
width=40 width=40
))) )))
result_text = "draw"
# Record to database
try:
from shellmate.db import Database
db = await Database.get_instance()
if db.connected:
player = await db.get_or_create_player(username)
if player:
# Calculate ELO change (vs AI rated 1500)
ai_elo = 1500
if is_draw:
result_val = 0.5
elif player_won:
result_val = 1.0
else:
result_val = 0.0
expected = 1 / (1 + 10 ** ((ai_elo - player.elo) / 400))
elo_change = int(32 * (result_val - expected))
await db.update_player_stats(
player.id, elo_change, player_won, is_draw
)
await db.record_game(
white_player_id=player.id,
black_player_id=None, # AI
result=result_text,
moves=" ".join(move_history),
elo_change_white=elo_change,
)
# Show ELO change
elo_text = f"+{elo_change}" if elo_change >= 0 else str(elo_change)
new_elo = player.elo + elo_change
console.print()
console.print(Align.center(
Text(f"ELO: {new_elo} ({elo_text})", style="cyan")
))
except Exception as e:
logger.warning(f"Could not record game: {e}")
await asyncio.sleep(3) await asyncio.sleep(3)