Cancellable turns: /clear aborts an in-flight streaming response #44
Reference
weiwen/evie!44
Summary
Make an in-flight pi turn cancellable, and wire /clear to cancel a streaming response cleanly. Previously, teloxide serialized updates per-chat, so a /clear sent mid-stream couldn't run until the response finished. Now it can.
What changed
Dispatch ()
RPC / pi layer ()
Session state ()
Telegram handler ()
Tests
Key decisions
Reviewer notes
Closes #42
Introduces the cancel primitive so /clear cleanly aborts an in-flight streaming response instead of blocking until it finishes or surfacing a spurious "crashed" error.

## What changed

**pi.rs**

- Added `Abort` variant to `PiCommand` (serialises as `{"type":"abort"}`)
- Added `pub enum TurnOutcome { Response(AgentResponse), Aborted }`
- `PiProcess::send_message` takes `abort_rx: oneshot::Receiver<()>`; on cancel it writes `abort` to pi stdin, drains stdout to the resulting `agent_end` (returning pi to idle), and returns `TurnOutcome::Aborted`
- `kill` renamed to `kill_mut(&mut self)` for use through a MutexGuard
- Added `drain_aborted`: reads until `agent_end` or EOF/timeout
- Tests: abort before agent_end, abort after agent_end, drain stops at agent_end, drain times out on silent stream, abort command serialisation

**session.rs**

- Slot-based design: `RwLock<HashMap<String, Arc<Slot>>>` where `Slot { process: Mutex<PiProcess>, last_activity: AtomicI64, abort_tx: Mutex<Option<oneshot::Sender<()>>> }`
- Map write lock held only briefly (lookup or create); `process.lock()`, not the map lock, is held for the duration of a turn
- `send_message` registers abort_tx AFTER acquiring process.lock() to prevent a concurrent call from overwriting the handle mid-turn
- New `abort_turn(chat_id) -> bool`: signals the in-flight turn; returns true if a turn was running
- `clear_session` removes the slot then awaits the process lock, so it always waits for any active turn before killing
- `cleanup_idle` skips sessions with `has_active_turn()` (try_lock check)
- `SessionError::Aborted` variant for the abort outcome
- `shutdown_all` signals abort before waiting on the process lock

**telegram/mod.rs**

- `TurnTracker`: `Arc<Mutex<HashMap<i64, oneshot::Receiver<Vec<MessageId>>>>>` maps Telegram chat IDs to the completion receiver of the in-flight turn
- `run()` creates TurnTracker and adds it as a dispatcher dependency; sets `distribution_function(|_| None::<u32>)` to disable teloxide's per-chat serialisation so /clear can run while streaming is in progress
- `handle_message` receives `TurnTracker`; `/clear` routes through `clear_session_with_abort` instead of bare `clear_session`
- `clear_session_with_abort`: calls `abort_turn`, optionally awaits the `done_rx` for the message IDs, deletes them, then calls `clear_session`
- `deliver_streaming` registers `done_rx` in TurnTracker at start; on `SessionError::Aborted` it hands `view.ids` to /clear and returns cleanly; on normal completion it removes the tracker entry and drops `done_tx` so /clear's recv returns `Err` (nothing to delete)

## Key decisions

- abort_tx is set AFTER process.lock() to avoid overwrite races when two messages arrive simultaneously for the same chat
- drain_aborted discards snapshots and stops at the first agent_end; on timeout it returns an error (treated as crash by session.rs)
- TurnTracker is Telegram-only: session.rs knows nothing about MessageId; deliver_silent and http.rs are unaffected
- clear_session awaits process.lock() internally, so /clear does not need to call clear_session before deliver_streaming finishes; the lock ordering ensures correctness
- process is reused (not killed) on abort; clear_session kills it after

## Blockers / notes

No Rust toolchain in sandbox; CI must validate compilation and tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
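## Sketch: abort-handle ordering

For reviewers, here is a minimal, std-only sketch of the slot design and of registering the abort handle only after the process lock is held. It is not the code in this PR. `FakeProcess`, `Sessions`, the `work` closure and the `turns` helper are hypothetical stand-ins, `std::sync::mpsc` is swapped in for tokio's `oneshot`, and blocking `std::sync::Mutex` replaces the async locks so the example compiles without external crates.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical stand-in for `PiProcess`; it only counts finished turns.
struct FakeProcess {
    turns: u32,
}

#[derive(Debug, PartialEq)]
enum TurnOutcome {
    Response(String),
    Aborted,
}

struct Slot {
    process: Mutex<FakeProcess>,
    // Assumption: an mpsc sender stands in for `oneshot::Sender<()>`.
    abort_tx: Mutex<Option<Sender<()>>>,
}

#[derive(Default)]
struct Sessions {
    slots: RwLock<HashMap<String, Arc<Slot>>>,
}

impl Sessions {
    /// Lookup-or-create; the map write lock is released on return.
    fn slot(&self, chat_id: &str) -> Arc<Slot> {
        let mut map = self.slots.write().unwrap();
        map.entry(chat_id.to_string())
            .or_insert_with(|| {
                Arc::new(Slot {
                    process: Mutex::new(FakeProcess { turns: 0 }),
                    abort_tx: Mutex::new(None),
                })
            })
            .clone()
    }

    /// Runs one turn. `work` stands in for streaming and may poll the receiver.
    fn send_message(
        &self,
        chat_id: &str,
        work: impl FnOnce(&Receiver<()>) -> TurnOutcome,
    ) -> TurnOutcome {
        let slot = self.slot(chat_id);
        // Take the process lock first ...
        let mut process = slot.process.lock().unwrap();
        // ... and only then register the abort handle, so a second caller
        // queued on the process lock cannot overwrite it mid-turn.
        let (tx, rx) = mpsc::channel();
        *slot.abort_tx.lock().unwrap() = Some(tx);
        let outcome = work(&rx);
        // Clear the handle so a later abort_turn reports "no turn running".
        *slot.abort_tx.lock().unwrap() = None;
        process.turns += 1; // the process is reused, not killed, on abort
        outcome
    }

    /// Signals the in-flight turn; true only if a turn was running.
    fn abort_turn(&self, chat_id: &str) -> bool {
        let slot = self.slots.read().unwrap().get(chat_id).cloned();
        let tx = match slot {
            Some(slot) => slot.abort_tx.lock().unwrap().take(),
            None => None,
        };
        match tx {
            Some(tx) => tx.send(()).is_ok(),
            None => false,
        }
    }

    /// Hypothetical helper: number of turns the chat's process has run.
    fn turns(&self, chat_id: &str) -> Option<u32> {
        let slot = self.slots.read().unwrap().get(chat_id).cloned()?;
        let n = slot.process.lock().unwrap().turns;
        Some(n)
    }
}
```

In this sketch `abort_turn` never touches the process lock, so it can run while a turn holds it. That property is what lets /clear signal a streaming turn instead of queueing behind it. Calling `abort_turn` from inside the `work` closure is a single-threaded way to simulate a concurrent /clear.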