Cancellable turns: /clear aborts an in-flight streaming response #44

Merged
weiwen merged 1 commit from sandcastle/issue-42 into main 2026-07-06 02:40:21 +08:00

Summary

Make an in-flight pi turn cancellable, and wire /clear to cancel a streaming response cleanly. Previously, teloxide serialized updates per-chat, so a /clear sent mid-stream couldn't run until the response finished. Now it can.

What changed

Dispatch (`telegram/mod.rs`)

  • Removed teloxide's default per-chat distribution serialization (via `distribution_function(|_| None::<u32>)`) so a second same-chat message (e.g. /clear) is handled while the first is still streaming.

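RPC / pi layer (`pi.rs`)

  • `PiProcess::send_message` now accepts an `abort_rx: oneshot::Receiver<()>` abort signal and races the response reader against it.
  • On abort: sends an `abort` command (`{"type":"abort"}`) to the pi process, then drains stdout up to the resulting `agent_end` so the process returns to idle and can be reused.
  • Introduced the `TurnOutcome` enum (`Response(AgentResponse)` / `Aborted`) to distinguish normal completion from cancellation.
  • Added a `drain_aborted` helper with timeout protection: it stops at `agent_end` and returns an error on EOF or timeout instead of hanging.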
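A minimal, std-only sketch of the abort path, to make the race and the drain concrete. It is not the PR's code: it assumes pi's stdout has already been split into JSON lines arriving on a `std::sync::mpsc` channel, uses `sync_channel(1)` as a stand-in for the `oneshot` abort receiver, records writes to pi's stdin in a `Vec<String>`, and polls both sources on a short tick instead of awaiting them concurrently. `ABORT_CMD`, `is_agent_end`, and the `String` response payload are hypothetical simplifications.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Mirrors the PR's `TurnOutcome`; the payload is simplified to the last line seen.
#[derive(Debug, PartialEq)]
pub enum TurnOutcome {
    Response(String),
    Aborted,
}

/// Hypothetical: the serialised form of the abort command.
const ABORT_CMD: &str = r#"{"type":"abort"}"#;

/// Hypothetical: crude detection of the terminating event.
fn is_agent_end(line: &str) -> bool {
    line.contains(r#""type":"agent_end""#)
}

/// Discard snapshots until `agent_end`; error on timeout or on EOF (closed channel).
pub fn drain_aborted(lines: &Receiver<String>, timeout: Duration) -> Result<(), String> {
    loop {
        match lines.recv_timeout(timeout) {
            Ok(line) if is_agent_end(&line) => return Ok(()),
            Ok(_) => continue,
            Err(RecvTimeoutError::Timeout) => return Err("drain timed out".to_string()),
            Err(RecvTimeoutError::Disconnected) => return Err("pi stdout closed".to_string()),
        }
    }
}

/// One turn: race stdout lines against a per-turn abort signal by polling both.
pub fn send_message(
    lines: &Receiver<String>,
    abort_rx: Receiver<()>,
    pi_stdin: &mut Vec<String>,
    idle_timeout: Duration,
) -> Result<TurnOutcome, String> {
    let tick = Duration::from_millis(10);
    let mut idle = Duration::ZERO;
    let mut last: Option<String> = None;
    loop {
        // A pending abort wins: tell pi to stop, then drain it back to idle.
        if abort_rx.try_recv().is_ok() {
            pi_stdin.push(ABORT_CMD.to_string());
            drain_aborted(lines, idle_timeout)?;
            return Ok(TurnOutcome::Aborted);
        }
        match lines.recv_timeout(tick) {
            Ok(line) if is_agent_end(&line) => {
                return Ok(TurnOutcome::Response(last.unwrap_or_default()));
            }
            Ok(line) => {
                last = Some(line);
                idle = Duration::ZERO; // progress resets the idle clock
            }
            Err(RecvTimeoutError::Timeout) => {
                idle += tick;
                if idle >= idle_timeout {
                    return Err("turn idle timeout".to_string());
                }
            }
            Err(RecvTimeoutError::Disconnected) => return Err("pi stdout closed".to_string()),
        }
    }
}
```

Because each turn consumes its own abort receiver, an abort sent after `agent_end` finds the receiver dropped and the send simply fails, which corresponds to the no-op case in the PR's tests. A timed-out drain comes back as an error, in line with the commit's note that session.rs treats it as a crash.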

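Session state (`session.rs`)

  • Replaced the previous session entry with a `Slot` (per-chat `Mutex<PiProcess>` + `last_activity: AtomicI64` + optional abort handle `abort_tx`).
  • The sessions-map write lock is no longer held across an entire turn; it is acquired briefly to get or create the slot, then dropped.
  • New `abort_turn(chat_id) -> bool` signals the in-flight turn's abort sender and returns true if a turn was running.
  • `clear_session` now waits for any active turn to finish before killing the process.
  • `cleanup_idle` skips sessions with an active turn, using a `try_lock` check (`has_active_turn()`) to avoid blocking.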
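The slot-based session state and its lock ordering can be modelled with `std::sync` alone. In this sketch `std::sync::mpsc::sync_channel(1)` replaces the `oneshot` sender, timestamps are plain `i64` values supplied by the caller, `PiProcess` is a hypothetical stub, and `run_turn` is a hypothetical stand-in for `send_message` that runs a caller-supplied closure while the turn is active. Only the locking order is meant to mirror the PR.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical stub for the pi child process.
pub struct PiProcess {
    pub alive: bool,
}

impl PiProcess {
    pub fn kill_mut(&mut self) {
        self.alive = false;
    }
}

pub struct Slot {
    process: Mutex<PiProcess>,
    last_activity: AtomicI64,
    abort_tx: Mutex<Option<SyncSender<()>>>,
}

#[derive(Default)]
pub struct Sessions {
    map: RwLock<HashMap<String, Arc<Slot>>>,
}

impl Sessions {
    /// Hold the map write lock only long enough to look up or create the slot.
    fn slot(&self, chat_id: &str, now: i64) -> Arc<Slot> {
        let mut map = self.map.write().unwrap();
        let slot = map.entry(chat_id.to_string()).or_insert_with(|| {
            Arc::new(Slot {
                process: Mutex::new(PiProcess { alive: true }),
                last_activity: AtomicI64::new(now),
                abort_tx: Mutex::new(None),
            })
        });
        Arc::clone(slot)
    }

    /// Hypothetical stand-in for `send_message`: `turn` runs under the process lock.
    pub fn run_turn<R>(
        &self,
        chat_id: &str,
        now: i64,
        turn: impl FnOnce(&mut PiProcess, Receiver<()>) -> R,
    ) -> R {
        let slot = self.slot(chat_id, now); // map lock already released
        let mut process = slot.process.lock().unwrap(); // waits for an earlier turn
        // Register abort_tx only AFTER holding the process lock, so a queued
        // second message cannot overwrite the running turn's handle.
        let (abort_tx, abort_rx) = mpsc::sync_channel(1);
        *slot.abort_tx.lock().unwrap() = Some(abort_tx);
        slot.last_activity.store(now, Ordering::Relaxed);
        let result = turn(&mut *process, abort_rx);
        slot.abort_tx.lock().unwrap().take(); // turn over: nothing left to abort
        result
    }

    /// Signal the in-flight turn; true if one was running and received the signal.
    pub fn abort_turn(&self, chat_id: &str) -> bool {
        let slot = match self.map.read().unwrap().get(chat_id) {
            Some(slot) => Arc::clone(slot),
            None => return false,
        };
        let abort_tx = slot.abort_tx.lock().unwrap().take();
        abort_tx.map_or(false, |tx| tx.send(()).is_ok())
    }

    /// Remove the slot, then wait (outside the map lock) for any active turn before killing.
    pub fn clear_session(&self, chat_id: &str) {
        let slot = self.map.write().unwrap().remove(chat_id);
        if let Some(slot) = slot {
            slot.process.lock().unwrap().kill_mut();
        }
    }

    /// Reclaim idle slots. A held process lock means a turn is running, so skip it.
    pub fn cleanup_idle(&self, now: i64, max_idle: i64) -> Vec<String> {
        let mut removed = Vec::new();
        self.map.write().unwrap().retain(|chat_id, slot| {
            // Lock-free idle check thanks to the AtomicI64.
            let idle = now - slot.last_activity.load(Ordering::Relaxed) > max_idle;
            match slot.process.try_lock() {
                Ok(mut process) if idle => {
                    process.kill_mut();
                    removed.push(chat_id.clone());
                    false
                }
                _ => true, // busy, or not idle yet: keep
            }
        });
        removed
    }
}
```

Two orderings carry the design: `run_turn` takes the process lock before registering `abort_tx`, and neither it nor `clear_session` holds the map lock while waiting on a process lock. `cleanup_idle` reads `last_activity` without locking and uses `try_lock`, so a running turn makes it skip the slot instead of blocking.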

Telegram handler (`telegram/mod.rs`)

  • /clear now invokes `clear_session_with_abort`: it aborts the turn, waits for the streaming task to hand over its message IDs, deletes the in-flight messages, then resets the session.
  • `deliver_streaming` registers its `done_rx` in a `TurnTracker` (per-chat `oneshot::Receiver<Vec<MessageId>>`) so /clear can await completion and get the message IDs to delete.
  • On abort, `deliver_streaming` hands over the message IDs and returns without rendering a final message.
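The two-phase handoff reduces to who ends up holding the `done_rx` receiver. The sketch below models only that bookkeeping with the standard library: `std::sync::mpsc::sync_channel(1)` stands in for the `oneshot` channel, `MessageId` is a hypothetical `i32` alias for teloxide's type, and `register_turn`, `finish_turn`, and `take_ids_for_clear` are hypothetical names for steps inside `deliver_streaming` and `clear_session_with_abort`. Message deletion and the abort signal itself are left out.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for teloxide's `MessageId`.
type MessageId = i32;
/// Chat ID -> receiver for the in-flight turn's message IDs.
type TurnTracker = Arc<Mutex<HashMap<i64, Receiver<Vec<MessageId>>>>>;

/// Streaming task, at turn start: publish done_rx, keep done_tx.
fn register_turn(tracker: &TurnTracker, chat_id: i64) -> SyncSender<Vec<MessageId>> {
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    tracker.lock().unwrap().insert(chat_id, done_rx);
    done_tx
}

/// Streaming task, at turn end.
fn finish_turn(
    tracker: &TurnTracker,
    chat_id: i64,
    done_tx: SyncSender<Vec<MessageId>>,
    aborted: bool,
    ids: Vec<MessageId>,
) {
    if aborted {
        // Hand the IDs to /clear. With a buffer of 1 this first send never
        // blocks; if the receiver is gone there is nobody to tell.
        let _ = done_tx.send(ids);
    } else {
        // Normal completion: withdraw the entry. done_tx is dropped on return,
        // so a /clear that already took done_rx sees a closed channel.
        tracker.lock().unwrap().remove(&chat_id);
    }
}

/// /clear, after abort_turn: take the receiver and wait for the IDs.
fn take_ids_for_clear(tracker: &TurnTracker, chat_id: i64) -> Vec<MessageId> {
    // The tracker lock is released at the end of this statement, before recv blocks.
    let done_rx = tracker.lock().unwrap().remove(&chat_id);
    match done_rx {
        Some(rx) => rx.recv().unwrap_or_default(), // closed channel -> nothing to delete
        None => Vec::new(),                        // no turn in flight
    }
}
```

Whichever side reaches the tracker first, /clear ends with either the aborted turn's IDs or an empty list: a normal completion either withdraws the entry before /clear looks, or drops `done_tx` so the receiver /clear already took reports a closed channel.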

Tests

  • Added tests for abort signal racing, abort-drain timeout, abort command serialization, and the no-op case (abort after agent_end).

Key decisions

  • Signal-based abort, not kill: the pi process is reused after an abort; we send an `abort` command, drain the response, and keep the process alive.
  • Per-chat concurrency via removal of serialization: this is a single-user bot, so full concurrency is acceptable. If multi-user concurrency is needed later, a more targeted strategy can replace this.
  • Two-phase abort: /clear signals the abort, then waits for the streaming task to finish and report its message IDs, rather than trying to delete messages concurrently.
  • AtomicI64 for last_activity: avoids needing a lock for idle-timeout checks, preventing the cleanup task from potentially blocking on a running turn.

Reviewer notes

  • Pay special attention to the concurrency model: `abort_tx` is stored in the slot under a per-chat mutex and registered only after acquiring the process lock, so a concurrent `send_message` for the same chat cannot overwrite the running turn's handle.
  • The interaction between `clear_session_with_abort` and `deliver_streaming` is subtle; please verify the race handling around takeover of `done_rx` and the closed-channel path.
  • `clear_session` acquires the per-chat process lock, so it waits for an active turn to finish. Combined with the abort signal, this gives clean teardown.

Closes #42

feat(session): cancellable turns — closes #42
All checks were successful
CI / check (pull_request) Successful in 1m19s
PR Triage — label changes-requested reviews / triage-review (pull_request) Successful in 1s
06740a9b44
Introduces the cancel primitive so /clear cleanly aborts an in-flight
streaming response instead of blocking until it finishes or surfacing a
spurious "crashed" error.

## What changed

**pi.rs**
- Added `Abort` variant to `PiCommand` (serialises as `{"type":"abort"}`)
- Added `pub enum TurnOutcome { Response(AgentResponse), Aborted }`
- `PiProcess::send_message` takes `abort_rx: oneshot::Receiver<()>`; on
  cancel it writes `abort` to pi stdin, drains stdout to the resulting
  `agent_end` (returning pi to idle), and returns `TurnOutcome::Aborted`
- `kill` renamed to `kill_mut(&mut self)` for use through a MutexGuard
- Added `drain_aborted`: reads until `agent_end` or EOF/timeout
- Tests: abort before agent_end, abort after agent_end, drain stops at
  agent_end, drain times out on silent stream, abort command serialisation

**session.rs**
- Slot-based design: `RwLock<HashMap<String, Arc<Slot>>>` where
  `Slot { process: Mutex<PiProcess>, last_activity: AtomicI64,
  abort_tx: Mutex<Option<oneshot::Sender<()>>> }`
- Map write lock held only briefly (lookup or create); `process.lock()`
  is held only for the duration of a turn — not the map lock
- `send_message` registers abort_tx AFTER acquiring process.lock() to
  prevent a concurrent call from overwriting the handle mid-turn
- New `abort_turn(chat_id) -> bool`: signals the in-flight turn; returns
  true if a turn was running
- `clear_session` removes the slot then awaits the process lock, so it
  always waits for any active turn before killing
- `cleanup_idle` skips sessions with `has_active_turn()` (try_lock check)
- `SessionError::Aborted` variant for the abort outcome
- `shutdown_all` signals abort before waiting on the process lock

**telegram/mod.rs**
- `TurnTracker`: `Arc<Mutex<HashMap<i64, oneshot::Receiver<Vec<MessageId>>>>>`
  maps Telegram chat IDs to the completion receiver of the in-flight turn
- `run()` creates TurnTracker and adds it as a dispatcher dependency;
  sets `distribution_function(|_| None::<u32>)` to disable teloxide's
  per-chat serialisation so /clear can run while streaming is in progress
- `handle_message` receives `TurnTracker`; `/clear` routes through
  `clear_session_with_abort` instead of bare `clear_session`
- `clear_session_with_abort`: calls `abort_turn`, optionally awaits the
  `done_rx` for the message IDs, deletes them, then calls `clear_session`
- `deliver_streaming` registers `done_rx` in TurnTracker at start; on
  `SessionError::Aborted` it hands `view.ids` to /clear and returns
  cleanly; on normal completion it removes the tracker entry and drops
  `done_tx` so /clear's recv returns `Err` (nothing to delete)

## Key decisions

- abort_tx is set AFTER process.lock() to avoid overwrite races when two
  messages arrive simultaneously for the same chat
- drain_aborted discards snapshots and stops at the first agent_end; on
  timeout it returns an error (treated as crash by session.rs)
- TurnTracker is Telegram-only — session.rs knows nothing about MessageId;
  deliver_silent and http.rs are unaffected
- clear_session awaits process.lock() internally, so /clear may call it
  before deliver_streaming has finished; the lock ordering ensures
  correctness
- process is reused (not killed) on abort; clear_session kills it after

## Blockers / notes

No Rust toolchain in sandbox — CI must validate compilation and tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
weiwen merged commit 9ff73f1ff9 into main 2026-07-06 02:40:21 +08:00
weiwen deleted branch sandcastle/issue-42 2026-07-06 02:40:21 +08:00
Reference
weiwen/evie!44