"""Telegram <-> tmux bridge for controlling a local Claude Code session from a phone.

Reads creds from /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID).
Only the single whitelisted chat_id is allowed; everything else is silently ignored.

Inbound: each Telegram message is typed into the tmux session (default 'claude') via
`tmux send-keys -t SESSION -l -- TEXT` followed by an Enter keypress.

Outbound: every POLL_INTERVAL seconds, capture the current pane, diff against the last
snapshot, filter TUI noise (box-drawing, spinners, the user's own echoed prompt), then
send any remaining text as plain Telegram messages.
"""

from __future__ import annotations

import asyncio
import collections
import logging
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

from telegram import Message, Update
from telegram.ext import (
    Application,
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

ENV_FILE = Path("/etc/food-market/telegram.env")
TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude")
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SEC", "8"))
CAPTURE_HISTORY = int(os.environ.get("CAPTURE_HISTORY_LINES", "200"))
TG_MAX_CHARS = 3500
MAX_SEND_PER_TICK = int(os.environ.get("MAX_SEND_CHARS_PER_TICK", "900"))
# Gap between the two captures used to decide whether the pane has settled.
STABILITY_DELAY_SEC = 1.5

ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[a-zA-Z]")
BOX_CHARS = set("╭╮╰╯│─┌┐└┘├┤┬┴┼║═╔╗╚╝╠╣╦╩╬▌▐█▀▄")
SPINNER_CHARS = set("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⣾⣽⣻⢿⡿⣟⣯⣷")

# Claude Code TUI markers
TOOL_CALL_RE = re.compile(r"^\s*[⏺●⏻◯◎⬤]\s+\S")
TOOL_RESULT_RE = re.compile(r"^\s*⎿")
USER_PROMPT_RE = re.compile(r"^>\s?(.*)$")


def load_env(path: Path) -> dict[str, str]:
    """Parse a simple KEY=VALUE env file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Surrounding whitespace and quote characters are stripped from values.
    Returns an empty dict when *path* does not exist.
    """
    out: dict[str, str] = {}
    if not path.exists():
        return out
    # Explicit encoding so parsing does not depend on the process locale.
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        out[key.strip()] = value.strip().strip('"').strip("'")
    return out


@dataclass
class BridgeState:
    """Mutable bridge state shared by the Telegram handlers and the poller."""

    # The only chat allowed to talk to the bridge.
    chat_id: int
    # Raw pane capture from the previous tick; baseline for diff_snapshot().
    last_snapshot: str = ""
    # Last outbound text, used to avoid sending the same message twice in a row.
    last_sent_text: str = ""
    # Recent inbound messages, used to suppress their echo in the pane.
    recent_user_inputs: collections.deque = field(
        default_factory=lambda: collections.deque(maxlen=50)
    )
    # Recently forwarded lines, used for line-level de-duplication.
    recently_sent_lines: collections.deque = field(
        default_factory=lambda: collections.deque(maxlen=400)
    )
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)


async def _run_tmux(label: str, *args: str, capture_stdout: bool = False) -> bytes:
    """Run ``tmux *args`` and return its stdout (empty unless *capture_stdout*).

    Raises:
        RuntimeError: if tmux exits non-zero; the message is
            ``"tmux {label} failed: {stderr}"``.
    """
    proc = await asyncio.create_subprocess_exec(
        "tmux",
        *args,
        stdout=subprocess.PIPE if capture_stdout else subprocess.DEVNULL,
        stderr=subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # errors="replace": a stray byte in stderr must not hide the real failure.
        detail = stderr.decode("utf-8", errors="replace").strip()
        raise RuntimeError(f"tmux {label} failed: {detail}")
    return stdout or b""


async def tmux_send_text(session: str, text: str) -> None:
    """Type *text* literally into tmux *session*, then press Enter.

    Raises:
        RuntimeError: if either tmux invocation fails.
    """
    # "--" ends option parsing so text starting with "-" (e.g. "-h") is sent as
    # keys instead of being interpreted as send-keys flags.
    await _run_tmux("send-keys -l", "send-keys", "-t", session, "-l", "--", text)
    await _run_tmux("send-keys Enter", "send-keys", "-t", session, "Enter")


async def tmux_capture(session: str) -> str:
    """Return the pane text of *session*, including CAPTURE_HISTORY scrollback lines.

    Raises:
        RuntimeError: if ``tmux capture-pane`` fails.
    """
    stdout = await _run_tmux(
        "capture-pane",
        "capture-pane",
        "-t",
        session,
        "-p",
        "-S",
        f"-{CAPTURE_HISTORY}",
        capture_stdout=True,
    )
    return stdout.decode("utf-8", errors="replace").rstrip("\n")


def _strip_box(line: str) -> str:
    """Drop leading/trailing box-drawing chars and their padding."""
    while line and (
        line[0] in BOX_CHARS
        or (line[0] == " " and len(line) > 1 and line[1] in BOX_CHARS)
    ):
        line = line[1:].lstrip()
    while line and (
        line[-1] in BOX_CHARS
        or (line[-1] == " " and len(line) > 1 and line[-2] in BOX_CHARS)
    ):
        line = line[:-1].rstrip()
    return line


def _is_noise(line: str) -> bool:
    """Return True for lines that are TUI decoration rather than content."""
    s = line.strip()
    if not s:
        return True
    # Lines made of only box/spinner/decoration chars + spaces.
    if all(c in BOX_CHARS or c in SPINNER_CHARS or c.isspace() for c in s):
        return True
    # Claude Code TUI hints.
    lowered = s.lower()
    if "shift+tab" in lowered or "bypass permissions" in lowered:
        return True
    if lowered.startswith("? for shortcuts"):
        return True
    # Spinner + status line ("✻ Thinking…", "· Pondering…").
    if s.startswith(("✻", "✽", "✢", "·")) and len(s) < 80:
        return True
    # Typing indicator prompt ("> " empty or near-empty input box).
    if s.startswith(">") and len(s) <= 2:
        return True
    return False


def clean_text(text: str, recent_user: collections.deque | None = None) -> str:
    """Strip TUI noise, tool-call blocks and echoed user input.

    Only the assistant's prose reply should survive. *recent_user* holds
    recently sent user messages; a line equal to one of them is dropped.
    Runs of blank lines are collapsed to one and outer blank lines removed.
    """
    # Build the lookup once instead of scanning the deque for every line.
    echoes = set(recent_user) if recent_user is not None else set()
    out: list[str] = []
    in_tool_block = False
    for raw in text.splitlines():
        line = _strip_box(ANSI_RE.sub("", raw).rstrip())
        stripped = line.strip()
        if not stripped:
            in_tool_block = False
            out.append("")
            continue
        # Tool call / tool result blocks — skip the header and any indented follow-ups.
        if TOOL_CALL_RE.match(line) or TOOL_RESULT_RE.match(line):
            in_tool_block = True
            continue
        if in_tool_block:
            # Continuation of a tool block is usually indented; a flush-left line ends it.
            if line.startswith((" ", "\t")):
                continue
            in_tool_block = False
        # Echo of the user's own prompt ("> hello") — drop it.
        if USER_PROMPT_RE.match(stripped):
            continue
        if stripped in echoes:
            continue
        if _is_noise(line):
            continue
        out.append(line)

    # Collapse runs of blank lines.
    collapsed: list[str] = []
    prev_blank = False
    for line in out:
        blank = not line.strip()
        if blank and prev_blank:
            continue
        collapsed.append(line)
        prev_blank = blank
    return "\n".join(collapsed).strip("\n")


def diff_snapshot(prev: str, curr: str) -> str:
    """Return only lines that weren't already present anywhere in the previous snapshot.

    Set-based: handles TUI scrolling and partial redraws without re-sending history.
    The whole of *curr* is returned when *prev* is empty.
    """
    if not prev:
        return curr
    if prev == curr:
        return ""
    prev_set = set(prev.splitlines())
    new_lines = [ln for ln in curr.splitlines() if ln.rstrip() and ln not in prev_set]
    return "\n".join(new_lines)


def chunk_for_telegram(text: str, limit: int = TG_MAX_CHARS) -> list[str]:
    """Split *text* into chunks of at most *limit* characters.

    Splits on line boundaries where possible; a single line longer than
    *limit* is hard-split. Returns an empty list for empty text.

    Raises:
        ValueError: if *limit* is not positive.
    """
    if limit < 1:
        # A non-positive limit would make the long-line splitter loop forever.
        raise ValueError(f"limit must be positive, got {limit}")
    if not text:
        return []
    out: list[str] = []
    buf: list[str] = []
    buf_len = 0
    for line in text.splitlines():
        if buf_len + len(line) + 1 > limit and buf:
            out.append("\n".join(buf))
            buf, buf_len = [], 0
        while len(line) > limit:
            out.append(line[:limit])
            line = line[limit:]
        buf.append(line)
        buf_len += len(line) + 1
    if buf:
        out.append("\n".join(buf))
    return out


def _authorized_message(update: Update, state: BridgeState) -> Message | None:
    """Return the new message of *update* if it comes from the whitelisted chat.

    Returns None for other chats and for updates without a new message
    (for example edited messages), so callers can ignore those uniformly.
    """
    chat = update.effective_chat
    if chat is None or chat.id != state.chat_id:
        return None
    return update.message


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forward a text message from the whitelisted chat into the tmux session."""
    state: BridgeState = context.application.bot_data["state"]
    message = _authorized_message(update, state)
    if message is None:
        return
    text = (message.text or "").strip()
    if not text:
        return
    # Remember what we sent so we can suppress its echo from the pane capture.
    async with state._lock:
        state.recent_user_inputs.append(text)
        # Also store a reasonable prefix in case the TUI wraps or truncates.
        if len(text) > 40:
            state.recent_user_inputs.append(text[:40])
    try:
        await tmux_send_text(TMUX_SESSION, text)
    except Exception as exc:  # noqa: BLE001
        await message.reply_text(f"⚠️ tmux error: {exc}")


async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /ping with the session name and poll interval."""
    state: BridgeState = context.application.bot_data["state"]
    message = _authorized_message(update, state)
    if message is None:
        return
    await message.reply_text(
        f"pong — session '{TMUX_SESSION}', poll {POLL_INTERVAL}s"
    )


async def cmd_snapshot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /snapshot with the cleaned current pane content."""
    state: BridgeState = context.application.bot_data["state"]
    message = _authorized_message(update, state)
    if message is None:
        return
    try:
        snap = await tmux_capture(TMUX_SESSION)
    except Exception as exc:  # noqa: BLE001
        await message.reply_text(f"⚠️ tmux error: {exc}")
        return
    async with state._lock:
        cleaned = clean_text(snap, state.recent_user_inputs)
        state.last_snapshot = snap  # reset baseline so poller doesn't resend
    for part in chunk_for_telegram(cleaned) or ["(nothing to show)"]:
        await message.reply_text(part)


async def _capture_when_stable(logger: logging.Logger) -> str | None:
    """Capture the pane twice, STABILITY_DELAY_SEC apart.

    Returns the capture when both are identical. Returns None when the pane
    is still changing (assistant still streaming) or a capture failed.
    """
    try:
        first = await tmux_capture(TMUX_SESSION)
        await asyncio.sleep(STABILITY_DELAY_SEC)
        second = await tmux_capture(TMUX_SESSION)
    except Exception as exc:  # noqa: BLE001
        logger.warning("capture failed: %s", exc)
        return None
    # Still being written — don't send partials.
    return second if first == second else None


def _is_recent_duplicate(candidate: str, sent: list[str]) -> bool:
    """Return True if *candidate* equals, contains or is contained in a sent line.

    The substring test only applies when both strings have at least 15 chars.
    """
    return any(
        candidate == past
        or (
            len(candidate) >= 15
            and len(past) >= 15
            and (candidate in past or past in candidate)
        )
        for past in sent
    )


def _drop_recently_sent(text: str, sent: list[str]) -> str:
    """Remove lines of *text* already shipped; record the survivors in *sent*.

    *sent* is mutated in place. Blank lines are kept as they are.
    """
    kept: list[str] = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            kept.append(line)
            continue
        if _is_recent_duplicate(stripped, sent):
            continue
        kept.append(line)
        sent.append(stripped)
    return "\n".join(kept).strip("\n")


def _cap_outbound(text: str, limit: int) -> str:
    """Keep roughly the last *limit* chars of *text*, prefixed with a drop notice.

    Text no longer than *limit* is returned unchanged.
    """
    if len(text) <= limit:
        return text
    # Explicit index arithmetic: text[-0:] would return the whole string.
    keep = text[len(text) - limit :]
    # Snap to the next newline to avoid cutting mid-line.
    nl = keep.find("\n")
    if 0 <= nl < 200:
        keep = keep[nl + 1 :]
    dropped = len(text) - len(keep)
    return f"… (+{dropped} chars earlier)\n{keep}"


async def _poll_once(application: Application, logger: logging.Logger) -> None:
    """Run one poll tick: capture, diff, clean, de-duplicate and send."""
    state: BridgeState = application.bot_data["state"]
    snapshot = await _capture_when_stable(logger)
    if snapshot is None:
        return

    async with state._lock:
        prev = state.last_snapshot
        state.last_snapshot = snapshot
        recent_user = collections.deque(state.recent_user_inputs)
        recently_sent = list(state.recently_sent_lines)

    new_text = clean_text(diff_snapshot(prev, snapshot), recent_user)
    if not new_text:
        return

    # Line-level dedup vs. what we already shipped (handles streaming dupes).
    new_text = _drop_recently_sent(new_text, recently_sent)
    async with state._lock:
        state.recently_sent_lines.clear()
        # The deque's maxlen keeps only the most recent entries.
        state.recently_sent_lines.extend(recently_sent)
    if not new_text:
        return

    async with state._lock:
        if new_text == state.last_sent_text:
            return
        state.last_sent_text = new_text

    # Cap total outbound per tick so a big burst doesn't flood Telegram.
    outbound = _cap_outbound(new_text, MAX_SEND_PER_TICK)
    for part in chunk_for_telegram(outbound):
        try:
            await application.bot.send_message(
                chat_id=state.chat_id,
                text=part,
                disable_notification=True,
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning("telegram send failed: %s", exc)
            break


async def poll_and_forward(application: Application) -> None:
    """Forward new pane output to Telegram every POLL_INTERVAL seconds, forever.

    Runs until cancelled. A failure inside one tick is logged and the loop
    continues, so a single unexpected error cannot stop forwarding.
    """
    logger = logging.getLogger("bridge.poll")
    while True:
        await asyncio.sleep(POLL_INTERVAL)
        try:
            await _poll_once(application, logger)
        except Exception:  # noqa: BLE001
            logger.exception("poll tick failed")


async def on_startup(application: Application) -> None:
    """Record the initial pane as baseline and start the background poller."""
    state: BridgeState = application.bot_data["state"]
    try:
        state.last_snapshot = await tmux_capture(TMUX_SESSION)
    except Exception as exc:  # noqa: BLE001
        # Best effort: start with an empty baseline if the session is not up yet.
        logging.getLogger("bridge").warning("initial capture failed: %s", exc)
        state.last_snapshot = ""
    application.bot_data["poll_task"] = asyncio.create_task(
        poll_and_forward(application), name="bridge-poll"
    )


async def on_shutdown(application: Application) -> None:
    """Cancel the background poller and wait for it to finish."""
    task = application.bot_data.get("poll_task")
    if task is None:
        return
    task.cancel()
    try:
        # Await the task so it is not left pending when the loop closes.
        await task
    except asyncio.CancelledError:
        pass


def main() -> int:
    """Load configuration, wire up the handlers and run the bot.

    Returns 78 (EX_CONFIG in sysexits.h) on a configuration error, 0 on a
    normal exit.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    # Values from the env file take precedence over the process environment.
    env = {**os.environ, **load_env(ENV_FILE)}
    token = env.get("TELEGRAM_BOT_TOKEN", "").strip()
    chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip()
    if not token or not chat_id_raw:
        print(
            "ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID must be set in "
            f"{ENV_FILE} or environment",
            file=sys.stderr,
        )
        return 78
    try:
        chat_id = int(chat_id_raw)
    except ValueError:
        print(
            f"ERROR: TELEGRAM_CHAT_ID must be an integer, got: {chat_id_raw!r}",
            file=sys.stderr,
        )
        return 78

    try:
        check = subprocess.run(
            ["tmux", "has-session", "-t", TMUX_SESSION],
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        # Missing binary: warn and continue, like the missing-session case below.
        print(
            "WARNING: tmux executable not found — bridge will run "
            "but send/capture will fail until tmux is installed.",
            file=sys.stderr,
        )
    else:
        if check.returncode != 0:
            print(
                f"WARNING: tmux session '{TMUX_SESSION}' not found — bridge will run "
                "but send/capture will fail until the session is created.",
                file=sys.stderr,
            )

    application = (
        ApplicationBuilder()
        .token(token)
        .post_init(on_startup)
        .post_shutdown(on_shutdown)
        .build()
    )
    application.bot_data["state"] = BridgeState(chat_id=chat_id)
    application.add_handler(CommandHandler("ping", cmd_ping))
    application.add_handler(CommandHandler("snapshot", cmd_snapshot))
    application.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)
    )
    application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)
    return 0


if __name__ == "__main__":
    sys.exit(main())