Telegram bridge lets me drive the local Claude Code tmux session from my phone — inbound messages are typed into the 'claude' session, pane diffs are streamed back as plain Telegram messages (TUI noise, tool-call blocks, echoed user input and already-sent lines are filtered so only the assistant's actual reply reaches the chat). Deployed as food-market-telegram-bridge.service, reads creds from /etc/food-market/telegram.env (not committed). Also committing the local docker-registry unit for reproducibility — registry:2 on 127.0.0.1:5001, data persisted in /opt/food-market-data/docker-registry. Setup docs in docs/telegram-bridge.md. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
433 lines
15 KiB
Python
433 lines
15 KiB
Python
"""Telegram <-> tmux bridge for controlling a local Claude Code session from a phone.
|
|
|
|
Reads creds from /etc/food-market/telegram.env (TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID).
|
|
Only the single whitelisted chat_id is allowed; everything else is silently ignored.
|
|
|
|
Inbound: each Telegram message is typed into tmux session 'claude' via `tmux send-keys
|
|
-t claude -l <text>` followed by an Enter keypress.
|
|
|
|
Outbound: every poll_interval seconds, capture the current pane, diff against the last
|
|
snapshot, filter TUI noise (box-drawing, spinners, the user's own echoed prompt), then
|
|
send any remaining text as plain Telegram messages.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import collections
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from telegram import Update
|
|
from telegram.ext import (
|
|
Application,
|
|
ApplicationBuilder,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
)
|
|
|
|
ENV_FILE = Path("/etc/food-market/telegram.env")
|
|
TMUX_SESSION = os.environ.get("TMUX_SESSION", "claude")
|
|
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SEC", "8"))
|
|
CAPTURE_HISTORY = int(os.environ.get("CAPTURE_HISTORY_LINES", "200"))
|
|
TG_MAX_CHARS = 3500
|
|
MAX_SEND_PER_TICK = int(os.environ.get("MAX_SEND_CHARS_PER_TICK", "900"))
|
|
|
|
ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[a-zA-Z]")
|
|
BOX_CHARS = set("╭╮╰╯│─┌┐└┘├┤┬┴┼║═╔╗╚╝╠╣╦╩╬▌▐█▀▄")
|
|
SPINNER_CHARS = set("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⣾⣽⣻⢿⡿⣟⣯⣷")
|
|
# Claude Code TUI markers
|
|
TOOL_CALL_RE = re.compile(r"^\s*[⏺●⏻◯◎⬤]\s+\S")
|
|
TOOL_RESULT_RE = re.compile(r"^\s*⎿")
|
|
USER_PROMPT_RE = re.compile(r"^>\s?(.*)$")
|
|
|
|
|
|
def load_env(path: Path) -> dict[str, str]:
|
|
out: dict[str, str] = {}
|
|
if not path.exists():
|
|
return out
|
|
for raw in path.read_text().splitlines():
|
|
line = raw.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
out[key.strip()] = value.strip().strip('"').strip("'")
|
|
return out
|
|
|
|
|
|
@dataclass
|
|
class BridgeState:
|
|
chat_id: int
|
|
last_snapshot: str = ""
|
|
last_sent_text: str = ""
|
|
recent_user_inputs: collections.deque = field(
|
|
default_factory=lambda: collections.deque(maxlen=50)
|
|
)
|
|
recently_sent_lines: collections.deque = field(
|
|
default_factory=lambda: collections.deque(maxlen=400)
|
|
)
|
|
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
|
|
|
|
|
async def tmux_send_text(session: str, text: str) -> None:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"tmux", "send-keys", "-t", session, "-l", text,
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
|
)
|
|
_, stderr = await proc.communicate()
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(f"tmux send-keys -l failed: {stderr.decode().strip()}")
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"tmux", "send-keys", "-t", session, "Enter",
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
|
|
)
|
|
_, stderr = await proc.communicate()
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(f"tmux send-keys Enter failed: {stderr.decode().strip()}")
|
|
|
|
|
|
async def tmux_capture(session: str) -> str:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
"tmux", "capture-pane", "-t", session, "-p", "-S", f"-{CAPTURE_HISTORY}",
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(f"tmux capture-pane failed: {stderr.decode().strip()}")
|
|
return stdout.decode("utf-8", errors="replace").rstrip("\n")
|
|
|
|
|
|
def _strip_box(line: str) -> str:
|
|
# Drop leading/trailing box-drawing chars and their padding.
|
|
while line and (line[0] in BOX_CHARS or (line[0] == " " and len(line) > 1 and line[1] in BOX_CHARS)):
|
|
line = line[1:].lstrip()
|
|
if not line:
|
|
break
|
|
while line and (line[-1] in BOX_CHARS or (line[-1] == " " and len(line) > 1 and line[-2] in BOX_CHARS)):
|
|
line = line[:-1].rstrip()
|
|
if not line:
|
|
break
|
|
return line
|
|
|
|
|
|
def _is_noise(line: str) -> bool:
|
|
s = line.strip()
|
|
if not s:
|
|
return True
|
|
# Lines made of only box/spinner/decoration chars + spaces.
|
|
if all(c in BOX_CHARS or c in SPINNER_CHARS or c.isspace() for c in s):
|
|
return True
|
|
# Claude Code TUI hints.
|
|
lowered = s.lower()
|
|
if "shift+tab" in lowered or "bypass permissions" in lowered:
|
|
return True
|
|
if lowered.startswith("? for shortcuts"):
|
|
return True
|
|
# Spinner + status line ("✻ Thinking…", "· Pondering…").
|
|
if s.startswith(("✻", "✽", "✢", "·")) and len(s) < 80:
|
|
return True
|
|
# Typing indicator prompt ("> " empty or near-empty input box).
|
|
if s.startswith(">") and len(s) <= 2:
|
|
return True
|
|
return False
|
|
|
|
|
|
def clean_text(text: str, recent_user: collections.deque | None = None) -> str:
|
|
"""Strip TUI noise, tool-call blocks and echoed user input.
|
|
|
|
Only the assistant's prose reply should survive.
|
|
"""
|
|
recent_user = recent_user if recent_user is not None else collections.deque()
|
|
out: list[str] = []
|
|
in_tool_block = False
|
|
for raw in text.splitlines():
|
|
line = ANSI_RE.sub("", raw).rstrip()
|
|
line = _strip_box(line)
|
|
stripped = line.strip()
|
|
|
|
if not stripped:
|
|
in_tool_block = False
|
|
out.append("")
|
|
continue
|
|
|
|
# Tool call / tool result blocks — skip the header and any indented follow-ups.
|
|
if TOOL_CALL_RE.match(line) or TOOL_RESULT_RE.match(line):
|
|
in_tool_block = True
|
|
continue
|
|
if in_tool_block:
|
|
# continuation of a tool block is usually indented; a flush-left line ends it
|
|
if line.startswith(" ") or line.startswith("\t"):
|
|
continue
|
|
in_tool_block = False
|
|
|
|
# Echo of the user's own prompt ("> hello") — drop it.
|
|
m = USER_PROMPT_RE.match(stripped)
|
|
if m:
|
|
continue
|
|
if stripped in recent_user:
|
|
continue
|
|
|
|
if _is_noise(line):
|
|
continue
|
|
|
|
out.append(line)
|
|
|
|
# collapse runs of blank lines
|
|
collapsed: list[str] = []
|
|
prev_blank = False
|
|
for line in out:
|
|
blank = not line.strip()
|
|
if blank and prev_blank:
|
|
continue
|
|
collapsed.append(line)
|
|
prev_blank = blank
|
|
return "\n".join(collapsed).strip("\n")
|
|
|
|
|
|
def diff_snapshot(prev: str, curr: str) -> str:
|
|
"""Return only lines that weren't already present anywhere in the previous snapshot.
|
|
|
|
Set-based: handles TUI scrolling and partial redraws without re-sending history.
|
|
"""
|
|
if not prev:
|
|
return curr
|
|
if prev == curr:
|
|
return ""
|
|
prev_set = set(prev.splitlines())
|
|
new_lines = [ln for ln in curr.splitlines() if ln.rstrip() and ln not in prev_set]
|
|
return "\n".join(new_lines)
|
|
|
|
|
|
def chunk_for_telegram(text: str, limit: int = TG_MAX_CHARS) -> list[str]:
|
|
if not text:
|
|
return []
|
|
out: list[str] = []
|
|
buf: list[str] = []
|
|
buf_len = 0
|
|
for line in text.splitlines():
|
|
if buf_len + len(line) + 1 > limit and buf:
|
|
out.append("\n".join(buf))
|
|
buf, buf_len = [], 0
|
|
while len(line) > limit:
|
|
out.append(line[:limit])
|
|
line = line[limit:]
|
|
buf.append(line)
|
|
buf_len += len(line) + 1
|
|
if buf:
|
|
out.append("\n".join(buf))
|
|
return out
|
|
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
state: BridgeState = context.application.bot_data["state"]
|
|
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
|
|
return
|
|
text = (update.message.text or "").strip() if update.message else ""
|
|
if not text:
|
|
return
|
|
# Remember what we sent so we can suppress its echo from the pane capture.
|
|
async with state._lock:
|
|
state.recent_user_inputs.append(text)
|
|
# Also store reasonable substrings in case the TUI wraps or truncates
|
|
if len(text) > 40:
|
|
state.recent_user_inputs.append(text[:40])
|
|
try:
|
|
await tmux_send_text(TMUX_SESSION, text)
|
|
except Exception as exc: # noqa: BLE001
|
|
await update.message.reply_text(f"⚠️ tmux error: {exc}")
|
|
|
|
|
|
async def cmd_ping(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
state: BridgeState = context.application.bot_data["state"]
|
|
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
|
|
return
|
|
await update.message.reply_text(
|
|
f"pong — session '{TMUX_SESSION}', poll {POLL_INTERVAL}s"
|
|
)
|
|
|
|
|
|
async def cmd_snapshot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
state: BridgeState = context.application.bot_data["state"]
|
|
if update.effective_chat is None or update.effective_chat.id != state.chat_id:
|
|
return
|
|
try:
|
|
snap = await tmux_capture(TMUX_SESSION)
|
|
except Exception as exc: # noqa: BLE001
|
|
await update.message.reply_text(f"⚠️ tmux error: {exc}")
|
|
return
|
|
async with state._lock:
|
|
cleaned = clean_text(snap, state.recent_user_inputs)
|
|
state.last_snapshot = snap # reset baseline so poller doesn't resend
|
|
for part in chunk_for_telegram(cleaned) or ["(nothing to show)"]:
|
|
await update.message.reply_text(part)
|
|
|
|
|
|
async def poll_and_forward(application: Application) -> None:
|
|
state: BridgeState = application.bot_data["state"]
|
|
bot = application.bot
|
|
logger = logging.getLogger("bridge.poll")
|
|
while True:
|
|
await asyncio.sleep(POLL_INTERVAL)
|
|
# Stability check: capture twice, ~1.5s apart. If pane still changes, assistant
|
|
# is still streaming — skip this tick and try next time.
|
|
try:
|
|
snap1 = await tmux_capture(TMUX_SESSION)
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning("capture failed: %s", exc)
|
|
continue
|
|
await asyncio.sleep(1.5)
|
|
try:
|
|
snap2 = await tmux_capture(TMUX_SESSION)
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning("capture failed: %s", exc)
|
|
continue
|
|
if snap1 != snap2:
|
|
# still being written — don't send partials
|
|
continue
|
|
snapshot = snap2
|
|
|
|
async with state._lock:
|
|
prev = state.last_snapshot
|
|
state.last_snapshot = snapshot
|
|
recent_user_copy = list(state.recent_user_inputs)
|
|
recently_sent_copy = list(state.recently_sent_lines)
|
|
|
|
raw_new = diff_snapshot(prev, snapshot)
|
|
new_text = clean_text(raw_new, collections.deque(recent_user_copy))
|
|
if not new_text:
|
|
continue
|
|
|
|
# Line-level dedup vs. what we already shipped: drop lines that are
|
|
# substring-equivalent to a recently sent one (handles streaming dupes).
|
|
deduped: list[str] = []
|
|
for line in new_text.splitlines():
|
|
s = line.rstrip()
|
|
if not s.strip():
|
|
deduped.append(line)
|
|
continue
|
|
ss = s.strip()
|
|
is_dup = False
|
|
for past in recently_sent_copy:
|
|
if ss == past:
|
|
is_dup = True
|
|
break
|
|
if len(ss) >= 15 and len(past) >= 15 and (ss in past or past in ss):
|
|
is_dup = True
|
|
break
|
|
if is_dup:
|
|
continue
|
|
deduped.append(line)
|
|
recently_sent_copy.append(ss)
|
|
|
|
async with state._lock:
|
|
state.recently_sent_lines.clear()
|
|
state.recently_sent_lines.extend(recently_sent_copy[-400:])
|
|
|
|
new_text = "\n".join(deduped).strip("\n")
|
|
if not new_text:
|
|
continue
|
|
|
|
async with state._lock:
|
|
if new_text == state.last_sent_text:
|
|
continue
|
|
state.last_sent_text = new_text
|
|
|
|
# Cap total outbound per tick so a big burst doesn't flood Telegram.
|
|
if len(new_text) > MAX_SEND_PER_TICK:
|
|
keep = new_text[-MAX_SEND_PER_TICK:]
|
|
# snap to next newline to avoid cutting mid-line
|
|
nl = keep.find("\n")
|
|
if 0 <= nl < 200:
|
|
keep = keep[nl + 1 :]
|
|
dropped = len(new_text) - len(keep)
|
|
new_text = f"… (+{dropped} chars earlier)\n{keep}"
|
|
|
|
for part in chunk_for_telegram(new_text):
|
|
try:
|
|
await bot.send_message(
|
|
chat_id=state.chat_id,
|
|
text=part,
|
|
disable_notification=True,
|
|
)
|
|
except Exception as exc: # noqa: BLE001
|
|
logger.warning("telegram send failed: %s", exc)
|
|
break
|
|
|
|
|
|
async def on_startup(application: Application) -> None:
|
|
state: BridgeState = application.bot_data["state"]
|
|
try:
|
|
state.last_snapshot = await tmux_capture(TMUX_SESSION)
|
|
except Exception: # noqa: BLE001
|
|
state.last_snapshot = ""
|
|
application.bot_data["poll_task"] = asyncio.create_task(
|
|
poll_and_forward(application), name="bridge-poll"
|
|
)
|
|
|
|
|
|
async def on_shutdown(application: Application) -> None:
|
|
task = application.bot_data.get("poll_task")
|
|
if task:
|
|
task.cancel()
|
|
|
|
|
|
def main() -> int:
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
)
|
|
env = {**os.environ, **load_env(ENV_FILE)}
|
|
token = env.get("TELEGRAM_BOT_TOKEN", "").strip()
|
|
chat_id_raw = env.get("TELEGRAM_CHAT_ID", "").strip()
|
|
if not token or not chat_id_raw:
|
|
print(
|
|
"ERROR: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID must be set in "
|
|
f"{ENV_FILE} or environment",
|
|
file=sys.stderr,
|
|
)
|
|
return 78
|
|
try:
|
|
chat_id = int(chat_id_raw)
|
|
except ValueError:
|
|
print(f"ERROR: TELEGRAM_CHAT_ID must be an integer, got: {chat_id_raw!r}",
|
|
file=sys.stderr)
|
|
return 78
|
|
|
|
check = subprocess.run(
|
|
["tmux", "has-session", "-t", TMUX_SESSION],
|
|
capture_output=True, text=True,
|
|
)
|
|
if check.returncode != 0:
|
|
print(
|
|
f"WARNING: tmux session '{TMUX_SESSION}' not found — bridge will run "
|
|
"but send/capture will fail until the session is created.",
|
|
file=sys.stderr,
|
|
)
|
|
|
|
application = (
|
|
ApplicationBuilder()
|
|
.token(token)
|
|
.post_init(on_startup)
|
|
.post_shutdown(on_shutdown)
|
|
.build()
|
|
)
|
|
application.bot_data["state"] = BridgeState(chat_id=chat_id)
|
|
application.add_handler(CommandHandler("ping", cmd_ping))
|
|
application.add_handler(CommandHandler("snapshot", cmd_snapshot))
|
|
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
|
|
|
|
application.run_polling(allowed_updates=Update.ALL_TYPES, stop_signals=None)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|