food-market/deploy/telegram-bridge/bridge.py
nns d2305b7d40 feat(infra): event-driven Telegram bridge — webhook + Stop hook
Полный отказ от 2-секундного polling tmux'а в пользу реактивной схемы:

OUTBOUND (server-Claude → Telegram) через Stop hook:
- /usr/local/bin/cc-tg-notify-stop (Bash) читает transcript из stdin
  (Claude Code передаёт {transcript_path}), достаёт последнюю
  assistant-запись с непустым text-блоком (jq), чанкует ≤4000 символов
  с префиксом «🤖 [food-market]», POST'ит в Telegram через curl.
  Логи /var/log/cc-tg-notify.log. Если turn без текстового ответа
  (только tool calls) — выходит молча.
- Зарегистрирован в ~/.claude/settings.json под Stop event с пустым
  matcher (все turns).

INBOUND (Telegram → bridge → tmux) через webhook:
- bridge.py переписан с run_polling на run_webhook listening
  127.0.0.1:8765 на /tg-webhook. python-telegram-bot[webhooks]
  (tornado) ставится через pip.
- При старте сам делает setWebhook к Telegram API с secret_token
  из TELEGRAM_WEBHOOK_SECRET (osprandom 24 hex), Telegram присылает
  его обратно в X-Telegram-Bot-Api-Secret-Token — PTB валидирует
  до вызова handler'ов.
- Сохранены: whitelist по chat_id, paste-в-tmux через
  send-keys -l + Enter, /ping команда. Удалён poll_and_forward,
  diff/clean логика, recently_sent_lines дедуп — больше не нужны.

Nginx: новый location = /tg-webhook на food-market-stage.conf,
проксирует на 127.0.0.1:8765 с прокидыванием X-Telegram-Bot-Api-
Secret-Token. Smoke-test: curl с неверным секретом → 403.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 12:38:44 +05:00

151 lines
5.5 KiB
Python

"""Telegram bridge: webhook receiver, paste-to-tmux only.

Refactored from the original 2-second polling loop to a fully event-driven
design: outgoing assistant messages are now pushed by the Claude Code Stop
hook (/usr/local/bin/cc-tg-notify-stop). This bridge only handles the
inbound side — Telegram → tmux paste.

Config (/etc/food-market/telegram.env or env vars):
  TELEGRAM_BOT_TOKEN      — bot token (required)
  TELEGRAM_CHAT_ID        — single whitelisted chat id (required)
  TELEGRAM_WEBHOOK_URL    — public URL Telegram should POST to
                            (default: https://food-market.zat.kz/tg-webhook)
  TELEGRAM_WEBHOOK_SECRET — random secret registered with Telegram; Telegram
                            sends it back in the
                            X-Telegram-Bot-Api-Secret-Token header of every
                            incoming request and the bridge validates it, so
                            third parties can't forge updates
  TMUX_SESSION            — tmux session to paste into (default: claude)
  WEBHOOK_LISTEN_HOST     — local bind host (default: 127.0.0.1)
  WEBHOOK_LISTEN_PORT     — local bind port (default: 8765)
"""
from __future__ import annotations
import asyncio
import logging
import os
import subprocess
import sys
from pathlib import Path
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
# Optional KEY=VALUE file; main() merges it over the process environment.
ENV_FILE = Path("/etc/food-market/telegram.env")

# NOTE(review): the three settings below are resolved at import time from the
# process environment only — values placed in ENV_FILE do not reach them
# unless something else exports that file into the environment. Confirm this
# matches the deployment.
TMUX_SESSION = os.getenv("TMUX_SESSION", "claude")
LISTEN_HOST = os.getenv("WEBHOOK_LISTEN_HOST", "127.0.0.1")
LISTEN_PORT = int(os.getenv("WEBHOOK_LISTEN_PORT", "8765"))

# Local URL path the webhook listener is mounted on.
WEBHOOK_PATH = "/tg-webhook"

logger = logging.getLogger("bridge")
def load_env(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped, as are lines with an empty key. Keys and values are stripped of
    surrounding whitespace; a value wrapped in one pair of matching single or
    double quotes has that pair removed, and any other quote characters are
    kept as part of the value.

    Args:
        path: Location of the env file.

    Returns:
        Mapping of keys to values; empty when the file does not exist.
    """
    try:
        # Explicit encoding: the default would depend on the process locale.
        content = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # A missing file is not an error — the caller falls back to env vars.
        return {}

    out: dict[str, str] = {}
    for raw in content.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if not key:
            continue
        value = value.strip()
        # Remove exactly one pair of matching quotes; unbalanced or inner
        # quotes belong to the value and must survive.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        out[key] = value
    return out
async def tmux_send_text(session: str, text: str) -> None:
    """Paste one Telegram message into the tmux session, then press Enter.

    The text is sent with ``send-keys -l`` (literal mode, no key-name
    interpretation), followed by a separate ``send-keys Enter``.

    Args:
        session: Target passed to tmux ``-t``.
        text: Message text to type into the session.

    Raises:
        RuntimeError: If either tmux invocation exits with a non-zero status;
            the message carries tmux's stderr output.
    """
    # tmux splits its command line on arguments that end in ";" and would
    # silently drop a trailing semicolon; "\;" is its escape for a literal one.
    payload = text[:-1] + "\\;" if text.endswith(";") else text

    steps = (
        # "--" ends option parsing so a message starting with "-" is not
        # mistaken for tmux flags.
        ("tmux send-keys -l", ("-l", "--", payload)),
        ("tmux send-keys Enter", ("Enter",)),
    )
    for label, tail in steps:
        proc = await asyncio.create_subprocess_exec(
            "tmux", "send-keys", "-t", session, *tail,
            stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            # errors="replace": never let undecodable stderr mask the failure.
            detail = stderr.decode(errors="replace").strip()
            raise RuntimeError(f"{label} failed: {detail}")
    # NOTE(review): embedded newlines in `text` are forwarded as-is; whether
    # the program inside the session treats them as "submit" is not visible
    # from this file.
def _allowed(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Tell whether the update originates from the whitelisted chat.

    The whitelisted id is read from ``bot_data["chat_id"]`` on the
    application; an update without an effective chat is never allowed.
    """
    expected_id = context.application.bot_data["chat_id"]
    chat = update.effective_chat
    if chat is None:
        return False
    return chat.id == expected_id
async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer ``/ping`` from the whitelisted chat with a short status line.

    Updates from any other chat are ignored without a reply.
    """
    if not _allowed(update, context):
        return
    # Command handlers can also be triggered by an edited message, in which
    # case update.message is None; effective_message covers both cases.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(f"pong — webhook mode, tmux session «{TMUX_SESSION}»")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forward a text message from the whitelisted chat into tmux.

    Updates from other chats, updates without a message, and messages whose
    text is empty after stripping are dropped silently. If the paste fails,
    the error is logged and reported back to the chat.
    """
    if not _allowed(update, context):
        return

    message = update.message
    if not message:
        return

    text = (message.text or "").strip()
    if not text:
        return

    # Only the length is logged, not the message content.
    logger.info("inbound message: %d chars", len(text))
    try:
        await tmux_send_text(TMUX_SESSION, text)
    except Exception as exc:  # noqa: BLE001
        logger.warning("paste to tmux failed: %s", exc)
        await update.message.reply_text(f"⚠️ tmux error: {exc}")
def main() -> int:
    """Configure logging, load settings and run the webhook listener.

    Settings come from the process environment overlaid with ``ENV_FILE``
    (values in the file win). The bot token and chat id are mandatory; an
    empty webhook secret only produces a warning.

    Returns:
        0 after the webhook listener returns, or 78 when required
        configuration is missing or malformed.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    env = {**os.environ, **load_env(ENV_FILE)}
    token = env.get("TELEGRAM_BOT_TOKEN", "").strip()
    chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip()
    secret = env.get("TELEGRAM_WEBHOOK_SECRET", "").strip()
    webhook_url = env.get("TELEGRAM_WEBHOOK_URL", "https://food-market.zat.kz/tg-webhook").strip()

    if not token or not chat_id_raw:
        print("ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID required", file=sys.stderr)
        return 78  # EX_CONFIG in sysexits.h
    try:
        chat_id = int(chat_id_raw)
    except ValueError:
        print(f"ERROR: TELEGRAM_CHAT_ID must be int, got {chat_id_raw!r}", file=sys.stderr)
        return 78
    if not secret:
        logger.warning("TELEGRAM_WEBHOOK_SECRET is empty — webhook is unauthenticated")

    application = ApplicationBuilder().token(token).build()
    # Handlers read the whitelisted chat id from bot_data.
    application.bot_data["chat_id"] = chat_id
    application.add_handler(CommandHandler("ping", cmd_ping))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Log the local bind address with the local path, and the public URL
    # separately — concatenating the full URL after host:port was misleading.
    logger.info(
        "starting webhook listener on %s:%d%s (public URL: %s)",
        LISTEN_HOST, LISTEN_PORT, WEBHOOK_PATH, webhook_url,
    )
    application.run_webhook(
        listen=LISTEN_HOST,
        port=LISTEN_PORT,
        url_path=WEBHOOK_PATH.lstrip("/"),
        webhook_url=webhook_url,
        secret_token=secret or None,
        allowed_updates=Update.ALL_TYPES,
        drop_pending_updates=False,
        # NOTE(review): None is passed explicitly instead of the library
        # default — confirm the intended shutdown path on SIGTERM/SIGINT.
        stop_signals=None,
    )
    return 0
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())