feat(infra): event-driven Telegram bridge — webhook + Stop hook
Some checks are pending
CI / POS (WPF, Windows) (push) Waiting to run
CI / Backend (.NET 8) (push) Successful in 1m2s
CI / Web (React + Vite) (push) Successful in 37s

Полный отказ от 2-секундного polling tmux'а в пользу реактивной схемы:

OUTBOUND (server-Claude → Telegram) через Stop hook:
- /usr/local/bin/cc-tg-notify-stop (Bash) читает transcript из stdin
  (Claude Code передаёт {transcript_path}), достаёт последнюю
  assistant-запись с непустым text-блоком (jq), чанкует ≤4000 символов
  с префиксом «🤖 [food-market]», POST'ит в Telegram через curl.
  Логи /var/log/cc-tg-notify.log. Если turn без текстового ответа
  (только tool calls) — выходит молча.
- Зарегистрирован в ~/.claude/settings.json под Stop event с пустым
  matcher (все turns).

INBOUND (Telegram → bridge → tmux) через webhook:
- bridge.py переписан с run_polling на run_webhook listening
  127.0.0.1:8765 на /tg-webhook. python-telegram-bot[webhooks]
  (tornado) ставится через pip.
- При старте сам делает setWebhook к Telegram API с secret_token
  из TELEGRAM_WEBHOOK_SECRET (osprandom 24 hex), Telegram присылает
  его обратно в X-Telegram-Bot-Api-Secret-Token — PTB валидирует
  до вызова handler'ов.
- Сохранены: whitelist по chat_id, paste-в-tmux через
  send-keys -l + Enter, /ping команда. Удалён poll_and_forward,
  diff/clean логика, recently_sent_lines дедуп — больше не нужны.

Nginx: новый location = /tg-webhook на food-market-stage.conf,
проксирует на 127.0.0.1:8765 с прокидыванием X-Telegram-Bot-Api-
Secret-Token. Smoke-test: curl с неверным секретом → 403.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
nns 2026-04-26 12:38:44 +05:00
parent 08f03fd17a
commit 7c40c11595
2 changed files with 154 additions and 337 deletions

View file

@ -1,31 +1,35 @@
"""Telegram <-> tmux bridge for controlling a local Claude Code session from a phone.
"""Telegram bridge: webhook receiver, paste-to-tmux only.
Reads creds from /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID).
Only the single whitelisted chat_id is allowed; everything else is silently ignored.
Refactored from the original 2-second polling loop to a fully event-driven
design: outgoing assistant messages are now pushed by the Claude Code Stop
hook (/usr/local/bin/cc-tg-notify-stop). This bridge only handles the
inbound side Telegram tmux paste.
Inbound: each Telegram message is typed into tmux session 'claude' via `tmux send-keys
-t claude -l <text>` followed by an Enter keypress.
Outbound: every poll_interval seconds, capture the current pane, diff against the last
snapshot, filter TUI noise (box-drawing, spinners, the user's own echoed prompt), then
send any remaining text as plain Telegram messages.
Config (/etc/food-market/telegram.env or env vars):
TELEGRAM_BOT_TOKEN bot token (required)
TELEGRAM_CHAT_ID single whitelisted chat id (required)
TELEGRAM_WEBHOOK_URL public URL Telegram should POST to
(default: https://food-market.zat.kz/tg-webhook)
TELEGRAM_WEBHOOK_SECRET random secret; bridge validates the
X-Telegram-Bot-Api-Secret-Token header on every
incoming request and Telegram sends it back so
third parties can't forge updates
TMUX_SESSION tmux session to paste into (default: claude)
WEBHOOK_LISTEN_HOST local bind host (default: 127.0.0.1)
WEBHOOK_LISTEN_PORT local bind port (default: 8765)
"""
from __future__ import annotations
import asyncio
import collections
import logging
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from telegram import Update
from telegram.ext import (
Application,
ApplicationBuilder,
CommandHandler,
ContextTypes,
@ -35,18 +39,11 @@ from telegram.ext import (
ENV_FILE = Path("/etc/food-market/telegram.env")
TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude")
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SEC", "8"))
CAPTURE_HISTORY = int(os.environ.get("CAPTURE_HISTORY_LINES", "200"))
TG_MAX_CHARS = 3500
MAX_SEND_PER_TICK = int(os.environ.get("MAX_SEND_CHARS_PER_TICK", "900"))
LISTEN_HOST = os.environ.get("WEBHOOK_LISTEN_HOST", "127.0.0.1")
LISTEN_PORT = int(os.environ.get("WEBHOOK_LISTEN_PORT", "8765"))
WEBHOOK_PATH = "/tg-webhook"
ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[a-zA-Z]")
BOX_CHARS = set("╭╮╰╯│─┌┐└┘├┤┬┴┼║═╔╗╚╝╠╣╦╩╬▌▐█▀▄")
SPINNER_CHARS = set("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⣾⣽⣻⢿⡿⣟⣯⣷")
# Claude Code TUI markers
TOOL_CALL_RE = re.compile(r"^\s*[⏺●⏻◯◎⬤]\s+\S")
TOOL_RESULT_RE = re.compile(r"^\s*⎿")
USER_PROMPT_RE = re.compile(r"^>\s?(.*)$")
logger = logging.getLogger("bridge")
def load_env(path: Path) -> dict[str, str]:
@ -62,21 +59,12 @@ def load_env(path: Path) -> dict[str, str]:
return out
@dataclass
class BridgeState:
chat_id: int
last_snapshot: str = ""
last_sent_text: str = ""
recent_user_inputs: collections.deque = field(
default_factory=lambda: collections.deque(maxlen=50)
)
recently_sent_lines: collections.deque = field(
default_factory=lambda: collections.deque(maxlen=400)
)
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def tmux_send_text(session: str, text: str) -> None:
"""Pastes one Telegram message verbatim into the tmux session, then Enter.
Uses `send-keys -l` for literal paste no key-binding interpretation,
works for arbitrary text including unicode and special chars.
"""
proc = await asyncio.create_subprocess_exec(
"tmux", "send-keys", "-t", session, "-l", text,
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
@ -84,7 +72,6 @@ async def tmux_send_text(session: str, text: str) -> None:
_, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"tmux send-keys -l failed: {stderr.decode().strip()}")
proc = await asyncio.create_subprocess_exec(
"tmux", "send-keys", "-t", session, "Enter",
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
@ -94,291 +81,31 @@ async def tmux_send_text(session: str, text: str) -> None:
raise RuntimeError(f"tmux send-keys Enter failed: {stderr.decode().strip()}")
async def tmux_capture(session: str) -> str:
proc = await asyncio.create_subprocess_exec(
"tmux", "capture-pane", "-t", session, "-p", "-S", f"-{CAPTURE_HISTORY}",
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f"tmux capture-pane failed: {stderr.decode().strip()}")
return stdout.decode("utf-8", errors="replace").rstrip("\n")
def _allowed(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
chat_id = context.application.bot_data["chat_id"]
return update.effective_chat is not None and update.effective_chat.id == chat_id
def _strip_box(line: str) -> str:
# Drop leading/trailing box-drawing chars and their padding.
while line and (line[0] in BOX_CHARS or (line[0] == " " and len(line) > 1 and line[1] in BOX_CHARS)):
line = line[1:].lstrip()
if not line:
break
while line and (line[-1] in BOX_CHARS or (line[-1] == " " and len(line) > 1 and line[-2] in BOX_CHARS)):
line = line[:-1].rstrip()
if not line:
break
return line
def _is_noise(line: str) -> bool:
s = line.strip()
if not s:
return True
# Lines made of only box/spinner/decoration chars + spaces.
if all(c in BOX_CHARS or c in SPINNER_CHARS or c.isspace() for c in s):
return True
# Claude Code TUI hints.
lowered = s.lower()
if "shift+tab" in lowered or "bypass permissions" in lowered:
return True
if lowered.startswith("? for shortcuts"):
return True
# Spinner + status line ("✻ Thinking…", "· Pondering…").
if s.startswith(("", "", "", "·")) and len(s) < 80:
return True
# Typing indicator prompt ("> " empty or near-empty input box).
if s.startswith(">") and len(s) <= 2:
return True
return False
def clean_text(text: str, recent_user: collections.deque | None = None) -> str:
"""Strip TUI noise, tool-call blocks and echoed user input.
Only the assistant's prose reply should survive.
"""
recent_user = recent_user if recent_user is not None else collections.deque()
out: list[str] = []
in_tool_block = False
for raw in text.splitlines():
line = ANSI_RE.sub("", raw).rstrip()
line = _strip_box(line)
stripped = line.strip()
if not stripped:
in_tool_block = False
out.append("")
continue
# Tool call / tool result blocks — skip the header and any indented follow-ups.
if TOOL_CALL_RE.match(line) or TOOL_RESULT_RE.match(line):
in_tool_block = True
continue
if in_tool_block:
# continuation of a tool block is usually indented; a flush-left line ends it
if line.startswith(" ") or line.startswith("\t"):
continue
in_tool_block = False
# Echo of the user's own prompt ("> hello") — drop it.
m = USER_PROMPT_RE.match(stripped)
if m:
continue
if stripped in recent_user:
continue
if _is_noise(line):
continue
out.append(line)
# collapse runs of blank lines
collapsed: list[str] = []
prev_blank = False
for line in out:
blank = not line.strip()
if blank and prev_blank:
continue
collapsed.append(line)
prev_blank = blank
return "\n".join(collapsed).strip("\n")
def diff_snapshot(prev: str, curr: str) -> str:
"""Return only lines that weren't already present anywhere in the previous snapshot.
Set-based: handles TUI scrolling and partial redraws without re-sending history.
"""
if not prev:
return curr
if prev == curr:
return ""
prev_set = set(prev.splitlines())
new_lines = [ln for ln in curr.splitlines() if ln.rstrip() and ln not in prev_set]
return "\n".join(new_lines)
def chunk_for_telegram(text: str, limit: int = TG_MAX_CHARS) -> list[str]:
if not text:
return []
out: list[str] = []
buf: list[str] = []
buf_len = 0
for line in text.splitlines():
if buf_len + len(line) + 1 > limit and buf:
out.append("\n".join(buf))
buf, buf_len = [], 0
while len(line) > limit:
out.append(line[:limit])
line = line[limit:]
buf.append(line)
buf_len += len(line) + 1
if buf:
out.append("\n".join(buf))
return out
async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not _allowed(update, context):
return
await update.message.reply_text(f"pong — webhook mode, tmux session «{TMUX_SESSION}»")
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
state: BridgeState = context.application.bot_data["state"]
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
if not _allowed(update, context):
return
text = (update.message.text or "").strip() if update.message else ""
if not text:
return
# Remember what we sent so we can suppress its echo from the pane capture.
async with state._lock:
state.recent_user_inputs.append(text)
# Also store reasonable substrings in case the TUI wraps or truncates
if len(text) > 40:
state.recent_user_inputs.append(text[:40])
logger.info("inbound message: %d chars", len(text))
try:
await tmux_send_text(TMUX_SESSION, text)
except Exception as exc: # noqa: BLE001
logger.warning("paste to tmux failed: %s", exc)
await update.message.reply_text(f"⚠️ tmux error: {exc}")
async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
state: BridgeState = context.application.bot_data["state"]
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
return
await update.message.reply_text(
f"pong — session '{TMUX_SESSION}', poll {POLL_INTERVAL}s"
)
async def cmd_snapshot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
state: BridgeState = context.application.bot_data["state"]
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
return
try:
snap = await tmux_capture(TMUX_SESSION)
except Exception as exc: # noqa: BLE001
await update.message.reply_text(f"⚠️ tmux error: {exc}")
return
async with state._lock:
cleaned = clean_text(snap, state.recent_user_inputs)
state.last_snapshot = snap # reset baseline so poller doesn't resend
for part in chunk_for_telegram(cleaned) or ["(nothing to show)"]:
await update.message.reply_text(part)
async def poll_and_forward(application: Application) -> None:
state: BridgeState = application.bot_data["state"]
bot = application.bot
logger = logging.getLogger("bridge.poll")
while True:
await asyncio.sleep(POLL_INTERVAL)
# Stability check: capture twice, ~1.5s apart. If pane still changes, assistant
# is still streaming — skip this tick and try next time.
try:
snap1 = await tmux_capture(TMUX_SESSION)
except Exception as exc: # noqa: BLE001
logger.warning("capture failed: %s", exc)
continue
await asyncio.sleep(1.5)
try:
snap2 = await tmux_capture(TMUX_SESSION)
except Exception as exc: # noqa: BLE001
logger.warning("capture failed: %s", exc)
continue
if snap1 != snap2:
# still being written — don't send partials
continue
snapshot = snap2
async with state._lock:
prev = state.last_snapshot
state.last_snapshot = snapshot
recent_user_copy = list(state.recent_user_inputs)
recently_sent_copy = list(state.recently_sent_lines)
raw_new = diff_snapshot(prev, snapshot)
new_text = clean_text(raw_new, collections.deque(recent_user_copy))
if not new_text:
continue
# Line-level dedup vs. what we already shipped: drop lines that are
# substring-equivalent to a recently sent one (handles streaming dupes).
deduped: list[str] = []
for line in new_text.splitlines():
s = line.rstrip()
if not s.strip():
deduped.append(line)
continue
ss = s.strip()
is_dup = False
for past in recently_sent_copy:
if ss == past:
is_dup = True
break
if len(ss) >= 15 and len(past) >= 15 and (ss in past or past in ss):
is_dup = True
break
if is_dup:
continue
deduped.append(line)
recently_sent_copy.append(ss)
async with state._lock:
state.recently_sent_lines.clear()
state.recently_sent_lines.extend(recently_sent_copy[-400:])
new_text = "\n".join(deduped).strip("\n")
if not new_text:
continue
async with state._lock:
if new_text == state.last_sent_text:
continue
state.last_sent_text = new_text
# Cap total outbound per tick so a big burst doesn't flood Telegram.
if len(new_text) > MAX_SEND_PER_TICK:
keep = new_text[-MAX_SEND_PER_TICK:]
# snap to next newline to avoid cutting mid-line
nl = keep.find("\n")
if 0 <= nl < 200:
keep = keep[nl + 1 :]
dropped = len(new_text) - len(keep)
new_text = f"… (+{dropped} chars earlier)\n{keep}"
for part in chunk_for_telegram(new_text):
try:
await bot.send_message(
chat_id=state.chat_id,
text=part,
disable_notification=True,
)
except Exception as exc: # noqa: BLE001
logger.warning("telegram send failed: %s", exc)
break
async def on_startup(application: Application) -> None:
state: BridgeState = application.bot_data["state"]
try:
state.last_snapshot = await tmux_capture(TMUX_SESSION)
except Exception: # noqa: BLE001
state.last_snapshot = ""
application.bot_data["poll_task"] = asyncio.create_task(
poll_and_forward(application), name="bridge-poll"
)
async def on_shutdown(application: Application) -> None:
task = application.bot_data.get("poll_task")
if task:
task.cancel()
def main() -> int:
logging.basicConfig(
level=logging.INFO,
@ -387,44 +114,35 @@ def main() -> int:
env = {**os.environ, **load_env(ENV_FILE)}
token = env.get("TELEGRAM_BOT_TOKEN", "").strip()
chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip()
secret = env.get("TELEGRAM_WEBHOOK_SECRET", "").strip()
webhook_url = env.get("TELEGRAM_WEBHOOK_URL", "https://food-market.zat.kz/tg-webhook").strip()
if not token or not chat_id_raw:
print(
"ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID must be set in "
f"{ENV_FILE} or environment",
file=sys.stderr,
)
print("ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID required", file=sys.stderr)
return 78
try:
chat_id = int(chat_id_raw)
except ValueError:
print(f"ERROR: TELEGRAM_CHAT_ID must be an integer, got: {chat_id_raw!r}",
file=sys.stderr)
print(f"ERROR: TELEGRAM_CHAT_ID must be int, got {chat_id_raw!r}", file=sys.stderr)
return 78
if not secret:
logger.warning("TELEGRAM_WEBHOOK_SECRET is empty — webhook is unauthenticated")
check = subprocess.run(
["tmux", "has-session", "-t", TMUX_SESSION],
capture_output=True, text=True,
)
if check.returncode != 0:
print(
f"WARNING: tmux session '{TMUX_SESSION}' not found — bridge will run "
"but send/capture will fail until the session is created.",
file=sys.stderr,
)
application = (
ApplicationBuilder()
.token(token)
.post_init(on_startup)
.post_shutdown(on_shutdown)
.build()
)
application.bot_data["state"] = BridgeState(chat_id=chat_id)
application = ApplicationBuilder().token(token).build()
application.bot_data["chat_id"] = chat_id
application.add_handler(CommandHandler("ping", cmd_ping))
application.add_handler(CommandHandler("snapshot", cmd_snapshot))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)
logger.info("starting webhook listener on %s:%d%s", LISTEN_HOST, LISTEN_PORT, webhook_url)
application.run_webhook(
listen=LISTEN_HOST,
port=LISTEN_PORT,
url_path=WEBHOOK_PATH.lstrip("/"),
webhook_url=webhook_url,
secret_token=secret or None,
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=False,
stop_signals=None,
)
return 0

View file

@ -0,0 +1,99 @@
#!/usr/bin/env bash
# Claude Code Stop hook: вытаскивает финальный assistant-ответ из transcript'а
# и пушит в Telegram. Устанавливается на /usr/local/bin/cc-tg-notify-stop.
#
# Hook runtime передаёт JSON на stdin с полем .transcript_path; раньше это
# приходило как $CLAUDE_TRANSCRIPT_PATH env-var, но в новых версиях стрим
# переехал в stdin. Поддерживаем оба варианта.
#
# Конфиг — /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID).
# Логи — /var/log/cc-tg-notify.log (rotated externally).
set -u
ENV_FILE="/etc/food-market/telegram.env"
LOG_FILE="${CC_TG_LOG:-/var/log/cc-tg-notify.log}"
PROJECT_TAG="${CC_TG_TAG:-food-market}"
MAX_CHUNK=4000
log() { printf '%s [stop-hook] %s\n' "$(date -Is)" "$*" >>"$LOG_FILE" 2>/dev/null || true; }
if [[ -r "$ENV_FILE" ]]; then
# shellcheck disable=SC1090
set -a; source "$ENV_FILE"; set +a
fi
TOKEN="${TELEGRAM_BOT_TOKEN:-}"
CHAT_ID="${TELEGRAM_CHAT_ID:-}"
if [[ -z "$TOKEN" || -z "$CHAT_ID" ]]; then
log "missing TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID"
exit 0
fi
# Читаем JSON со stdin (если пришёл) или берём env-vars (legacy).
INPUT_JSON=""
if [[ -t 0 ]]; then
INPUT_JSON=""
else
INPUT_JSON="$(cat)"
fi
TRANSCRIPT="${CLAUDE_TRANSCRIPT_PATH:-}"
if [[ -z "$TRANSCRIPT" && -n "$INPUT_JSON" ]]; then
TRANSCRIPT="$(printf '%s' "$INPUT_JSON" | jq -r '.transcript_path // empty' 2>/dev/null || true)"
fi
if [[ -z "$TRANSCRIPT" || ! -r "$TRANSCRIPT" ]]; then
log "no transcript path (stdin=${#INPUT_JSON} chars, env=${CLAUDE_TRANSCRIPT_PATH:-unset})"
exit 0
fi
# Последняя assistant-запись с непустым text-блоком. JSONL: одна запись на строку.
TEXT="$(jq -r '
select(.type == "assistant")
| .message.content[]?
| select(.type == "text" and (.text // "" | length) > 0)
| .text
' "$TRANSCRIPT" 2>/dev/null \
| awk 'BEGIN{RS=""}{a=$0} END{print a}')"
# awk выше склеивает все записи в одну; нам нужна именно ПОСЛЕДНЯЯ assistant-запись,
# поэтому делаем второй проход: берём индекс последней записи и достаём её text-блоки.
LAST_TEXT="$(jq -s -r '
map(select(.type == "assistant")) | last as $m
| ($m.message.content // [])
| map(select(.type == "text" and (.text // "" | length) > 0) | .text)
| join("\n")
' "$TRANSCRIPT" 2>/dev/null)"
if [[ -n "$LAST_TEXT" ]]; then TEXT="$LAST_TEXT"; fi
if [[ -z "$TEXT" ]]; then
log "no text in last assistant turn (only tool calls?)"
exit 0
fi
# Чанкуем по строкам с лимитом MAX_CHUNK; первый чанк — с префиксом.
PREFIX="🤖 [${PROJECT_TAG}]"
send_chunk() {
local body="$1"
curl -fsS -m 15 -X POST "https://api.telegram.org/bot${TOKEN}/sendMessage" \
--data-urlencode "chat_id=${CHAT_ID}" \
--data-urlencode "text=${body}" \
--data-urlencode "disable_web_page_preview=true" \
>/dev/null 2>&1 || log "send failed (curl rc=$?)"
}
CHUNK="$PREFIX"$'\n'
EMITTED=0
while IFS= read -r line; do
if (( ${#CHUNK} + ${#line} + 1 > MAX_CHUNK )); then
send_chunk "$CHUNK"
EMITTED=$((EMITTED+1))
CHUNK=""
fi
CHUNK+="$line"$'\n'
done <<<"$TEXT"
if [[ -n "$CHUNK" ]]; then
send_chunk "$CHUNK"
EMITTED=$((EMITTED+1))
fi
log "sent $EMITTED chunk(s), text=${#TEXT} chars"
exit 0