From d2305b7d4064d4180d90c44a67ad75487bc31085 Mon Sep 17 00:00:00 2001 From: nns <278048682+nurdotnet@users.noreply.github.com> Date: Sun, 26 Apr 2026 12:38:44 +0500 Subject: [PATCH] =?UTF-8?q?feat(infra):=20event-driven=20Telegram=20bridge?= =?UTF-8?q?=20=E2=80=94=20webhook=20+=20Stop=20hook?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Полный отказ от 2-секундного polling tmux'а в пользу реактивной схемы: OUTBOUND (server-Claude → Telegram) через Stop hook: - /usr/local/bin/cc-tg-notify-stop (Bash) читает transcript из stdin (Claude Code передаёт {transcript_path}), достаёт последнюю assistant-запись с непустым text-блоком (jq), чанкует ≤4000 символов с префиксом «🤖 [food-market]», POST'ит в Telegram через curl. Логи /var/log/cc-tg-notify.log. Если turn без текстового ответа (только tool calls) — выходит молча. - Зарегистрирован в ~/.claude/settings.json под Stop event с пустым matcher (все turns). INBOUND (Telegram → bridge → tmux) через webhook: - bridge.py переписан с run_polling на run_webhook listening 127.0.0.1:8765 на /tg-webhook. python-telegram-bot[webhooks] (tornado) ставится через pip. - При старте сам делает setWebhook к Telegram API с secret_token из TELEGRAM_WEBHOOK_SECRET (osprandom 24 hex), Telegram присылает его обратно в X-Telegram-Bot-Api-Secret-Token — PTB валидирует до вызова handler'ов. - Сохранены: whitelist по chat_id, paste-в-tmux через send-keys -l + Enter, /ping команда. Удалён poll_and_forward, diff/clean логика, recently_sent_lines дедуп — больше не нужны. Nginx: новый location = /tg-webhook на food-market-stage.conf, проксирует на 127.0.0.1:8765 с прокидыванием X-Telegram-Bot-Api- Secret-Token. Smoke-test: curl с неверным секретом → 403. Co-Authored-By: Claude Opus 4.7 (1M context) --- deploy/telegram-bridge/bridge.py | 392 ++++------------------- deploy/telegram-bridge/cc-tg-notify-stop | 99 ++++++ 2 files changed, 154 insertions(+), 337 deletions(-) create mode 100755 deploy/telegram-bridge/cc-tg-notify-stop diff --git a/deploy/telegram-bridge/bridge.py b/deploy/telegram-bridge/bridge.py index 7942f9c..1a3ac0e 100644 --- a/deploy/telegram-bridge/bridge.py +++ b/deploy/telegram-bridge/bridge.py @@ -1,31 +1,35 @@ -"""Telegram <-> tmux bridge for controlling a local Claude Code session from a phone. +"""Telegram bridge: webhook receiver, paste-to-tmux only. -Reads creds from /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID). -Only the single whitelisted chat_id is allowed; everything else is silently ignored. +Refactored from the original 2-second polling loop to a fully event-driven +design: outgoing assistant messages are now pushed by the Claude Code Stop +hook (/usr/local/bin/cc-tg-notify-stop). This bridge only handles the +inbound side — Telegram → tmux paste. -Inbound: each Telegram message is typed into tmux session 'claude' via `tmux send-keys --t claude -l ` followed by an Enter keypress. - -Outbound: every poll_interval seconds, capture the current pane, diff against the last -snapshot, filter TUI noise (box-drawing, spinners, the user's own echoed prompt), then -send any remaining text as plain Telegram messages. +Config (/etc/food-market/telegram.env or env vars): + TELEGRAM_BOT_TOKEN — bot token (required) + TELEGRAM_CHAT_ID — single whitelisted chat id (required) + TELEGRAM_WEBHOOK_URL — public URL Telegram should POST to + (default: https://food-market.zat.kz/tg-webhook) + TELEGRAM_WEBHOOK_SECRET — random secret; bridge validates the + X-Telegram-Bot-Api-Secret-Token header on every + incoming request and Telegram sends it back so + third parties can't forge updates + TMUX_SESSION — tmux session to paste into (default: claude) + WEBHOOK_LISTEN_HOST — local bind host (default: 127.0.0.1) + WEBHOOK_LISTEN_PORT — local bind port (default: 8765) """ from __future__ import annotations import asyncio -import collections import logging import os -import re import subprocess import sys -from dataclasses import dataclass, field from pathlib import Path from telegram import Update from telegram.ext import ( - Application, ApplicationBuilder, CommandHandler, ContextTypes, @@ -35,18 +39,11 @@ from telegram.ext import ( ENV_FILE = Path("/etc/food-market/telegram.env") TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude") -POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SEC", "8")) -CAPTURE_HISTORY = int(os.environ.get("CAPTURE_HISTORY_LINES", "200")) -TG_MAX_CHARS = 3500 -MAX_SEND_PER_TICK = int(os.environ.get("MAX_SEND_CHARS_PER_TICK", "900")) +LISTEN_HOST = os.environ.get("WEBHOOK_LISTEN_HOST", "127.0.0.1") +LISTEN_PORT = int(os.environ.get("WEBHOOK_LISTEN_PORT", "8765")) +WEBHOOK_PATH = "/tg-webhook" -ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[a-zA-Z]") -BOX_CHARS = set("╭╮╰╯│─┌┐└┘├┤┬┴┼║═╔╗╚╝╠╣╦╩╬▌▐█▀▄") -SPINNER_CHARS = set("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⣾⣽⣻⢿⡿⣟⣯⣷") -# Claude Code TUI markers -TOOL_CALL_RE = re.compile(r"^\s*[⏺●⏻◯◎⬤]\s+\S") -TOOL_RESULT_RE = re.compile(r"^\s*⎿") -USER_PROMPT_RE = re.compile(r"^>\s?(.*)$") +logger = logging.getLogger("bridge") def load_env(path: Path) -> dict[str, str]: @@ -62,21 +59,12 @@ def load_env(path: Path) -> dict[str, str]: return out -@dataclass -class BridgeState: - chat_id: int - last_snapshot: str = "" - last_sent_text: str = "" - recent_user_inputs: collections.deque = field( - default_factory=lambda: collections.deque(maxlen=50) - ) - recently_sent_lines: collections.deque = field( - default_factory=lambda: collections.deque(maxlen=400) - ) - _lock: asyncio.Lock = field(default_factory=asyncio.Lock) - - async def tmux_send_text(session: str, text: str) -> None: + """Pastes one Telegram message verbatim into the tmux session, then Enter. + + Uses `send-keys -l` for literal paste — no key-binding interpretation, + works for arbitrary text including unicode and special chars. + """ proc = await asyncio.create_subprocess_exec( "tmux", "send-keys", "-t", session, "-l", text, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, @@ -84,7 +72,6 @@ async def tmux_send_text(session: str, text: str) -> None: _, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"tmux send-keys -l failed: {stderr.decode().strip()}") - proc = await asyncio.create_subprocess_exec( "tmux", "send-keys", "-t", session, "Enter", stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, @@ -94,291 +81,31 @@ async def tmux_send_text(session: str, text: str) -> None: raise RuntimeError(f"tmux send-keys Enter failed: {stderr.decode().strip()}") -async def tmux_capture(session: str) -> str: - proc = await asyncio.create_subprocess_exec( - "tmux", "capture-pane", "-t", session, "-p", "-S", f"-{CAPTURE_HISTORY}", - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - ) - stdout, stderr = await proc.communicate() - if proc.returncode != 0: - raise RuntimeError(f"tmux capture-pane failed: {stderr.decode().strip()}") - return stdout.decode("utf-8", errors="replace").rstrip("\n") +def _allowed(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: + chat_id = context.application.bot_data["chat_id"] + return update.effective_chat is not None and update.effective_chat.id == chat_id -def _strip_box(line: str) -> str: - # Drop leading/trailing box-drawing chars and their padding. - while line and (line[0] in BOX_CHARS or (line[0] == " " and len(line) > 1 and line[1] in BOX_CHARS)): - line = line[1:].lstrip() - if not line: - break - while line and (line[-1] in BOX_CHARS or (line[-1] == " " and len(line) > 1 and line[-2] in BOX_CHARS)): - line = line[:-1].rstrip() - if not line: - break - return line - - -def _is_noise(line: str) -> bool: - s = line.strip() - if not s: - return True - # Lines made of only box/spinner/decoration chars + spaces. - if all(c in BOX_CHARS or c in SPINNER_CHARS or c.isspace() for c in s): - return True - # Claude Code TUI hints. - lowered = s.lower() - if "shift+tab" in lowered or "bypass permissions" in lowered: - return True - if lowered.startswith("? for shortcuts"): - return True - # Spinner + status line ("✻ Thinking…", "· Pondering…"). - if s.startswith(("✻", "✽", "✢", "·")) and len(s) < 80: - return True - # Typing indicator prompt ("> " empty or near-empty input box). - if s.startswith(">") and len(s) <= 2: - return True - return False - - -def clean_text(text: str, recent_user: collections.deque | None = None) -> str: - """Strip TUI noise, tool-call blocks and echoed user input. - - Only the assistant's prose reply should survive. - """ - recent_user = recent_user if recent_user is not None else collections.deque() - out: list[str] = [] - in_tool_block = False - for raw in text.splitlines(): - line = ANSI_RE.sub("", raw).rstrip() - line = _strip_box(line) - stripped = line.strip() - - if not stripped: - in_tool_block = False - out.append("") - continue - - # Tool call / tool result blocks — skip the header and any indented follow-ups. - if TOOL_CALL_RE.match(line) or TOOL_RESULT_RE.match(line): - in_tool_block = True - continue - if in_tool_block: - # continuation of a tool block is usually indented; a flush-left line ends it - if line.startswith(" ") or line.startswith("\t"): - continue - in_tool_block = False - - # Echo of the user's own prompt ("> hello") — drop it. - m = USER_PROMPT_RE.match(stripped) - if m: - continue - if stripped in recent_user: - continue - - if _is_noise(line): - continue - - out.append(line) - - # collapse runs of blank lines - collapsed: list[str] = [] - prev_blank = False - for line in out: - blank = not line.strip() - if blank and prev_blank: - continue - collapsed.append(line) - prev_blank = blank - return "\n".join(collapsed).strip("\n") - - -def diff_snapshot(prev: str, curr: str) -> str: - """Return only lines that weren't already present anywhere in the previous snapshot. - - Set-based: handles TUI scrolling and partial redraws without re-sending history. - """ - if not prev: - return curr - if prev == curr: - return "" - prev_set = set(prev.splitlines()) - new_lines = [ln for ln in curr.splitlines() if ln.rstrip() and ln not in prev_set] - return "\n".join(new_lines) - - -def chunk_for_telegram(text: str, limit: int = TG_MAX_CHARS) -> list[str]: - if not text: - return [] - out: list[str] = [] - buf: list[str] = [] - buf_len = 0 - for line in text.splitlines(): - if buf_len + len(line) + 1 > limit and buf: - out.append("\n".join(buf)) - buf, buf_len = [], 0 - while len(line) > limit: - out.append(line[:limit]) - line = line[limit:] - buf.append(line) - buf_len += len(line) + 1 - if buf: - out.append("\n".join(buf)) - return out +async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not _allowed(update, context): + return + await update.message.reply_text(f"pong — webhook mode, tmux session «{TMUX_SESSION}»") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - state: BridgeState = context.application.bot_data["state"] - if update.effective_chat is None or update.effective_chat.id != state.chat_id: + if not _allowed(update, context): return text = (update.message.text or "").strip() if update.message else "" if not text: return - # Remember what we sent so we can suppress its echo from the pane capture. - async with state._lock: - state.recent_user_inputs.append(text) - # Also store reasonable substrings in case the TUI wraps or truncates - if len(text) > 40: - state.recent_user_inputs.append(text[:40]) + logger.info("inbound message: %d chars", len(text)) try: await tmux_send_text(TMUX_SESSION, text) except Exception as exc: # noqa: BLE001 + logger.warning("paste to tmux failed: %s", exc) await update.message.reply_text(f"⚠️ tmux error: {exc}") -async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - state: BridgeState = context.application.bot_data["state"] - if update.effective_chat is None or update.effective_chat.id != state.chat_id: - return - await update.message.reply_text( - f"pong — session '{TMUX_SESSION}', poll {POLL_INTERVAL}s" - ) - - -async def cmd_snapshot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - state: BridgeState = context.application.bot_data["state"] - if update.effective_chat is None or update.effective_chat.id != state.chat_id: - return - try: - snap = await tmux_capture(TMUX_SESSION) - except Exception as exc: # noqa: BLE001 - await update.message.reply_text(f"⚠️ tmux error: {exc}") - return - async with state._lock: - cleaned = clean_text(snap, state.recent_user_inputs) - state.last_snapshot = snap # reset baseline so poller doesn't resend - for part in chunk_for_telegram(cleaned) or ["(nothing to show)"]: - await update.message.reply_text(part) - - -async def poll_and_forward(application: Application) -> None: - state: BridgeState = application.bot_data["state"] - bot = application.bot - logger = logging.getLogger("bridge.poll") - while True: - await asyncio.sleep(POLL_INTERVAL) - # Stability check: capture twice, ~1.5s apart. If pane still changes, assistant - # is still streaming — skip this tick and try next time. - try: - snap1 = await tmux_capture(TMUX_SESSION) - except Exception as exc: # noqa: BLE001 - logger.warning("capture failed: %s", exc) - continue - await asyncio.sleep(1.5) - try: - snap2 = await tmux_capture(TMUX_SESSION) - except Exception as exc: # noqa: BLE001 - logger.warning("capture failed: %s", exc) - continue - if snap1 != snap2: - # still being written — don't send partials - continue - snapshot = snap2 - - async with state._lock: - prev = state.last_snapshot - state.last_snapshot = snapshot - recent_user_copy = list(state.recent_user_inputs) - recently_sent_copy = list(state.recently_sent_lines) - - raw_new = diff_snapshot(prev, snapshot) - new_text = clean_text(raw_new, collections.deque(recent_user_copy)) - if not new_text: - continue - - # Line-level dedup vs. what we already shipped: drop lines that are - # substring-equivalent to a recently sent one (handles streaming dupes). - deduped: list[str] = [] - for line in new_text.splitlines(): - s = line.rstrip() - if not s.strip(): - deduped.append(line) - continue - ss = s.strip() - is_dup = False - for past in recently_sent_copy: - if ss == past: - is_dup = True - break - if len(ss) >= 15 and len(past) >= 15 and (ss in past or past in ss): - is_dup = True - break - if is_dup: - continue - deduped.append(line) - recently_sent_copy.append(ss) - - async with state._lock: - state.recently_sent_lines.clear() - state.recently_sent_lines.extend(recently_sent_copy[-400:]) - - new_text = "\n".join(deduped).strip("\n") - if not new_text: - continue - - async with state._lock: - if new_text == state.last_sent_text: - continue - state.last_sent_text = new_text - - # Cap total outbound per tick so a big burst doesn't flood Telegram. - if len(new_text) > MAX_SEND_PER_TICK: - keep = new_text[-MAX_SEND_PER_TICK:] - # snap to next newline to avoid cutting mid-line - nl = keep.find("\n") - if 0 <= nl < 200: - keep = keep[nl + 1 :] - dropped = len(new_text) - len(keep) - new_text = f"… (+{dropped} chars earlier)\n{keep}" - - for part in chunk_for_telegram(new_text): - try: - await bot.send_message( - chat_id=state.chat_id, - text=part, - disable_notification=True, - ) - except Exception as exc: # noqa: BLE001 - logger.warning("telegram send failed: %s", exc) - break - - -async def on_startup(application: Application) -> None: - state: BridgeState = application.bot_data["state"] - try: - state.last_snapshot = await tmux_capture(TMUX_SESSION) - except Exception: # noqa: BLE001 - state.last_snapshot = "" - application.bot_data["poll_task"] = asyncio.create_task( - poll_and_forward(application), name="bridge-poll" - ) - - -async def on_shutdown(application: Application) -> None: - task = application.bot_data.get("poll_task") - if task: - task.cancel() - - def main() -> int: logging.basicConfig( level=logging.INFO, @@ -387,44 +114,35 @@ def main() -> int: env = {**os.environ, **load_env(ENV_FILE)} token = env.get("TELEGRAM_BOT_TOKEN", "").strip() chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip() + secret = env.get("TELEGRAM_WEBHOOK_SECRET", "").strip() + webhook_url = env.get("TELEGRAM_WEBHOOK_URL", "https://food-market.zat.kz/tg-webhook").strip() if not token or not chat_id_raw: - print( - "ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID must be set in " - f"{ENV_FILE} or environment", - file=sys.stderr, - ) + print("ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID required", file=sys.stderr) return 78 try: chat_id = int(chat_id_raw) except ValueError: - print(f"ERROR: TELEGRAM_CHAT_ID must be an integer, got: {chat_id_raw!r}", - file=sys.stderr) + print(f"ERROR: TELEGRAM_CHAT_ID must be int, got {chat_id_raw!r}", file=sys.stderr) return 78 + if not secret: + logger.warning("TELEGRAM_WEBHOOK_SECRET is empty — webhook is unauthenticated") - check = subprocess.run( - ["tmux", "has-session", "-t", TMUX_SESSION], - capture_output=True, text=True, - ) - if check.returncode != 0: - print( - f"WARNING: tmux session '{TMUX_SESSION}' not found — bridge will run " - "but send/capture will fail until the session is created.", - file=sys.stderr, - ) - - application = ( - ApplicationBuilder() - .token(token) - .post_init(on_startup) - .post_shutdown(on_shutdown) - .build() - ) - application.bot_data["state"] = BridgeState(chat_id=chat_id) + application = ApplicationBuilder().token(token).build() + application.bot_data["chat_id"] = chat_id application.add_handler(CommandHandler("ping", cmd_ping)) - application.add_handler(CommandHandler("snapshot", cmd_snapshot)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) - application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None) + logger.info("starting webhook listener on %s:%d → %s", LISTEN_HOST, LISTEN_PORT, webhook_url) + application.run_webhook( + listen=LISTEN_HOST, + port=LISTEN_PORT, + url_path=WEBHOOK_PATH.lstrip("/"), + webhook_url=webhook_url, + secret_token=secret or None, + allowed_updates=Update.ALL_TYPES, + drop_pending_updates=False, + stop_signals=None, + ) return 0 diff --git a/deploy/telegram-bridge/cc-tg-notify-stop b/deploy/telegram-bridge/cc-tg-notify-stop new file mode 100755 index 0000000..05da11c --- /dev/null +++ b/deploy/telegram-bridge/cc-tg-notify-stop @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +# Claude Code Stop hook: вытаскивает финальный assistant-ответ из transcript'а +# и пушит в Telegram. Устанавливается на /usr/local/bin/cc-tg-notify-stop. +# +# Hook runtime передаёт JSON на stdin с полем .transcript_path; раньше это +# приходило как $CLAUDE_TRANSCRIPT_PATH env-var, но в новых версиях стрим +# переехал в stdin. Поддерживаем оба варианта. +# +# Конфиг — /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID). +# Логи — /var/log/cc-tg-notify.log (rotated externally). + +set -u +ENV_FILE="/etc/food-market/telegram.env" +LOG_FILE="${CC_TG_LOG:-/var/log/cc-tg-notify.log}" +PROJECT_TAG="${CC_TG_TAG:-food-market}" +MAX_CHUNK=4000 + +log() { printf '%s [stop-hook] %s\n' "$(date -Is)" "$*" >>"$LOG_FILE" 2>/dev/null || true; } + +if [[ -r "$ENV_FILE" ]]; then + # shellcheck disable=SC1090 + set -a; source "$ENV_FILE"; set +a +fi +TOKEN="${TELEGRAM_BOT_TOKEN:-}" +CHAT_ID="${TELEGRAM_CHAT_ID:-}" +if [[ -z "$TOKEN" || -z "$CHAT_ID" ]]; then + log "missing TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID" + exit 0 +fi + +# Читаем JSON со stdin (если пришёл) или берём env-vars (legacy). +INPUT_JSON="" +if [[ -t 0 ]]; then + INPUT_JSON="" +else + INPUT_JSON="$(cat)" +fi + +TRANSCRIPT="${CLAUDE_TRANSCRIPT_PATH:-}" +if [[ -z "$TRANSCRIPT" && -n "$INPUT_JSON" ]]; then + TRANSCRIPT="$(printf '%s' "$INPUT_JSON" | jq -r '.transcript_path // empty' 2>/dev/null || true)" +fi +if [[ -z "$TRANSCRIPT" || ! -r "$TRANSCRIPT" ]]; then + log "no transcript path (stdin=${#INPUT_JSON} chars, env=${CLAUDE_TRANSCRIPT_PATH:-unset})" + exit 0 +fi + +# Последняя assistant-запись с непустым text-блоком. JSONL: одна запись на строку. +TEXT="$(jq -r ' + select(.type == "assistant") + | .message.content[]? + | select(.type == "text" and (.text // "" | length) > 0) + | .text +' "$TRANSCRIPT" 2>/dev/null \ + | awk 'BEGIN{RS=""}{a=$0} END{print a}')" + +# awk выше склеивает все записи в одну; нам нужна именно ПОСЛЕДНЯЯ assistant-запись, +# поэтому делаем второй проход: берём индекс последней записи и достаём её text-блоки. +LAST_TEXT="$(jq -s -r ' + map(select(.type == "assistant")) | last as $m + | ($m.message.content // []) + | map(select(.type == "text" and (.text // "" | length) > 0) | .text) + | join("\n") +' "$TRANSCRIPT" 2>/dev/null)" + +if [[ -n "$LAST_TEXT" ]]; then TEXT="$LAST_TEXT"; fi + +if [[ -z "$TEXT" ]]; then + log "no text in last assistant turn (only tool calls?)" + exit 0 +fi + +# Чанкуем по строкам с лимитом MAX_CHUNK; первый чанк — с префиксом. +PREFIX="🤖 [${PROJECT_TAG}]" +send_chunk() { + local body="$1" + curl -fsS -m 15 -X POST "https://api.telegram.org/bot${TOKEN}/sendMessage" \ + --data-urlencode "chat_id=${CHAT_ID}" \ + --data-urlencode "text=${body}" \ + --data-urlencode "disable_web_page_preview=true" \ + >/dev/null 2>&1 || log "send failed (curl rc=$?)" +} + +CHUNK="$PREFIX"$'\n' +EMITTED=0 +while IFS= read -r line; do + if (( ${#CHUNK} + ${#line} + 1 > MAX_CHUNK )); then + send_chunk "$CHUNK" + EMITTED=$((EMITTED+1)) + CHUNK="" + fi + CHUNK+="$line"$'\n' +done <<<"$TEXT" +if [[ -n "$CHUNK" ]]; then + send_chunk "$CHUNK" + EMITTED=$((EMITTED+1)) +fi +log "sent $EMITTED chunk(s), text=${#TEXT} chars" +exit 0