""" ClubLedger – Store Credit Web App Hard defaults live in CONFIG below; everything is overridable via the Admin UI. """ import sqlite3 import json import os import secrets from contextlib import contextmanager, asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Optional import bcrypt from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from fastapi import FastAPI, HTTPException, Cookie, Depends, Response, UploadFile, File from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, field_validator # --------------------------------------------------------------------------- # Hard defaults (overridden by app_settings table via Admin area) # --------------------------------------------------------------------------- def _server_timezone() -> str: """Detect the server's IANA timezone name for use as the default.""" try: p = Path('/etc/timezone') if p.exists(): return p.read_text().strip() p = Path('/etc/localtime') if p.is_symlink(): target = str(p.resolve()) if 'zoneinfo/' in target: return target.split('zoneinfo/', 1)[-1] except Exception: pass return 'UTC' CONFIG = { "club_name": "ClubLedger", "currency_symbol": "£", "currency_major": "pounds", "currency_minor": "pence", "currency_divisor": 100, "overdraft_policy": "never", "min_topup": 100, "max_topup": 100_000, "max_charge": 50_000, # Business contact "biz_address1": "", "biz_address2": "", "biz_address3": "", "biz_address4": "", "biz_country": "", "biz_phone": "", "biz_email": "", "biz_website": "", # Branding "logo_url": "", "logo_align": "left", "logo_max_width": 200, "logo_max_height": 80, "bar_name": "Bar", "cashier_name": "Cashier", # Transactions "txn_ref_prefix": "TXN", "transfer_types": "Bank Transfer,Cash,QR", # Receipt labels (localizable) "lbl_receipt": "RECEIPT", "lbl_topup_receipt": "TOP-UP RECEIPT", "lbl_withdrawal_receipt": "WITHDRAWAL RECEIPT", "lbl_staff": "STAFF", "lbl_transaction": "TRANSACTION", "lbl_charge_venue": "CHARGE", "lbl_txn_time": "TRANSACTION TIME", "lbl_amount_charged": "AMOUNT CHARGED", "lbl_remaining_balance": "REMAINING BALANCE", "lbl_balance_transfer": "BALANCE TRANSFER", "lbl_amount_topup": "AMOUNT TOPPED-UP", "lbl_amount_withdrawal": "AMOUNT WITHDRAWN", "lbl_transfer_type": "TRANSFER TYPE", "lbl_transfer_ref": "TRANSFER REFERENCE", # Receipt footers "receipt_footer": "", "receipt_footer_charge": "", "receipt_footer_cashier": "", # Timezone for display (IANA name); defaults to server local timezone "timezone": _server_timezone(), # Default paper size for receipts and statements "paper_size": "A4", } DB_PATH = "clubledger.db" STAFF_FILE = Path(__file__).parent / "staff.json" static_dir = Path(__file__).parent / "static" # In-memory sessions: token → {user_id, name, role, expires} _sessions: dict = {} # Cached settings merged from CONFIG + DB app_settings _settings: dict = {} # --------------------------------------------------------------------------- # Database # --------------------------------------------------------------------------- def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn @contextmanager def db_conn(): conn = get_db() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db(): with db_conn() as conn: conn.executescript(""" CREATE TABLE IF NOT EXISTS members ( id INTEGER PRIMARY KEY AUTOINCREMENT, member_number TEXT UNIQUE NOT NULL, name TEXT NOT NULL, pin_hash TEXT NOT NULL, overdraft_override INTEGER DEFAULT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS ledger_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, member_id INTEGER NOT NULL REFERENCES members(id), amount INTEGER NOT NULL, type TEXT NOT NULL CHECK(type IN ('topup','charge','withdrawal')), venue TEXT NOT NULL CHECK(venue IN ('cashier','bar')), note TEXT, staff_name TEXT NOT NULL, transfer_type TEXT, transfer_ref TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, brand TEXT, price INTEGER NOT NULL, member_price INTEGER, search_tags TEXT, active INTEGER NOT NULL DEFAULT 1 ); CREATE TABLE IF NOT EXISTS staff_accounts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'pos-staff' CHECK(role IN ('cashier','pos-staff','admin')), active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS app_settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_ledger_member ON ledger_entries(member_id); """) def migrate_db(): """Run schema migrations that can't be expressed as CREATE TABLE IF NOT EXISTS.""" with db_conn() as conn: # --- members: add overdraft_override column --- cols = [r[1] for r in conn.execute("PRAGMA table_info(members)").fetchall()] if "overdraft_override" not in cols: conn.execute("ALTER TABLE members ADD COLUMN overdraft_override INTEGER DEFAULT NULL") # --- staff_accounts: add cashier/pos-staff roles --- schema = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='staff_accounts'" ).fetchone() if schema and "'pos-staff'" not in schema["sql"]: conn.execute("ALTER TABLE staff_accounts RENAME TO _staff_accounts_old") conn.execute(""" CREATE TABLE staff_accounts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'pos-staff' CHECK(role IN ('cashier','pos-staff','admin')), active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) conn.execute(""" INSERT INTO staff_accounts SELECT id, name, username, password_hash, CASE role WHEN 'staff' THEN 'pos-staff' ELSE role END, active, created_at FROM _staff_accounts_old """) conn.execute("DROP TABLE _staff_accounts_old") # --- ledger_entries: add withdrawal type --- le_schema = conn.execute( "SELECT sql FROM sqlite_master WHERE type='table' AND name='ledger_entries'" ).fetchone() if le_schema and "'withdrawal'" not in le_schema["sql"]: conn.execute("ALTER TABLE ledger_entries RENAME TO _ledger_entries_old") conn.execute(""" CREATE TABLE ledger_entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, member_id INTEGER NOT NULL REFERENCES members(id), amount INTEGER NOT NULL, type TEXT NOT NULL CHECK(type IN ('topup','charge','withdrawal')), venue TEXT NOT NULL CHECK(venue IN ('cashier','bar')), note TEXT, staff_name TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) conn.execute("INSERT INTO ledger_entries SELECT * FROM _ledger_entries_old") conn.execute("DROP TABLE _ledger_entries_old") conn.execute("CREATE INDEX IF NOT EXISTS idx_ledger_member ON ledger_entries(member_id)") # --- app_settings: rename allow_negative_balance → overdraft_policy --- row = conn.execute( "SELECT value FROM app_settings WHERE key='allow_negative_balance'" ).fetchone() if row is not None: old_val = json.loads(row[0]) conn.execute( "INSERT OR IGNORE INTO app_settings (key,value) VALUES (?,?)", ("overdraft_policy", json.dumps("always" if old_val else "never")) ) conn.execute("DELETE FROM app_settings WHERE key='allow_negative_balance'") # --- ledger_entries: add transfer_type and transfer_ref columns --- le_cols = [r[1] for r in conn.execute("PRAGMA table_info(ledger_entries)").fetchall()] if "transfer_type" not in le_cols: conn.execute("ALTER TABLE ledger_entries ADD COLUMN transfer_type TEXT") if "transfer_ref" not in le_cols: conn.execute("ALTER TABLE ledger_entries ADD COLUMN transfer_ref TEXT") def seed_admin(): with db_conn() as conn: if conn.execute("SELECT COUNT(*) FROM staff_accounts WHERE role='admin'").fetchone()[0] == 0: pw = bcrypt.hashpw(b"admin", bcrypt.gensalt()).decode() conn.execute( "INSERT INTO staff_accounts (name, username, password_hash, role) VALUES (?,?,?,?)", ("Administrator", "admin", pw, "admin") ) print("=" * 60) print(" Default admin created → username: admin password: admin") print(" Change this immediately in the Admin → Staff Accounts area.") print("=" * 60) def refresh_settings(): global _settings with db_conn() as conn: rows = conn.execute("SELECT key, value FROM app_settings").fetchall() overrides = {r["key"]: json.loads(r["value"]) for r in rows} _settings = {**CONFIG, **overrides} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def hash_pin(pin: str) -> str: return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode() def verify_pin(pin: str, hashed: str) -> bool: return bcrypt.checkpw(pin.encode(), hashed.encode()) def member_balance(conn, member_id: int) -> int: row = conn.execute(""" SELECT COALESCE(SUM(CASE WHEN type='topup' THEN amount ELSE -amount END),0) AS b FROM ledger_entries WHERE member_id=? """, (member_id,)).fetchone() return row["b"] if row else 0 def format_amount(pence: int) -> str: sym = _settings.get("currency_symbol") or CONFIG["currency_symbol"] div = _settings.get("currency_divisor") or CONFIG["currency_divisor"] return f"{sym}{pence / div:.2f}" def _display_tz(s: dict): """Return a ZoneInfo (or local tzinfo) for the configured display timezone.""" tz_name = (s.get("timezone") or "").strip() if tz_name: try: return ZoneInfo(tz_name) except (ZoneInfoNotFoundError, KeyError): pass return datetime.now().astimezone().tzinfo # server local def _fmt_dt(dt_str: str, s: dict) -> str: """Convert a stored UTC datetime string to the configured display timezone.""" try: dt_utc = datetime.fromisoformat(dt_str.replace(' ', 'T')).replace(tzinfo=timezone.utc) local = dt_utc.astimezone(_display_tz(s)) return local.strftime('%Y-%m-%d %H:%M') except Exception: return dt_str[:16] def _now_display(s: dict) -> str: """Current time formatted in the configured display timezone.""" try: local = datetime.now(timezone.utc).astimezone(_display_tz(s)) return local.strftime('%Y-%m-%d %H:%M') except Exception: return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M') def load_staff() -> list: if STAFF_FILE.exists(): return json.loads(STAFF_FILE.read_text()).get("staff", []) return [] def save_staff(names: list): STAFF_FILE.write_text(json.dumps({"staff": sorted(set(names))}, indent=2)) # --------------------------------------------------------------------------- # Auth helpers # --------------------------------------------------------------------------- SESSION_TTL = 8 * 3600 # seconds def current_user(session: Optional[str] = Cookie(default=None)): if not session or session not in _sessions: raise HTTPException(401, "Not authenticated") s = _sessions[session] if datetime.now(timezone.utc).timestamp() > s["expires"]: del _sessions[session] raise HTTPException(401, "Session expired") return s def admin_user(user: dict = Depends(current_user)): if user["role"] != "admin": raise HTTPException(403, "Admin access required") def cashier_user(user: dict = Depends(current_user)): if user["role"] not in ("cashier", "admin"): raise HTTPException(403, "Cashier access required") return user def pos_user(user: dict = Depends(current_user)): if user["role"] not in ("pos-staff", "admin"): raise HTTPException(403, "POS staff access required") return user # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app): init_db() migrate_db() seed_admin() refresh_settings() yield app = FastAPI(title="ClubLedger", lifespan=lifespan) static_dir.mkdir(exist_ok=True) app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") # --------------------------------------------------------------------------- # Pydantic models # --------------------------------------------------------------------------- class MemberCreate(BaseModel): member_number: str name: str pin: str @field_validator("pin") @classmethod def pin_length(cls, v): if len(v) < 4: raise ValueError("PIN must be at least 4 characters") return v @field_validator("member_number") @classmethod def member_number_nonempty(cls, v): v = v.strip() if not v: raise ValueError("member_number cannot be empty") return v class MemberUpdate(BaseModel): member_number: Optional[str] = None name: Optional[str] = None pin: Optional[str] = None overdraft_override: Optional[int] = None # NULL=default, 1=allow, 0=block class TopupRequest(BaseModel): member_id: int amount: int note: Optional[str] = None transfer_type: Optional[str] = None transfer_ref: Optional[str] = None class ChargeRequest(BaseModel): member_id: int amount: int pin: str note: Optional[str] = None class WithdrawalRequest(BaseModel): member_id: int amount: int pin: str note: Optional[str] = None transfer_type: Optional[str] = None transfer_ref: Optional[str] = None class ProductCreate(BaseModel): name: str brand: Optional[str] = None price: int member_price: Optional[int] = None search_tags: Optional[str] = None class StaffAdd(BaseModel): name: str class LoginRequest(BaseModel): username: str password: str class StaffAccountCreate(BaseModel): name: str username: str password: str role: str = "pos-staff" class StaffAccountUpdate(BaseModel): name: Optional[str] = None username: Optional[str] = None password: Optional[str] = None role: Optional[str] = None active: Optional[bool] = None class AppSettingsUpdate(BaseModel): club_name: Optional[str] = None currency_symbol: Optional[str] = None currency_major: Optional[str] = None currency_minor: Optional[str] = None currency_divisor: Optional[int] = None overdraft_policy: Optional[str] = None min_topup: Optional[int] = None max_topup: Optional[int] = None max_charge: Optional[int] = None # Business contact biz_address1: Optional[str] = None biz_address2: Optional[str] = None biz_address3: Optional[str] = None biz_address4: Optional[str] = None biz_country: Optional[str] = None biz_phone: Optional[str] = None biz_email: Optional[str] = None biz_website: Optional[str] = None # Branding logo_url: Optional[str] = None logo_align: Optional[str] = None logo_max_width: Optional[int] = None logo_max_height: Optional[int] = None bar_name: Optional[str] = None cashier_name: Optional[str] = None # Transactions txn_ref_prefix: Optional[str] = None transfer_types: Optional[str] = None # Receipt labels lbl_receipt: Optional[str] = None lbl_topup_receipt: Optional[str] = None lbl_withdrawal_receipt: Optional[str] = None lbl_staff: Optional[str] = None lbl_transaction: Optional[str] = None lbl_charge_venue: Optional[str] = None lbl_txn_time: Optional[str] = None lbl_amount_charged: Optional[str] = None lbl_remaining_balance: Optional[str] = None lbl_balance_transfer: Optional[str] = None lbl_amount_topup: Optional[str] = None lbl_amount_withdrawal: Optional[str] = None lbl_transfer_type: Optional[str] = None lbl_transfer_ref: Optional[str] = None # Receipt footers receipt_footer: Optional[str] = None receipt_footer_charge: Optional[str] = None receipt_footer_cashier: Optional[str] = None # Timezone timezone: Optional[str] = None # Default paper size paper_size: Optional[str] = None # --------------------------------------------------------------------------- # Page routes # --------------------------------------------------------------------------- @app.get("/", response_class=HTMLResponse) async def root(): return (static_dir / "index.html").read_text() @app.get("/cashier", response_class=HTMLResponse) async def cashier_page(): return (static_dir / "cashier.html").read_text() @app.get("/bar", response_class=HTMLResponse) async def bar_page(): return (static_dir / "bar.html").read_text() # --------------------------------------------------------------------------- # Auth endpoints # --------------------------------------------------------------------------- @app.post("/auth/login") def login(body: LoginRequest, response: Response): with db_conn() as conn: row = conn.execute( "SELECT * FROM staff_accounts WHERE username=? AND active=1", (body.username.strip(),) ).fetchone() if not row or not bcrypt.checkpw(body.password.encode(), row["password_hash"].encode()): raise HTTPException(401, "Invalid username or password") token = secrets.token_hex(32) _sessions[token] = { "user_id": row["id"], "name": row["name"], "role": row["role"], "expires": datetime.now(timezone.utc).timestamp() + SESSION_TTL, } response.set_cookie("session", token, httponly=True, max_age=SESSION_TTL, samesite="strict") return {"name": row["name"], "role": row["role"]} @app.post("/auth/logout") def logout(response: Response, session: Optional[str] = Cookie(default=None)): if session and session in _sessions: del _sessions[session] response.delete_cookie("session") return {"ok": True} @app.get("/auth/me") def auth_me(user: dict = Depends(current_user)): return {"name": user["name"], "role": user["role"]} # --------------------------------------------------------------------------- # Member endpoints # --------------------------------------------------------------------------- @app.post("/members") def create_member(body: MemberCreate, user: dict = Depends(current_user)): with db_conn() as conn: if conn.execute("SELECT id FROM members WHERE member_number=?", (body.member_number.strip(),)).fetchone(): raise HTTPException(400, "Member number already exists") cur = conn.execute( "INSERT INTO members (member_number, name, pin_hash) VALUES (?,?,?)", (body.member_number.strip(), body.name.strip(), hash_pin(body.pin)) ) mid = cur.lastrowid with db_conn() as conn: r = conn.execute("SELECT * FROM members WHERE id=?", (mid,)).fetchone() return {"id": r["id"], "member_number": r["member_number"], "name": r["name"], "created_at": r["created_at"]} @app.put("/members/{member_id}") def update_member(member_id: int, body: MemberUpdate, user: dict = Depends(current_user)): with db_conn() as conn: if not conn.execute("SELECT id FROM members WHERE id=?", (member_id,)).fetchone(): raise HTTPException(404, "Member not found") updates = {} if body.name is not None: n = body.name.strip() if not n: raise HTTPException(400, "Name cannot be empty") updates["name"] = n if body.member_number is not None: mn = body.member_number.strip() if not mn: raise HTTPException(400, "Member number cannot be empty") if conn.execute("SELECT id FROM members WHERE member_number=? AND id!=?", (mn, member_id)).fetchone(): raise HTTPException(400, "Member number already in use") updates["member_number"] = mn if body.pin is not None: if len(body.pin) < 4: raise HTTPException(400, "PIN must be at least 4 characters") updates["pin_hash"] = hash_pin(body.pin) if "overdraft_override" in body.model_fields_set: updates["overdraft_override"] = body.overdraft_override # None, 0, or 1 if updates: conn.execute( f"UPDATE members SET {', '.join(f'{k}=?' for k in updates)} WHERE id=?", list(updates.values()) + [member_id] ) r = conn.execute("SELECT * FROM members WHERE id=?", (member_id,)).fetchone() return {"id": r["id"], "member_number": r["member_number"], "name": r["name"]} @app.delete("/members/{member_id}") def delete_member(member_id: int, user: dict = Depends(current_user)): with db_conn() as conn: if not conn.execute("SELECT id FROM members WHERE id=?", (member_id,)).fetchone(): raise HTTPException(404, "Member not found") bal = member_balance(conn, member_id) if bal != 0: raise HTTPException(400, f"Cannot delete: balance is {format_amount(bal)}") conn.execute("DELETE FROM ledger_entries WHERE member_id=?", (member_id,)) conn.execute("DELETE FROM members WHERE id=?", (member_id,)) return {"ok": True} @app.get("/members") def list_members(q: Optional[str] = None, user: dict = Depends(current_user)): with db_conn() as conn: if q: pat = f"%{q}%" rows = conn.execute( "SELECT * FROM members WHERE name LIKE ? OR member_number LIKE ? ORDER BY name", (pat, pat) ).fetchall() else: rows = conn.execute("SELECT * FROM members ORDER BY name").fetchall() result = [] for r in rows: bal = member_balance(conn, r["id"]) result.append({ "id": r["id"], "member_number": r["member_number"], "name": r["name"], "overdraft_override": r["overdraft_override"], "balance": bal, "balance_display": format_amount(bal), "created_at": r["created_at"], }) return result @app.post("/topup") def topup(body: TopupRequest, user: dict = Depends(cashier_user)): s = _settings if body.amount < s["min_topup"]: raise HTTPException(400, f"Minimum top-up is {format_amount(s['min_topup'])}") if body.amount > s["max_topup"]: raise HTTPException(400, f"Maximum top-up is {format_amount(s['max_topup'])}") with db_conn() as conn: if not conn.execute("SELECT id FROM members WHERE id=?", (body.member_id,)).fetchone(): raise HTTPException(404, "Member not found") cur = conn.execute( "INSERT INTO ledger_entries (member_id,amount,type,venue,note,staff_name,transfer_type,transfer_ref) VALUES (?,?,?,?,?,?,?,?)", (body.member_id, body.amount, "topup", "cashier", body.note, user["name"], body.transfer_type, body.transfer_ref) ) eid = cur.lastrowid bal = member_balance(conn, body.member_id) return {"ok": True, "entry_id": eid, "new_balance": bal, "new_balance_display": format_amount(bal)} @app.post("/charge") def charge(body: ChargeRequest, user: dict = Depends(pos_user)): s = _settings if body.amount <= 0: raise HTTPException(400, "Amount must be positive") if body.amount > s["max_charge"]: raise HTTPException(400, f"Maximum single charge is {format_amount(s['max_charge'])}") with db_conn() as conn: member = conn.execute("SELECT * FROM members WHERE id=?", (body.member_id,)).fetchone() if not member: raise HTTPException(404, "Member not found") if not verify_pin(body.pin, member["pin_hash"]): raise HTTPException(403, "Incorrect PIN") bal = member_balance(conn, body.member_id) policy = s.get("overdraft_policy", "never") member_ov = member["overdraft_override"] # None, 0, or 1 if policy == "never": overdraft_ok = False elif policy == "always": overdraft_ok = True elif policy in ("staff_override", "admin_override"): overdraft_ok = (member_ov == 1) elif policy == "staff_block": overdraft_ok = (member_ov != 0) # None or 1 = allowed; 0 = explicitly blocked else: overdraft_ok = False if not overdraft_ok and bal < body.amount: raise HTTPException(400, f"Insufficient balance ({format_amount(bal)})") cur = conn.execute( "INSERT INTO ledger_entries (member_id,amount,type,venue,note,staff_name) VALUES (?,?,?,?,?,?)", (body.member_id, body.amount, "charge", "bar", body.note, user["name"]) ) eid = cur.lastrowid new_bal = member_balance(conn, body.member_id) return {"ok": True, "entry_id": eid, "new_balance": new_bal, "new_balance_display": format_amount(new_bal)} @app.post("/withdrawal") def withdrawal(body: WithdrawalRequest, user: dict = Depends(cashier_user)): if body.amount <= 0: raise HTTPException(400, "Amount must be positive") with db_conn() as conn: member = conn.execute("SELECT * FROM members WHERE id=?", (body.member_id,)).fetchone() if not member: raise HTTPException(404, "Member not found") if not verify_pin(body.pin, member["pin_hash"]): raise HTTPException(403, "Incorrect PIN") bal = member_balance(conn, body.member_id) if bal < body.amount: raise HTTPException(400, f"Insufficient balance ({format_amount(bal)})") cur = conn.execute( "INSERT INTO ledger_entries (member_id,amount,type,venue,note,staff_name,transfer_type,transfer_ref) VALUES (?,?,?,?,?,?,?,?)", (body.member_id, body.amount, "withdrawal", "cashier", body.note, user["name"], body.transfer_type, body.transfer_ref) ) eid = cur.lastrowid new_bal = member_balance(conn, body.member_id) return {"ok": True, "entry_id": eid, "new_balance": new_bal, "new_balance_display": format_amount(new_bal)} # --------------------------------------------------------------------------- # Cashier stats # --------------------------------------------------------------------------- def _period_bounds(period: str, s: dict, from_date: Optional[str] = None, to_date: Optional[str] = None): """Return (start_utc_str, end_utc_str), or None for 'all' (no filter).""" from datetime import timedelta, date as date_type if period == "all": return None tz = _display_tz(s) now = datetime.now(timezone.utc).astimezone(tz) today = now.date() def _local(d: date_type): return datetime(d.year, d.month, d.day, tzinfo=tz) def _next_month(y: int, m: int): return (y, m % 12 + 1) if m < 12 else (y + 1, 1) def _prev_month_start(d: date_type) -> date_type: first = d.replace(day=1) return (first - timedelta(days=1)).replace(day=1) if period == "today": start = _local(today); end = start + timedelta(days=1) elif period == "week": start = _local(today - timedelta(days=today.weekday())) end = start + timedelta(days=7) elif period == "last_week": mon_this = today - timedelta(days=today.weekday()) start = _local(mon_this - timedelta(days=7)); end = _local(mon_this) elif period == "month": start = _local(today.replace(day=1)) ny, nm = _next_month(today.year, today.month) end = _local(date_type(ny, nm, 1)) elif period == "last_month": lm_start = _prev_month_start(today) lm_end = today.replace(day=1) start = _local(lm_start); end = _local(lm_end) elif period == "quarter": qm = ((today.month - 1) // 3) * 3 + 1 start = _local(date_type(today.year, qm, 1)) ey, em = _next_month(today.year, qm + 2) end = _local(date_type(ey, em, 1)) elif period == "last_quarter": qm = ((today.month - 1) // 3) * 3 + 1 # start of this quarter lqm = qm - 3 lqy = today.year if lqm > 0 else today.year - 1 lqm = lqm if lqm > 0 else lqm + 12 start = _local(date_type(lqy, lqm, 1)) end = _local(date_type(today.year, qm, 1)) elif period == "year": start = _local(date_type(today.year, 1, 1)) end = _local(date_type(today.year + 1, 1, 1)) elif period == "last_year": start = _local(date_type(today.year - 1, 1, 1)) end = _local(date_type(today.year, 1, 1)) elif period == "custom" and from_date and to_date: try: fd = date_type.fromisoformat(from_date) td = date_type.fromisoformat(to_date) start = _local(fd); end = _local(td) + timedelta(days=1) except ValueError: start = _local(today); end = start + timedelta(days=1) else: # fallback → today start = _local(today); end = start + timedelta(days=1) fmt = "%Y-%m-%d %H:%M:%S" return (start.astimezone(timezone.utc).strftime(fmt), end.astimezone(timezone.utc).strftime(fmt)) @app.get("/cashier/stats") def cashier_stats(period: str = "today", from_date: Optional[str] = None, to_date: Optional[str] = None, user: dict = Depends(cashier_user)): s = _settings sym = s.get("currency_symbol", "£") div = int(s.get("currency_divisor") or 100) def fmt(v: int) -> str: return f"{sym}{v / div:.2f}" bounds = _period_bounds(period, s, from_date, to_date) with db_conn() as conn: credit = conn.execute( "SELECT COALESCE(SUM(CASE WHEN type='topup' THEN amount ELSE -amount END),0) FROM ledger_entries" ).fetchone()[0] if bounds: rows = conn.execute( """SELECT type, COUNT(*) cnt, COALESCE(SUM(amount),0) total FROM ledger_entries WHERE created_at >= ? AND created_at < ? GROUP BY type""", bounds ).fetchall() else: rows = conn.execute( """SELECT type, COUNT(*) cnt, COALESCE(SUM(amount),0) total FROM ledger_entries GROUP BY type""" ).fetchall() by_type = {r["type"]: {"count": r["cnt"], "total": r["total"]} for r in rows} def stat(t): d = by_type.get(t, {"count": 0, "total": 0}) return {"count": d["count"], "total": d["total"], "display": fmt(d["total"])} tu = by_type.get("topup", {"total": 0})["total"] wd = by_type.get("withdrawal", {"total": 0})["total"] ch = by_type.get("charge", {"total": 0})["total"] net = tu - wd - ch return { "outstanding_credit": credit, "outstanding_credit_display": fmt(credit), "topups": stat("topup"), "withdrawals": stat("withdrawal"), "charges": stat("charge"), "net": {"total": net, "display": fmt(abs(net)), "negative": net < 0}, "period_from": start_utc[:10], "period_to": end_utc[:10], } @app.get("/members/{member_id}/transactions") def transactions(member_id: int, limit: int = 50, offset: int = 0, user: dict = Depends(current_user)): with db_conn() as conn: member = conn.execute("SELECT * FROM members WHERE id=?", (member_id,)).fetchone() if not member: raise HTTPException(404, "Member not found") rows = conn.execute(""" SELECT * FROM ledger_entries WHERE member_id=? ORDER BY created_at DESC LIMIT ? OFFSET ? """, (member_id, limit, offset)).fetchall() bal = member_balance(conn, member_id) return { "member": {"id": member["id"], "member_number": member["member_number"], "name": member["name"]}, "balance": bal, "balance_display": format_amount(bal), "transactions": [ {"id": r["id"], "amount": r["amount"], "amount_display": format_amount(r["amount"]), "type": r["type"], "venue": r["venue"], "note": r["note"], "staff_name": r["staff_name"], "created_at": r["created_at"]} for r in rows ], } # --------------------------------------------------------------------------- # Print views (no auth – opened as new-tab popups) # --------------------------------------------------------------------------- def _print_size_script(s: dict): size = "A5" if (s.get("paper_size") or "A4").upper() == "A5" else "A4" return f"""""" def _print_controls(s: dict, extra: str = "") -> str: size = "A5" if (s.get("paper_size") or "A4").upper() == "A5" else "A4" a4_chk = ' checked' if size == "A4" else '' a5_chk = ' checked' if size == "A5" else '' return f"""
Paper: {extra}
""" def _txn_ref(entry_id: int, s: dict) -> str: prefix = (s.get("txn_ref_prefix") or "TXN").strip() return f"{prefix}{entry_id:07d}" def _logo_html(s: dict) -> str: url = (s.get("logo_url") or "").strip() if not url: return "" align = s.get("logo_align", "left") max_w = int(s.get("logo_max_width", 200) or 200) max_h = int(s.get("logo_max_height", 80) or 80) style = f"max-width:{max_w}px;max-height:{max_h}px;" css_cl = f"biz-logo align-{align}" if align in ("left","center","right") else "biz-logo" return f'logo' def _biz_header_html(s: dict) -> str: logo = _logo_html(s) name = s.get("club_name") or "ClubLedger" addr = [( s.get(f"biz_address{i}") or "").strip() for i in range(1,5)] addr += [(s.get("biz_country") or "").strip()] addr = [l for l in addr if l] contacts = [] if (s.get("biz_phone") or "").strip(): contacts.append(f'Tel.   {s["biz_phone"]}') if (s.get("biz_email") or "").strip(): contacts.append(f'Email: {s["biz_email"]}') if (s.get("biz_website") or "").strip(): contacts.append(f'Web:   {s["biz_website"]}') parts = [] if logo: parts.append(logo) parts.append(f'
{name}
') if addr and contacts: parts.append( f'
' f'
{"
".join(addr)}
' f'
{"
".join(contacts)}
' f'
' ) elif addr: parts.append(f'
{"
".join(addr)}
') elif contacts: parts.append(f'
{"
".join(contacts)}
') return '
' + "\n".join(parts) + "
" def _rx_cell(label: str, value: str, extra_cls: str = "") -> str: val_cls = ("rx-val " + extra_cls).strip() return f'
{label}
{value}
' RECEIPT_CSS = """ body{font-family:Arial,sans-serif;font-size:11pt;color:#111;margin:28px;} hr{border:none;border-top:1px solid #ccc;margin:12px 0;} .controls{display:flex;align-items:center;gap:12px;margin-bottom:16px;flex-wrap:wrap;} .size-label{font-size:10pt;color:#555;} .controls label{font-size:10pt;cursor:pointer;} .print-btn{padding:6px 16px;font-size:10pt;cursor:pointer;margin-left:auto;} @media print{.no-print{display:none;}} /* Business header */ .biz-logo{display:block;margin-bottom:8px;} .biz-logo.align-center{margin-left:auto;margin-right:auto;} .biz-logo.align-right{margin-left:auto;} .biz-name{font-size:14pt;font-weight:bold;margin:4px 0 6px;} .biz-info-row{display:flex;justify-content:space-between;align-items:flex-start;gap:24px;font-size:10pt;line-height:1.7;} .biz-addr{line-height:1.7;} .biz-contacts{text-align:right;white-space:nowrap;line-height:1.7;} /* Receipt */ .rx-title{font-size:13pt;font-weight:bold;text-transform:uppercase;letter-spacing:.06em;margin:14px 0 12px;} .rx-grid{display:grid;grid-template-columns:1fr 1fr;gap:14px 40px;margin:10px 0;} .rx-cell{} .rx-lbl{font-size:9pt;font-weight:700;color:#555;text-transform:uppercase;letter-spacing:.05em;margin-bottom:3px;} .rx-val{font-size:11pt;} .rx-val.bold{font-weight:bold;} .rx-val.large{font-size:13pt;font-weight:bold;} .rx-val.charge{color:#c00;} .rx-val.credit{color:#080;} .footer{margin-top:20px;font-size:10pt;color:#444;line-height:1.7;white-space:pre-wrap;} /* Statement */ h2{font-size:13pt;font-weight:bold;margin:14px 0 4px;} .stmt-info{font-size:10pt;color:#555;margin-bottom:12px;line-height:1.6;} table{width:100%;border-collapse:collapse;margin-top:4px;font-size:10pt;} th{border-bottom:2px solid #222;padding:6px 8px 6px 0;text-align:left;font-size:9pt;font-weight:700;white-space:nowrap;} td{padding:5px 8px 5px 0;border-bottom:1px solid #e0e0e0;vertical-align:top;} th.rnum,td.rnum{text-align:right;padding-right:0;} .credit{color:#080;} .debit{color:#c00;} .sub-row td{font-size:10pt;color:#555;padding-top:0;border-bottom:none;padding-left:88px;} .balance-box{margin-top:14px;text-align:right;font-size:11pt;font-weight:bold;} """ _STMT_PERIODS = [ ("all", "All time"), ("today", "Today"), ("week", "This week"), ("last_week", "Last week"), ("month", "This month"), ("last_month", "Last month"), ("quarter", "This quarter"), ("last_quarter", "Last quarter"), ("year", "This year"), ("last_year", "Last year"), ("custom", "Custom…"), ] def _stmt_period_selector(period: str, from_date: str, to_date: str) -> str: opts = "".join( f'' for v, lbl in _STMT_PERIODS ) fd = from_date or ""; td = to_date or "" vis = "inline-flex" if period == "custom" else "none" return ( f'' f'' f'' f'to' f'' f'' f'' ) _STMT_SCRIPT = """""" @app.get("/members/{member_id}/statement", response_class=HTMLResponse) def statement(member_id: int, period: str = "all", from_date: Optional[str] = None, to_date: Optional[str] = None): s = _settings bounds = _period_bounds(period, s, from_date, to_date) with db_conn() as conn: member = conn.execute("SELECT * FROM members WHERE id=?", (member_id,)).fetchone() if not member: raise HTTPException(404, "Member not found") if bounds: opening_bal = conn.execute( """SELECT COALESCE(SUM(CASE WHEN type='topup' THEN amount ELSE -amount END),0) FROM ledger_entries WHERE member_id=? AND created_at < ?""", (member_id, bounds[0]) ).fetchone()[0] rows = conn.execute( "SELECT * FROM ledger_entries WHERE member_id=? AND created_at>=? AND created_at+ {fmt(r["amount"])}' type_lbl = "Top-up" elif r["type"] == "withdrawal": running -= r["amount"] amt_html = f'- {fmt(r["amount"])}' type_lbl = "Withdrawal" else: running -= r["amount"] amt_html = f'- {fmt(r["amount"])}' type_lbl = "Charge" rows_html += ( f"{_fmt_dt(r['created_at'], s)}{txn_ref}" f"{type_lbl}{venue}{r['staff_name']}" f"{amt_html}{fmt(running)}" ) # Detail sub-row sub = "" if r["type"] in ("topup","withdrawal"): tf_type = r["transfer_type"] or "" tf_ref = r["transfer_ref"] or "" if tf_type and tf_ref: sub = f"Transfer type: {tf_type} — {tf_ref}" elif tf_type: sub = f"Transfer type: {tf_type}" elif tf_ref: sub = f"Ref: {tf_ref}" elif r["note"]: sub = r["note"] if sub: rows_html += f'{sub}' period_lbl = dict(_STMT_PERIODS).get(period, period) if period == "custom" and bounds: period_lbl = f"{bounds[0][:10]} to {bounds[1][:10]}" return f""" Statement — {member['name']} {_print_controls(s, extra=_stmt_period_selector(period, from_date or '', to_date or ''))} {_biz_header_html(s)}

Account Statement

Member: {member['name']} — #{member['member_number']}
Period: {period_lbl} — Generated: {_now_display(s)}
{rows_html}
Date and TimeReferenceTypeVenue StaffAmountBalance
Current Balance: {fmt(bal)}
{('') if footer else ''} {_STMT_SCRIPT} {_print_size_script(s)}""" @app.get("/receipt/{entry_id}", response_class=HTMLResponse) def receipt(entry_id: int): s = _settings with db_conn() as conn: entry = conn.execute("SELECT * FROM ledger_entries WHERE id=?", (entry_id,)).fetchone() if not entry: raise HTTPException(404, "Receipt not found") member = conn.execute("SELECT * FROM members WHERE id=?", (entry["member_id"],)).fetchone() bal_after = conn.execute(""" SELECT COALESCE(SUM(CASE WHEN type='topup' THEN amount ELSE -amount END),0) FROM ledger_entries WHERE member_id=? AND id<=? """, (entry["member_id"], entry_id)).fetchone()[0] sym, div = s.get("currency_symbol","£"), s.get("currency_divisor",100) def fmt(p): return f"{sym}{p/div:.2f}" txn_ref = _txn_ref(entry_id, s) etype = entry["type"] venue_name = s.get("bar_name","Bar") if entry["venue"]=="bar" else s.get("cashier_name","Cashier") tf_type = entry["transfer_type"] or "" tf_ref = entry["transfer_ref"] or "" timestamp = _fmt_dt(entry["created_at"], s) lbl_staff = s.get("lbl_staff", "STAFF") lbl_txn = s.get("lbl_transaction", "TRANSACTION") lbl_txn_time = s.get("lbl_txn_time", "TRANSACTION TIME") lbl_remaining = s.get("lbl_remaining_balance", "REMAINING BALANCE") if etype == "topup": title = s.get("lbl_topup_receipt", "TOP-UP RECEIPT") footer = s.get("receipt_footer_cashier") or s.get("receipt_footer","") lbl_tf_sec = s.get("lbl_balance_transfer", "BALANCE TRANSFER") lbl_amount = s.get("lbl_amount_topup", "AMOUNT TOPPED-UP") tf_label = "Top-up" amount_cls = "large credit" elif etype == "withdrawal": title = s.get("lbl_withdrawal_receipt","WITHDRAWAL RECEIPT") footer = s.get("receipt_footer_cashier") or s.get("receipt_footer","") lbl_tf_sec = s.get("lbl_balance_transfer", "BALANCE TRANSFER") lbl_amount = s.get("lbl_amount_withdrawal","AMOUNT WITHDRAWN") tf_label = "Withdrawal" amount_cls = "large charge" else: title = s.get("lbl_receipt", "RECEIPT") footer = s.get("receipt_footer_charge") or s.get("receipt_footer","") lbl_charge = s.get("lbl_charge_venue", "CHARGE") lbl_amount = s.get("lbl_amount_charged", "AMOUNT CHARGED") if etype == "charge": body_html = f"""
{_rx_cell(lbl_staff, entry['staff_name'])} {_rx_cell(lbl_txn, txn_ref)}

{_rx_cell(lbl_charge, venue_name)} {_rx_cell(lbl_txn_time, timestamp)}

{_rx_cell(lbl_amount, fmt(entry['amount']), 'large charge')} {_rx_cell(lbl_remaining, fmt(bal_after), 'large')}
""" else: lbl_tf_type = s.get("lbl_transfer_type", "TRANSFER TYPE") lbl_tf_ref = s.get("lbl_transfer_ref", "TRANSFER REFERENCE") body_html = f"""
{_rx_cell(lbl_staff, entry['staff_name'])} {_rx_cell(lbl_txn, txn_ref)}

{_rx_cell(lbl_tf_sec, tf_label)} {_rx_cell(lbl_txn_time, timestamp)}

{_rx_cell(lbl_amount, fmt(entry['amount']), amount_cls)} {_rx_cell(lbl_remaining, fmt(bal_after), 'large')}

{_rx_cell(lbl_tf_type, tf_type or '—')} {_rx_cell(lbl_tf_ref, tf_ref or '—')}
""" return f""" Receipt — {member['name']} {_print_controls(s)} {_biz_header_html(s)}
{title}
{body_html}
{('') if footer else ''} {_print_size_script(s)}""" # --------------------------------------------------------------------------- # Products # --------------------------------------------------------------------------- @app.get("/products") def list_products(q: Optional[str] = None, active_only: bool = True, user: dict = Depends(current_user)): with db_conn() as conn: conds, params = [], [] if active_only: conds.append("active=1") if q: conds.append("(name LIKE ? OR brand LIKE ? OR search_tags LIKE ?)") p = f"%{q}%"; params += [p, p, p] sql = "SELECT * FROM products" + (" WHERE " + " AND ".join(conds) if conds else "") + " ORDER BY name" rows = conn.execute(sql, params).fetchall() return [{"id": r["id"], "name": r["name"], "brand": r["brand"], "price": r["price"], "price_display": format_amount(r["price"]), "member_price": r["member_price"], "member_price_display": format_amount(r["member_price"]) if r["member_price"] else None, "search_tags": r["search_tags"], "active": bool(r["active"])} for r in rows] @app.post("/products") def create_product(body: ProductCreate, user: dict = Depends(current_user)): with db_conn() as conn: cur = conn.execute( "INSERT INTO products (name,brand,price,member_price,search_tags) VALUES (?,?,?,?,?)", (body.name, body.brand, body.price, body.member_price, body.search_tags) ) return {"id": cur.lastrowid, "ok": True} # --------------------------------------------------------------------------- # Legacy staff name list (backward compat with cashier.html / bar.html) # --------------------------------------------------------------------------- @app.get("/staff") def get_staff(user: dict = Depends(current_user)): return {"staff": load_staff()} @app.post("/staff") def add_staff(body: StaffAdd, user: dict = Depends(current_user)): name = body.name.strip() if not name: raise HTTPException(400, "Name cannot be empty") staff = load_staff() if name not in staff: staff.append(name); save_staff(staff) return {"staff": sorted(staff)} @app.delete("/staff/{name}") def remove_staff(name: str, user: dict = Depends(current_user)): staff = [s for s in load_staff() if s != name] save_staff(staff) return {"staff": staff} # --------------------------------------------------------------------------- # Admin – staff accounts # --------------------------------------------------------------------------- @app.get("/admin/staff-accounts") def list_staff_accounts(user: dict = Depends(admin_user)): with db_conn() as conn: rows = conn.execute( "SELECT id,name,username,role,active,created_at FROM staff_accounts ORDER BY name" ).fetchall() return [dict(r) for r in rows] @app.post("/admin/staff-accounts") def create_staff_account(body: StaffAccountCreate, user: dict = Depends(admin_user)): if body.role not in ("cashier", "pos-staff", "admin"): raise HTTPException(400, "Role must be 'cashier', 'pos-staff', or 'admin'") with db_conn() as conn: if conn.execute("SELECT id FROM staff_accounts WHERE username=?", (body.username.strip(),)).fetchone(): raise HTTPException(400, "Username already taken") pw = bcrypt.hashpw(body.password.encode(), bcrypt.gensalt()).decode() cur = conn.execute( "INSERT INTO staff_accounts (name,username,password_hash,role) VALUES (?,?,?,?)", (body.name.strip(), body.username.strip(), pw, body.role) ) return {"id": cur.lastrowid, "ok": True} @app.put("/admin/staff-accounts/{account_id}") def update_staff_account(account_id: int, body: StaffAccountUpdate, user: dict = Depends(admin_user)): with db_conn() as conn: if not conn.execute("SELECT id FROM staff_accounts WHERE id=?", (account_id,)).fetchone(): raise HTTPException(404, "Account not found") updates = {} if body.name is not None: updates["name"] = body.name.strip() if body.username is not None: if conn.execute("SELECT id FROM staff_accounts WHERE username=? AND id!=?", (body.username.strip(), account_id)).fetchone(): raise HTTPException(400, "Username already taken") updates["username"] = body.username.strip() if body.password is not None: updates["password_hash"] = bcrypt.hashpw(body.password.encode(), bcrypt.gensalt()).decode() if body.role is not None: if body.role not in ("cashier","pos-staff","admin"): raise HTTPException(400, "Invalid role") updates["role"] = body.role if body.active is not None: updates["active"] = 1 if body.active else 0 if updates: conn.execute( f"UPDATE staff_accounts SET {', '.join(f'{k}=?' for k in updates)} WHERE id=?", list(updates.values()) + [account_id] ) return {"ok": True} @app.delete("/admin/staff-accounts/{account_id}") def delete_staff_account(account_id: int, user: dict = Depends(admin_user)): if account_id == user["user_id"]: raise HTTPException(400, "Cannot delete your own account") with db_conn() as conn: row = conn.execute("SELECT * FROM staff_accounts WHERE id=?", (account_id,)).fetchone() if not row: raise HTTPException(404, "Account not found") if row["role"] == "admin": if conn.execute("SELECT COUNT(*) FROM staff_accounts WHERE role='admin'").fetchone()[0] <= 1: raise HTTPException(400, "Cannot delete the last admin account") conn.execute("DELETE FROM staff_accounts WHERE id=?", (account_id,)) return {"ok": True} # --------------------------------------------------------------------------- # Admin – app settings # --------------------------------------------------------------------------- @app.get("/admin/settings") def get_admin_settings(user: dict = Depends(admin_user)): return _settings _OVERDRAFT_POLICIES = ("never", "always", "staff_override", "admin_override", "staff_block") @app.post("/admin/settings") def update_admin_settings(body: AppSettingsUpdate, user: dict = Depends(admin_user)): if body.overdraft_policy is not None and body.overdraft_policy not in _OVERDRAFT_POLICIES: raise HTTPException(400, "Invalid overdraft policy") with db_conn() as conn: for field in body.model_fields_set: val = getattr(body, field) if val is not None: conn.execute( "INSERT OR REPLACE INTO app_settings (key,value) VALUES (?,?)", (field, json.dumps(val)) ) refresh_settings() return _settings # --------------------------------------------------------------------------- # Admin – logo upload # --------------------------------------------------------------------------- @app.post("/admin/logo") async def upload_logo(file: UploadFile = File(...), user: dict = Depends(admin_user)): content_type = file.content_type or "" if not content_type.startswith("image/"): raise HTTPException(400, "Only image files are allowed") suffix = Path(file.filename or "logo.png").suffix.lower() if suffix not in (".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"): suffix = ".png" dest = static_dir / f"logo{suffix}" dest.write_bytes(await file.read()) url = f"/static/logo{suffix}" with db_conn() as conn: conn.execute("INSERT OR REPLACE INTO app_settings (key,value) VALUES (?,?)", ("logo_url", json.dumps(url))) refresh_settings() return {"url": url} # --------------------------------------------------------------------------- # Config (public – loaded by frontend before login screen shows) # --------------------------------------------------------------------------- @app.get("/config") def get_config(): s = dict(_settings) s["currency_unit"] = s.get("currency_major", "pounds") raw_tt = s.get("transfer_types", "Bank Transfer,Cash,QR") s["transfer_types"] = [t.strip() for t in raw_tt.split(",") if t.strip()] return s if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)