food-market/deploy/telegram-bridge/bridge.py
nns 4acb51c270
Some checks are pending
CI / POS (WPF, Windows) (push) Waiting to run
CI / Backend (.NET 8) (push) Successful in 1m22s
CI / Web (React + Vite) (push) Successful in 40s
feat(bridge): /quiet и /loud команды для управления PreToolUse прогресс-лентой
- /quiet → создаёт /tmp/cc-tg-quiet, pretool-hook сразу выходит,
  Stop hook продолжает работать (финальные ответы летят).
- /loud  → удаляет flag, прогресс-лента возобновляется.
- /ping без изменений.

На стенде bridge перезапущен, setWebhook 200.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 14:59:05 +05:00

181 lines
6.7 KiB
Python

"""Telegram bridge: webhook receiver, paste-to-tmux only.
Refactored from the original 2-second polling loop to a fully event-driven
design: outgoing assistant messages are now pushed by the Claude Code Stop
hook (/usr/local/bin/cc-tg-notify-stop). This bridge only handles the
inbound side — Telegram → tmux paste.
Config (/etc/food-market/telegram.env or env vars):
TELEGRAM_BOT_TOKEN — bot token (required)
TELEGRAM_CHAT_ID — single whitelisted chat id (required)
TELEGRAM_WEBHOOK_URL — public URL Telegram should POST to
(default: https://food-market.zat.kz/tg-webhook)
TELEGRAM_WEBHOOK_SECRET — random secret; bridge validates the
X-Telegram-Bot-Api-Secret-Token header on every
incoming request and Telegram sends it back so
third parties can't forge updates
TMUX_SESSION — tmux session to paste into (default: claude)
WEBHOOK_LISTEN_HOST — local bind host (default: 127.0.0.1)
WEBHOOK_LISTEN_PORT — local bind port (default: 8765)
"""
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
import sys
from pathlib import Path
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
ENV_FILE = Path("/etc/food-market/telegram.env")
TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude")
LISTEN_HOST = os.environ.get("WEBHOOK_LISTEN_HOST", "127.0.0.1")
LISTEN_PORT = int(os.environ.get("WEBHOOK_LISTEN_PORT", "8765"))
WEBHOOK_PATH = "/tg-webhook"
logger = logging.getLogger("bridge")
def load_env(path: Path) -> dict[str, str]:
out: dict[str, str] = {}
if not path.exists():
return out
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
out[key.strip()] = value.strip().strip('"').strip("'")
return out
async def tmux_send_text(session: str, text: str) -> None:
"""Pastes one Telegram message verbatim into the tmux session, then Enter.
Uses `send-keys -l` for literal paste — no key-binding interpretation,
works for arbitrary text including unicode and special chars.
"""
proc = await asyncio.create_subprocess_exec(
"tmux", "send-keys", "-t", session, "-l", text,
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"tmux send-keys -l failed: {stderr.decode().strip()}")
proc = await asyncio.create_subprocess_exec(
"tmux", "send-keys", "-t", session, "Enter",
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
)
_, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"tmux send-keys Enter failed: {stderr.decode().strip()}")
def _allowed(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
chat_id = context.application.bot_data["chat_id"]
return update.effective_chat is not None and update.effective_chat.id == chat_id
async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not _allowed(update, context):
return
await update.message.reply_text(f"pong — webhook mode, tmux session «{TMUX_SESSION}»")
QUIET_FLAG = "/tmp/cc-tg-quiet"
async def cmd_quiet(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Заткнуть PreToolUse прогресс-ленту (Stop hook продолжает работать)."""
if not _allowed(update, context):
return
try:
open(QUIET_FLAG, "w").close()
await update.message.reply_text("🔕 Прогресс-лента отключена. Включить — /loud")
except Exception as exc: # noqa: BLE001
await update.message.reply_text(f"⚠️ {exc}")
async def cmd_loud(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Включить обратно PreToolUse прогресс-ленту."""
if not _allowed(update, context):
return
try:
os.unlink(QUIET_FLAG)
except FileNotFoundError:
pass
except Exception as exc: # noqa: BLE001
await update.message.reply_text(f"⚠️ {exc}")
return
await update.message.reply_text("🔔 Прогресс-лента включена.")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not _allowed(update, context):
return
text = (update.message.text or "").strip() if update.message else ""
if not text:
return
logger.info("inbound message: %d chars", len(text))
try:
await tmux_send_text(TMUX_SESSION, text)
except Exception as exc: # noqa: BLE001
logger.warning("paste to tmux failed: %s", exc)
await update.message.reply_text(f"⚠️ tmux error: {exc}")
def main() -> int:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
env = {**os.environ, **load_env(ENV_FILE)}
token = env.get("TELEGRAM_BOT_TOKEN", "").strip()
chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip()
secret = env.get("TELEGRAM_WEBHOOK_SECRET", "").strip()
webhook_url = env.get("TELEGRAM_WEBHOOK_URL", "https://food-market.zat.kz/tg-webhook").strip()
if not token or not chat_id_raw:
print("ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID required", file=sys.stderr)
return 78
try:
chat_id = int(chat_id_raw)
except ValueError:
print(f"ERROR: TELEGRAM_CHAT_ID must be int, got {chat_id_raw!r}", file=sys.stderr)
return 78
if not secret:
logger.warning("TELEGRAM_WEBHOOK_SECRET is empty — webhook is unauthenticated")
application = ApplicationBuilder().token(token).build()
application.bot_data["chat_id"] = chat_id
application.add_handler(CommandHandler("ping", cmd_ping))
application.add_handler(CommandHandler("quiet", cmd_quiet))
application.add_handler(CommandHandler("loud", cmd_loud))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("starting webhook listener on %s:%d%s", LISTEN_HOST, LISTEN_PORT, webhook_url)
application.run_webhook(
listen=LISTEN_HOST,
port=LISTEN_PORT,
url_path=WEBHOOK_PATH.lstrip("/"),
webhook_url=webhook_url,
secret_token=secret or None,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=False,
stop_signals=None,
)
return 0
if __name__ == "__main__":
sys.exit(main())