"""Telegram bridge: webhook receiver, paste-to-tmux only. Refactored from the original 2-second polling loop to a fully event-driven design: outgoing assistant messages are now pushed by the Claude Code Stop hook (/usr/local/bin/cc-tg-notify-stop). This bridge only handles the inbound side — Telegram → tmux paste. Config (/etc/food-market/telegram.env or env vars): TELEGRAM_BOT_TOKEN — bot token (required) TELEGRAM_CHAT_ID — single whitelisted chat id (required) TELEGRAM_WEBHOOK_URL — public URL Telegram should POST to (default: https://food-market.zat.kz/tg-webhook) TELEGRAM_WEBHOOK_SECRET — random secret; bridge validates the X-Telegram-Bot-Api-Secret-Token header on every incoming request and Telegram sends it back so third parties can't forge updates TMUX_SESSION — tmux session to paste into (default: claude) WEBHOOK_LISTEN_HOST — local bind host (default: 127.0.0.1) WEBHOOK_LISTEN_PORT — local bind port (default: 8765) """ from __future__ import annotations import asyncio import logging import os import subprocess import sys from pathlib import Path from telegram import Update from telegram.ext import ( ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, ) ENV_FILE = Path("/etc/food-market/telegram.env") TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude") LISTEN_HOST = os.environ.get("WEBHOOK_LISTEN_HOST", "127.0.0.1") LISTEN_PORT = int(os.environ.get("WEBHOOK_LISTEN_PORT", "8765")) WEBHOOK_PATH = "/tg-webhook" logger = logging.getLogger("bridge") def load_env(path: Path) -> dict[str, str]: out: dict[str, str] = {} if not path.exists(): return out for raw in path.read_text().splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, value = line.partition("=") out[key.strip()] = value.strip().strip('"').strip("'") return out async def tmux_send_text(session: str, text: str) -> None: """Pastes one Telegram message verbatim into the tmux session, then Enter. Uses `send-keys -l` for literal paste — no key-binding interpretation, works for arbitrary text including unicode and special chars. """ proc = await asyncio.create_subprocess_exec( "tmux", "send-keys", "-t", session, "-l", text, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) _, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"tmux send-keys -l failed: {stderr.decode().strip()}") proc = await asyncio.create_subprocess_exec( "tmux", "send-keys", "-t", session, "Enter", stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) _, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"tmux send-keys Enter failed: {stderr.decode().strip()}") def _allowed(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: chat_id = context.application.bot_data["chat_id"] return update.effective_chat is not None and update.effective_chat.id == chat_id async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if not _allowed(update, context): return await update.message.reply_text(f"pong — webhook mode, tmux session «{TMUX_SESSION}»") QUIET_FLAG = "/tmp/cc-tg-quiet" async def cmd_quiet(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Заткнуть PreToolUse прогресс-ленту (Stop hook продолжает работать).""" if not _allowed(update, context): return try: open(QUIET_FLAG, "w").close() await update.message.reply_text("🔕 Прогресс-лента отключена. Включить — /loud") except Exception as exc: # noqa: BLE001 await update.message.reply_text(f"⚠️ {exc}") async def cmd_loud(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Включить обратно PreToolUse прогресс-ленту.""" if not _allowed(update, context): return try: os.unlink(QUIET_FLAG) except FileNotFoundError: pass except Exception as exc: # noqa: BLE001 await update.message.reply_text(f"⚠️ {exc}") return await update.message.reply_text("🔔 Прогресс-лента включена.") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: if not _allowed(update, context): return text = (update.message.text or "").strip() if update.message else "" if not text: return logger.info("inbound message: %d chars", len(text)) try: await tmux_send_text(TMUX_SESSION, text) except Exception as exc: # noqa: BLE001 logger.warning("paste to tmux failed: %s", exc) await update.message.reply_text(f"⚠️ tmux error: {exc}") def main() -> int: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) env = {**os.environ, **load_env(ENV_FILE)} token = env.get("TELEGRAM_BOT_TOKEN", "").strip() chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip() secret = env.get("TELEGRAM_WEBHOOK_SECRET", "").strip() webhook_url = env.get("TELEGRAM_WEBHOOK_URL", "https://food-market.zat.kz/tg-webhook").strip() if not token or not chat_id_raw: print("ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID required", file=sys.stderr) return 78 try: chat_id = int(chat_id_raw) except ValueError: print(f"ERROR: TELEGRAM_CHAT_ID must be int, got {chat_id_raw!r}", file=sys.stderr) return 78 if not secret: logger.warning("TELEGRAM_WEBHOOK_SECRET is empty — webhook is unauthenticated") application = ApplicationBuilder().token(token).build() application.bot_data["chat_id"] = chat_id application.add_handler(CommandHandler("ping", cmd_ping)) application.add_handler(CommandHandler("quiet", cmd_quiet)) application.add_handler(CommandHandler("loud", cmd_loud)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) logger.info("starting webhook listener on %s:%d → %s", LISTEN_HOST, LISTEN_PORT, webhook_url) application.run_webhook( listen=LISTEN_HOST, port=LISTEN_PORT, url_path=WEBHOOK_PATH.lstrip("/"), webhook_url=webhook_url, secret_token=secret or None, allowed_updates=Update.ALL_TYPES, drop_pending_updates=False, stop_signals=None, ) return 0 if __name__ == "__main__": sys.exit(main())