use Gemini-CLI --experimental-acp (#784)

Solomon
2025-09-25 19:36:08 +01:00
committed by GitHub
parent 4f7351ce16
commit 1c23d4fd11
21 changed files with 1750 additions and 679 deletions

.gitignore (vendored)
View File

@@ -67,6 +67,7 @@ coverage/
frontend/dist
crates/executors/bindings
crates/utils/bindings
crates/services/bindings
build-npm-package-codesign.sh

View File

@@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
workspace_utils = { path = "../utils", package = "utils" }
tokio = { workspace = true }
tokio-util = { version = "0.7", features = ["io"] }
tokio-util = { version = "0.7", features = ["io", "compat"] }
bytes = "1.0"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -42,3 +42,4 @@ convert_case = "0.6"
sqlx = "0.8.6"
axum = { workspace = true }
shlex = "1.3.0"
agent-client-protocol = "0.4"

View File

@@ -1,13 +1,12 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use crate::{
actions::Executable,
executors::{ExecutorError, StandardCodingAgentExecutor},
executors::{ExecutorError, SpawnedChild, StandardCodingAgentExecutor},
profile::{ExecutorConfigs, ExecutorProfileId},
};
@@ -30,7 +29,7 @@ impl CodingAgentFollowUpRequest {
#[async_trait]
impl Executable for CodingAgentFollowUpRequest {
async fn spawn(&self, current_dir: &Path) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path) -> Result<SpawnedChild, ExecutorError> {
let executor_profile_id = self.get_executor_profile_id();
let agent = ExecutorConfigs::get_cached()
.get_coding_agent(&executor_profile_id)

View File

@@ -1,13 +1,12 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use crate::{
actions::Executable,
executors::{ExecutorError, StandardCodingAgentExecutor},
executors::{ExecutorError, SpawnedChild, StandardCodingAgentExecutor},
profile::{ExecutorConfigs, ExecutorProfileId},
};
@@ -22,7 +21,7 @@ pub struct CodingAgentInitialRequest {
#[async_trait]
impl Executable for CodingAgentInitialRequest {
async fn spawn(&self, current_dir: &Path) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path) -> Result<SpawnedChild, ExecutorError> {
let executor_profile_id = self.executor_profile_id.clone();
let agent = ExecutorConfigs::get_cached()
.get_coding_agent(&executor_profile_id)

View File

@@ -1,7 +1,6 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use enum_dispatch::enum_dispatch;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
@@ -11,7 +10,7 @@ use crate::{
coding_agent_follow_up::CodingAgentFollowUpRequest,
coding_agent_initial::CodingAgentInitialRequest, script::ScriptRequest,
},
executors::ExecutorError,
executors::{ExecutorError, SpawnedChild},
};
pub mod coding_agent_follow_up;
pub mod coding_agent_initial;
@@ -49,12 +48,12 @@ impl ExecutorAction {
#[async_trait]
#[enum_dispatch(ExecutorActionType)]
pub trait Executable {
async fn spawn(&self, current_dir: &Path) -> Result<AsyncGroupChild, ExecutorError>;
async fn spawn(&self, current_dir: &Path) -> Result<SpawnedChild, ExecutorError>;
}
#[async_trait]
impl Executable for ExecutorAction {
async fn spawn(&self, current_dir: &Path) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path) -> Result<SpawnedChild, ExecutorError> {
self.typ.spawn(current_dir).await
}
}

View File

@@ -1,13 +1,16 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use ts_rs::TS;
use workspace_utils::shell::get_shell_command;
use crate::{actions::Executable, executors::ExecutorError};
use crate::{
actions::Executable,
executors::{ExecutorError, SpawnedChild},
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
pub enum ScriptRequestLanguage {
@@ -30,7 +33,7 @@ pub struct ScriptRequest {
#[async_trait]
impl Executable for ScriptRequest {
async fn spawn(&self, current_dir: &Path) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
command
@@ -43,6 +46,6 @@ impl Executable for ScriptRequest {
let child = command.group_spawn()?;
Ok(child)
Ok(child.into())
}
}
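
The SpawnedChild type these signatures now return is not defined anywhere in this diff. From its construction in the ACP harness below (a group child plus an exit signal) and the child.into() conversions above, a plausible sketch, assuming nothing beyond what the commit shows:

// Sketch only: inferred from usage in this commit, not the actual definition.
use command_group::AsyncGroupChild;
use tokio::sync::oneshot;

pub struct SpawnedChild {
    pub child: AsyncGroupChild,
    /// Some(..) when the executor signals completion itself (the ACP harness
    /// fires this after the prompt turn); None when process exit is the signal.
    pub exit_signal: Option<oneshot::Receiver<()>>,
}

impl From<AsyncGroupChild> for SpawnedChild {
    fn from(child: AsyncGroupChild) -> Self {
        Self {
            child,
            exit_signal: None,
        }
    }
}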

View File

@@ -0,0 +1,145 @@
use agent_client_protocol as acp;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tracing::{debug, warn};
use crate::executors::acp::AcpEvent;
/// ACP client that handles agent-client protocol communication
pub struct AcpClient {
event_tx: mpsc::UnboundedSender<AcpEvent>,
}
impl AcpClient {
/// Create a new ACP client
pub fn new(event_tx: mpsc::UnboundedSender<AcpEvent>) -> Self {
Self { event_tx }
}
pub fn record_user_prompt_event(&self, prompt: &str) {
self.send_event(AcpEvent::User(prompt.to_string()));
}
/// Send an event to the event channel
fn send_event(&self, event: AcpEvent) {
if let Err(e) = self.event_tx.send(event) {
warn!("Failed to send ACP event: {}", e);
}
}
}
#[async_trait(?Send)]
impl acp::Client for AcpClient {
async fn request_permission(
&self,
args: acp::RequestPermissionRequest,
) -> Result<acp::RequestPermissionResponse, acp::Error> {
// Forward the request as an event
self.send_event(AcpEvent::RequestPermission(args.clone()));
// Auto-approve with best available option
let chosen_option = args
.options
.iter()
.find(|o| matches!(o.kind, acp::PermissionOptionKind::AllowAlways))
.or_else(|| {
args.options
.iter()
.find(|o| matches!(o.kind, acp::PermissionOptionKind::AllowOnce))
})
.or_else(|| args.options.first());
let outcome = if let Some(opt) = chosen_option {
debug!("Auto-approving permission with option: {}", opt.id);
acp::RequestPermissionOutcome::Selected {
option_id: opt.id.clone(),
}
} else {
warn!("No permission options available, cancelling");
acp::RequestPermissionOutcome::Cancelled
};
Ok(acp::RequestPermissionResponse {
outcome,
meta: None,
})
}
async fn session_notification(&self, args: acp::SessionNotification) -> Result<(), acp::Error> {
// Convert to typed events
let event = match args.update {
acp::SessionUpdate::AgentMessageChunk { content } => Some(AcpEvent::Message(content)),
acp::SessionUpdate::AgentThoughtChunk { content } => Some(AcpEvent::Thought(content)),
acp::SessionUpdate::ToolCall(tc) => Some(AcpEvent::ToolCall(tc)),
acp::SessionUpdate::ToolCallUpdate(update) => Some(AcpEvent::ToolUpdate(update)),
acp::SessionUpdate::Plan(plan) => Some(AcpEvent::Plan(plan)),
_ => Some(AcpEvent::Other(args)),
};
if let Some(event) = event {
self.send_event(event);
}
Ok(())
}
// File system operations - not implemented as we don't expose FS
async fn write_text_file(
&self,
_args: acp::WriteTextFileRequest,
) -> Result<acp::WriteTextFileResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn read_text_file(
&self,
_args: acp::ReadTextFileRequest,
) -> Result<acp::ReadTextFileResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
// Terminal operations - not implemented
async fn create_terminal(
&self,
_args: acp::CreateTerminalRequest,
) -> Result<acp::CreateTerminalResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn terminal_output(
&self,
_args: acp::TerminalOutputRequest,
) -> Result<acp::TerminalOutputResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn release_terminal(
&self,
_args: acp::ReleaseTerminalRequest,
) -> Result<acp::ReleaseTerminalResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn wait_for_terminal_exit(
&self,
_args: acp::WaitForTerminalExitRequest,
) -> Result<acp::WaitForTerminalExitResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn kill_terminal_command(
&self,
_args: acp::KillTerminalCommandRequest,
) -> Result<acp::KillTerminalCommandResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
// Extension methods
async fn ext_method(&self, _args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
Err(acp::Error::method_not_found())
}
async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> {
Ok(())
}
}
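
A minimal sketch of driving this client, assuming only the constructor and prompt-recording method shown above (the scaffolding itself is hypothetical):

// Sketch: wire AcpClient to a channel and observe the emitted event.
use crate::executors::acp::{AcpClient, AcpEvent};
use tokio::sync::mpsc;

async fn demo() {
    let (tx, mut rx) = mpsc::unbounded_channel::<AcpEvent>();
    let client = AcpClient::new(tx);
    // Recording the user prompt pushes an AcpEvent::User onto the channel.
    client.record_user_prompt_event("run the test suite");
    if let Some(AcpEvent::User(prompt)) = rx.recv().await {
        println!("captured prompt: {prompt}");
    }
}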

View File

@@ -0,0 +1,377 @@
use std::{
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
};
use agent_client_protocol as proto;
use agent_client_protocol::Agent as _;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use futures::StreamExt;
use tokio::{io::AsyncWriteExt, process::Command, sync::mpsc};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tracing::error;
use workspace_utils::shell::get_shell_command;
use super::{AcpClient, SessionManager};
use crate::executors::{ExecutorError, SpawnedChild, acp::AcpEvent};
/// Reusable harness for ACP-based connections (Gemini, Qwen, etc.)
pub struct AcpAgentHarness {
session_namespace: String,
}
impl Default for AcpAgentHarness {
fn default() -> Self {
// Keep existing behavior for Gemini
Self::new()
}
}
impl AcpAgentHarness {
/// Create a harness with the default Gemini namespace
pub fn new() -> Self {
Self {
session_namespace: "gemini_sessions".to_string(),
}
}
/// Create a harness with a custom session namespace (e.g. for Qwen)
pub fn with_session_namespace(namespace: impl Into<String>) -> Self {
Self {
session_namespace: namespace.into(),
}
}
pub async fn spawn_with_command(
&self,
current_dir: &Path,
prompt: String,
full_command: String,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel::<()>();
Self::bootstrap_acp_connection(
&mut child,
current_dir.to_path_buf(),
None,
prompt,
Some(exit_tx),
self.session_namespace.clone(),
)
.await?;
Ok(SpawnedChild {
child,
exit_signal: Some(exit_rx),
})
}
pub async fn spawn_follow_up_with_command(
&self,
current_dir: &Path,
prompt: String,
session_id: &str,
full_command: String,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel::<()>();
Self::bootstrap_acp_connection(
&mut child,
current_dir.to_path_buf(),
Some(session_id.to_string()),
prompt,
Some(exit_tx),
self.session_namespace.clone(),
)
.await?;
Ok(SpawnedChild {
child,
exit_signal: Some(exit_rx),
})
}
async fn bootstrap_acp_connection(
child: &mut AsyncGroupChild,
cwd: PathBuf,
existing_session: Option<String>,
prompt: String,
exit_signal: Option<tokio::sync::oneshot::Sender<()>>,
session_namespace: String,
) -> Result<(), ExecutorError> {
// Take child's stdio for ACP wiring
let orig_stdout = child.inner().stdout.take().ok_or_else(|| {
ExecutorError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Child process has no stdout",
))
})?;
let orig_stdin = child.inner().stdin.take().ok_or_else(|| {
ExecutorError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Child process has no stdin",
))
})?;
// Create a fresh stdout pipe for logs
let writer = crate::stdout_dup::create_stdout_pipe_writer(child)?;
let shared_writer = Arc::new(tokio::sync::Mutex::new(writer));
let (log_tx, mut log_rx) = mpsc::unbounded_channel::<String>();
// Spawn log -> stdout writer task
tokio::spawn(async move {
while let Some(line) = log_rx.recv().await {
let mut data = line.into_bytes();
data.push(b'\n');
let mut w = shared_writer.lock().await;
let _ = w.write_all(&data).await;
}
});
// ACP client STDIO
let (mut to_acp_writer, acp_incoming_reader) = tokio::io::duplex(64 * 1024);
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
// Process stdout -> ACP
tokio::spawn(async move {
let mut stdout_stream = tokio_util::io::ReaderStream::new(orig_stdout);
while let Some(res) = stdout_stream.next().await {
if *shutdown_rx.borrow() {
break;
}
match res {
Ok(data) => {
let _ = to_acp_writer.write_all(&data).await;
}
Err(_) => break,
}
}
});
// The ACP crate expects futures::AsyncRead + AsyncWrite; use tokio-util's compat layer to adapt tokio's AsyncRead/AsyncWrite
let outgoing = orig_stdin.compat_write();
let incoming = acp_incoming_reader.compat();
let mut exit_signal_tx = exit_signal;
// Run ACP client in a LocalSet
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");
rt.block_on(async move {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
// Create event and raw channels
// Typed events available for future use; raw lines forwarded and persisted
let (event_tx, mut event_rx) =
mpsc::unbounded_channel::<crate::executors::acp::AcpEvent>();
// Create session manager
let session_manager = match SessionManager::new(session_namespace) {
Ok(sm) => sm,
Err(e) => {
error!("Failed to create session manager: {}", e);
return;
}
};
let session_manager = std::sync::Arc::new(session_manager);
// Create ACP client
let client = AcpClient::new(event_tx.clone());
client.record_user_prompt_event(&prompt);
// Set up connection
let (conn, io_fut) =
proto::ClientSideConnection::new(client, outgoing, incoming, |fut| {
tokio::task::spawn_local(fut);
});
// Drive I/O
let io_handle = tokio::task::spawn_local(async move {
let _ = io_fut.await;
});
// Initialize
let _ = conn
.initialize(proto::InitializeRequest {
protocol_version: proto::V1,
client_capabilities: proto::ClientCapabilities {
fs: proto::FileSystemCapability {
read_text_file: false,
write_text_file: false,
meta: None,
},
terminal: false,
meta: None,
},
meta: None,
})
.await;
// Handle session creation/forking
let (acp_session_id, display_session_id, prompt_to_send) =
if let Some(existing) = existing_session {
// Fork existing session
let new_ui_id = uuid::Uuid::new_v4().to_string();
let _ = session_manager.fork_session(&existing, &new_ui_id);
let history = session_manager.read_session_raw(&new_ui_id).ok();
let meta =
history.map(|h| serde_json::json!({ "history_jsonl": h }));
match conn
.new_session(proto::NewSessionRequest {
mcp_servers: vec![],
cwd: cwd.clone(),
meta,
})
.await
{
Ok(resp) => {
let resume_prompt = session_manager
.generate_resume_prompt(&new_ui_id, &prompt)
.unwrap_or_else(|_| prompt.clone());
(resp.session_id.0.to_string(), new_ui_id, resume_prompt)
}
Err(e) => {
error!("Failed to create session: {}", e);
return;
}
}
} else {
// New session
match conn
.new_session(proto::NewSessionRequest {
mcp_servers: vec![],
cwd: cwd.clone(),
meta: None,
})
.await
{
Ok(resp) => {
let sid = resp.session_id.0.to_string();
(sid.clone(), sid, prompt)
}
Err(e) => {
error!("Failed to create session: {}", e);
return;
}
}
};
// Emit session ID
let _ = log_tx
.send(AcpEvent::SessionStart(display_session_id.clone()).to_string());
// Start raw event forwarder and persistence
let app_tx_clone = log_tx.clone();
let sess_id_for_writer = display_session_id.clone();
let sm_for_writer = session_manager.clone();
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
// Forward to stdout
let _ = app_tx_clone.send(event.to_string());
// Persist to session file
let _ = sm_for_writer
.append_raw_line(&sess_id_for_writer, &event.to_string());
}
});
// Save prompt to session
let _ = session_manager.append_raw_line(
&display_session_id,
&serde_json::to_string(&serde_json::json!({ "user": prompt_to_send }))
.unwrap_or_default(),
);
// Build prompt request
let req = proto::PromptRequest {
session_id: proto::SessionId(acp_session_id.clone().into()),
prompt: vec![proto::ContentBlock::Text(proto::TextContent {
annotations: None,
text: prompt_to_send,
meta: None,
})],
meta: None,
};
// Send the prompt and await completion to obtain stop_reason
match conn.prompt(req).await {
Ok(resp) => {
// Emit done with stop_reason
let stop_reason =
serde_json::to_string(&resp.stop_reason).unwrap_or_default();
let _ = log_tx.send(AcpEvent::Done(stop_reason).to_string());
}
Err(e) => {
tracing::debug!("error {} {e} {:?}", e.code, e.data);
if e.code == agent_client_protocol::ErrorCode::INTERNAL_ERROR.code
&& e.data
.as_ref()
.is_some_and(|d| d == "server shut down unexpectedly")
{
tracing::debug!("ACP server killed");
} else {
let _ =
log_tx.send(AcpEvent::Error(format!("{e}")).to_string());
}
}
}
// Notify container of completion
if let Some(tx) = exit_signal_tx.take() {
let _ = tx.send(());
}
// Cancel session work
let _ = conn
.cancel(proto::CancelNotification {
session_id: proto::SessionId(acp_session_id.into()),
meta: None,
})
.await;
// Cleanup
drop(conn);
let _ = shutdown_tx.send(true);
let _ = io_handle.await;
drop(log_tx);
})
.await;
});
});
Ok(())
}
}
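
Taken together, a sketch of how an executor drives the harness; the command string here is illustrative, while Gemini's real one comes from build_command_builder() later in this commit:

// Sketch: spawn an ACP agent through the harness.
use std::path::Path;

async fn spawn_demo(current_dir: &Path) -> Result<SpawnedChild, ExecutorError> {
    let harness = AcpAgentHarness::new(); // "gemini_sessions" namespace
    harness
        .spawn_with_command(
            current_dir,
            "summarize this repository".to_string(),
            "gemini --experimental-acp".to_string(), // illustrative command
        )
        .await
}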

View File

@@ -0,0 +1,44 @@
pub mod client;
pub mod harness;
pub mod normalize_logs;
pub mod session;
use std::{fmt::Display, str::FromStr};
pub use client::AcpClient;
pub use harness::AcpAgentHarness;
pub use normalize_logs::*;
use serde::{Deserialize, Serialize};
pub use session::SessionManager;
/// Parsed event types for internal processing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AcpEvent {
User(String),
SessionStart(String),
Message(agent_client_protocol::ContentBlock),
Thought(agent_client_protocol::ContentBlock),
ToolCall(agent_client_protocol::ToolCall),
ToolUpdate(agent_client_protocol::ToolCallUpdate),
Plan(agent_client_protocol::Plan),
AvailableCommands(Vec<agent_client_protocol::AvailableCommand>),
CurrentMode(agent_client_protocol::SessionModeId),
RequestPermission(agent_client_protocol::RequestPermissionRequest),
Error(String),
Done(String),
Other(agent_client_protocol::SessionNotification),
}
impl Display for AcpEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::to_string(self).unwrap_or_default())
}
}
impl FromStr for AcpEvent {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
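
Since Display serializes to a JSON line and FromStr parses one back, events survive a round trip through the stdout pipe; for instance:

// Sketch: AcpEvent round-trips as a single JSON line.
use std::str::FromStr;

fn round_trip_demo() {
    let event = AcpEvent::SessionStart("abc123".to_string());
    let line = event.to_string(); // {"SessionStart":"abc123"} (serde's default tagging)
    let parsed = AcpEvent::from_str(&line).expect("valid JSON line");
    assert!(matches!(parsed, AcpEvent::SessionStart(id) if id == "abc123"));
}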

View File

@@ -0,0 +1,673 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use agent_client_protocol::{self as acp, SessionNotification};
use futures::StreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::Deserialize;
use tracing::debug;
use workspace_utils::msg_store::MsgStore;
pub use super::AcpAgentHarness;
use super::AcpEvent;
use crate::logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryType, ToolResult, ToolResultValueType,
ToolStatus as LogToolStatus,
stderr_processor::normalize_stderr_logs,
utils::{ConversationPatch, EntryIndexProvider},
};
pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
// stderr normalization
let entry_index = EntryIndexProvider::start_from(&msg_store);
normalize_stderr_logs(msg_store.clone(), entry_index.clone());
// stdout normalization (main loop)
let worktree_path = worktree_path.to_path_buf();
// Type aliases to simplify complex state types and appease clippy
tokio::spawn(async move {
type ToolStates = std::collections::HashMap<String, PartialToolCallData>;
let mut stored_session_id = false;
let mut streaming: StreamingState = StreamingState::default();
let mut tool_states: ToolStates = HashMap::new();
let mut stdout_lines = msg_store.stdout_lines_stream();
while let Some(Ok(line)) = stdout_lines.next().await {
if let Some(parsed) = AcpEventParser::parse_line(&line) {
debug!("Parsed ACP line: {:?}", parsed);
match parsed {
AcpEvent::SessionStart(id) => {
if !stored_session_id {
msg_store.push_session_id(id);
stored_session_id = true;
}
}
AcpEvent::Error(msg) => {
let idx = entry_index.next();
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ErrorMessage,
content: msg,
metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry));
}
AcpEvent::Done(_) => {
streaming.assistant_text = None;
streaming.thinking_text = None;
}
AcpEvent::Message(content) => {
streaming.thinking_text = None;
if let agent_client_protocol::ContentBlock::Text(text) = content {
let is_new = streaming.assistant_text.is_none();
if is_new {
let idx = entry_index.next();
streaming.assistant_text = Some(StreamingText {
index: idx,
content: String::new(),
});
}
if let Some(ref mut s) = streaming.assistant_text {
s.content.push_str(&text.text);
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::AssistantMessage,
content: s.content.clone(),
metadata: None,
};
let patch = if is_new {
ConversationPatch::add_normalized_entry(s.index, entry)
} else {
ConversationPatch::replace(s.index, entry)
};
msg_store.push_patch(patch);
}
}
}
AcpEvent::Thought(content) => {
streaming.assistant_text = None;
if let agent_client_protocol::ContentBlock::Text(text) = content {
let is_new = streaming.thinking_text.is_none();
if is_new {
let idx = entry_index.next();
streaming.thinking_text = Some(StreamingText {
index: idx,
content: String::new(),
});
}
if let Some(ref mut s) = streaming.thinking_text {
s.content.push_str(&text.text);
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::Thinking,
content: s.content.clone(),
metadata: None,
};
let patch = if is_new {
ConversationPatch::add_normalized_entry(s.index, entry)
} else {
ConversationPatch::replace(s.index, entry)
};
msg_store.push_patch(patch);
}
}
}
AcpEvent::Plan(plan) => {
streaming.assistant_text = None;
streaming.thinking_text = None;
let mut body = String::from("Plan:\n");
for (i, e) in plan.entries.iter().enumerate() {
body.push_str(&format!("{}. {}\n", i + 1, e.content));
}
let idx = entry_index.next();
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: body,
metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry));
}
AcpEvent::AvailableCommands(cmds) => {
let mut body = String::from("Available commands:\n");
for c in &cmds {
body.push_str(&format!("- {}\n", c.name));
}
let idx = entry_index.next();
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: body,
metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry));
}
AcpEvent::CurrentMode(mode_id) => {
let idx = entry_index.next();
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: format!("Current mode: {}", mode_id.0),
metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry));
}
AcpEvent::RequestPermission(perm) => {
if let Ok(tc) = agent_client_protocol::ToolCall::try_from(perm.tool_call) {
handle_tool_call(
&tc,
&worktree_path,
&mut streaming,
&mut tool_states,
&entry_index,
&msg_store,
);
}
}
AcpEvent::ToolCall(tc) => handle_tool_call(
&tc,
&worktree_path,
&mut streaming,
&mut tool_states,
&entry_index,
&msg_store,
),
AcpEvent::ToolUpdate(update) => {
let mut update = update;
if update.fields.title.is_none() {
update.fields.title = tool_states
.get(&update.id.0.to_string())
.map(|s| s.title.clone())
.or_else(|| Some("".to_string()));
}
debug!("Got tool call update: {:?}", update);
if let Ok(tc) = agent_client_protocol::ToolCall::try_from(update.clone()) {
handle_tool_call(
&tc,
&worktree_path,
&mut streaming,
&mut tool_states,
&entry_index,
&msg_store,
);
} else {
debug!("Failed to convert tool call update to ToolCall");
}
}
AcpEvent::User(_) | AcpEvent::Other(_) => (),
}
}
}
fn handle_tool_call(
tc: &agent_client_protocol::ToolCall,
worktree_path: &Path,
streaming: &mut StreamingState,
tool_states: &mut ToolStates,
entry_index: &EntryIndexProvider,
msg_store: &Arc<MsgStore>,
) {
streaming.assistant_text = None;
streaming.thinking_text = None;
let id = tc.id.0.to_string();
let is_new = !tool_states.contains_key(&id);
let tool_data = tool_states.entry(id).or_default();
tool_data.extend(tc, worktree_path);
if is_new {
tool_data.index = entry_index.next();
}
let action = map_to_action_type(tool_data);
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: tool_data.title.clone(),
action_type: action,
status: convert_tool_status(&tool_data.status),
},
content: get_tool_content(tool_data),
metadata: None,
};
let patch = if is_new {
ConversationPatch::add_normalized_entry(tool_data.index, entry)
} else {
ConversationPatch::replace(tool_data.index, entry)
};
msg_store.push_patch(patch);
}
fn map_to_action_type(tc: &PartialToolCallData) -> ActionType {
match tc.kind {
agent_client_protocol::ToolKind::Read => {
// Special-case: read_many_files style titles parsed via helper
if tc.id.0.starts_with("read_many_files") {
let result = collect_text_content(&tc.content).map(|text| ToolResult {
r#type: ToolResultValueType::Markdown,
value: serde_json::Value::String(text),
});
return ActionType::Tool {
tool_name: "read_many_files".to_string(),
arguments: Some(serde_json::Value::String(tc.title.clone())),
result,
};
}
ActionType::FileRead {
path: tc
.path
.clone()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
}
}
agent_client_protocol::ToolKind::Edit => {
let changes = extract_file_changes(tc);
ActionType::FileEdit {
path: tc
.path
.clone()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
changes,
}
}
agent_client_protocol::ToolKind::Execute => {
let command = AcpEventParser::parse_execute_command(&tc.title);
// Prefer structured raw_output, else fallback to aggregated text content
let completed =
matches!(tc.status, agent_client_protocol::ToolCallStatus::Completed);
tracing::debug!(
"Mapping execute tool call, completed: {}, command: {}",
completed,
command
);
let mut result = if let Some(out_val) = tc.raw_output.as_ref() {
serde_json::from_value::<ShellOutput>(out_val.clone())
.ok()
.map(|out| {
let mut exit_status = out
.exit_code
.map(|code| crate::logs::CommandExitStatus::ExitCode { code });
let output = out.stdout.or(out.stderr).unwrap_or_default();
if exit_status.is_none() && completed {
exit_status = Some(crate::logs::CommandExitStatus::Success {
success: true,
});
}
crate::logs::CommandRunResult {
exit_status,
output: Some(output),
}
})
} else {
None
};
if result.is_none() && completed {
result = Some(crate::logs::CommandRunResult {
exit_status: Some(crate::logs::CommandExitStatus::Success {
success: true,
}),
output: None,
});
}
ActionType::CommandRun { command, result }
}
agent_client_protocol::ToolKind::Delete => ActionType::FileEdit {
path: tc
.path
.clone()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
changes: vec![FileChange::Delete],
},
agent_client_protocol::ToolKind::Search => {
let query = tc
.raw_input
.as_ref()
.and_then(|v| serde_json::from_value::<SearchArgs>(v.clone()).ok())
.map(|a| a.query)
.unwrap_or_else(|| tc.title.clone());
ActionType::Search { query }
}
agent_client_protocol::ToolKind::Fetch => {
let mut url = tc
.raw_input
.as_ref()
.and_then(|v| serde_json::from_value::<FetchArgs>(v.clone()).ok())
.map(|a| a.url)
.unwrap_or_default();
if url.is_empty() {
// Fallback: try to extract first URL from the title
if let Some(extracted) = extract_url_from_text(&tc.title) {
url = extracted;
}
}
ActionType::WebFetch { url }
}
agent_client_protocol::ToolKind::Think => {
let tool_name = extract_tool_name_from_id(tc.id.0.as_ref())
.unwrap_or_else(|| tc.title.clone());
// For think/save_memory, surface both title and aggregated text content as arguments
let text = collect_text_content(&tc.content);
let arguments = Some(match &text {
Some(t) => serde_json::json!({ "title": tc.title, "content": t }),
None => serde_json::json!({ "title": tc.title }),
});
let result = if let Some(output) = &tc.raw_output {
Some(ToolResult {
r#type: ToolResultValueType::Json,
value: output.clone(),
})
} else {
collect_text_content(&tc.content).map(|text| ToolResult {
r#type: ToolResultValueType::Markdown,
value: serde_json::Value::String(text),
})
};
ActionType::Tool {
tool_name,
arguments,
result,
}
}
agent_client_protocol::ToolKind::SwitchMode => ActionType::Other {
description: "switch_mode".to_string(),
},
agent_client_protocol::ToolKind::Other | agent_client_protocol::ToolKind::Move => {
// Derive a friendlier tool name from the id if it looks like name-<digits>
let tool_name = extract_tool_name_from_id(tc.id.0.as_ref())
.unwrap_or_else(|| tc.title.clone());
// Some tools embed JSON args into the title instead of raw_input
let arguments = if let Some(raw) = &tc.raw_input {
Some(raw.clone())
} else if tc.title.trim_start().starts_with('{') {
// Title contains JSON arguments for the tool
serde_json::from_str::<serde_json::Value>(&tc.title).ok()
} else {
None
};
// Extract result: prefer raw_output (structured), else text content as Markdown
let result = if let Some(output) = &tc.raw_output {
Some(ToolResult {
r#type: ToolResultValueType::Json,
value: output.clone(),
})
} else {
collect_text_content(&tc.content).map(|text| ToolResult {
r#type: ToolResultValueType::Markdown,
value: serde_json::Value::String(text),
})
};
ActionType::Tool {
tool_name,
arguments,
result,
}
}
}
}
fn extract_file_changes(tc: &PartialToolCallData) -> Vec<FileChange> {
let mut changes = Vec::new();
for c in &tc.content {
if let agent_client_protocol::ToolCallContent::Diff { diff } = c {
let path = diff.path.to_string_lossy().to_string();
let rel = if !path.is_empty() {
path
} else {
tc.path
.clone()
.unwrap_or_default()
.to_string_lossy()
.to_string()
};
let old_text = diff.old_text.as_deref().unwrap_or("");
if old_text.is_empty() {
changes.push(FileChange::Write {
content: diff.new_text.clone(),
});
} else {
let unified = workspace_utils::diff::create_unified_diff(
&rel,
old_text,
&diff.new_text,
);
changes.push(FileChange::Edit {
unified_diff: unified,
has_line_numbers: false,
});
}
}
}
changes
}
fn get_tool_content(tc: &PartialToolCallData) -> String {
match tc.kind {
agent_client_protocol::ToolKind::Execute => {
AcpEventParser::parse_execute_command(&tc.title)
}
agent_client_protocol::ToolKind::Think => "Saving memory".to_string(),
agent_client_protocol::ToolKind::Other => {
let tool_name = extract_tool_name_from_id(tc.id.0.as_ref())
.unwrap_or_else(|| "tool".to_string());
if tc.title.is_empty() {
tool_name
} else {
format!("{}: {}", tool_name, tc.title)
}
}
agent_client_protocol::ToolKind::Read => {
if tc.id.0.starts_with("read_many_files") {
"Read files".to_string()
} else {
tc.title.clone()
}
}
_ => tc.title.clone(),
}
}
fn extract_tool_name_from_id(id: &str) -> Option<String> {
if let Some(idx) = id.rfind('-') {
let (head, tail) = id.split_at(idx);
if tail
.trim_start_matches('-')
.chars()
.all(|c| c.is_ascii_digit())
{
return Some(head.to_string());
}
}
None
}
fn extract_url_from_text(text: &str) -> Option<String> {
// Simple URL extractor
lazy_static! {
static ref URL_RE: Regex =
Regex::new(r#"https?://[^\s"')]+"#).expect("valid regex");
}
URL_RE.find(text).map(|m| m.as_str().to_string())
}
fn collect_text_content(
content: &[agent_client_protocol::ToolCallContent],
) -> Option<String> {
let mut out = String::new();
for c in content {
if let agent_client_protocol::ToolCallContent::Content { content } = c
&& let agent_client_protocol::ContentBlock::Text(t) = content
{
out.push_str(&t.text);
if !out.ends_with('\n') {
out.push('\n');
}
}
}
if out.is_empty() { None } else { Some(out) }
}
fn convert_tool_status(status: &agent_client_protocol::ToolCallStatus) -> LogToolStatus {
match status {
agent_client_protocol::ToolCallStatus::Pending
| agent_client_protocol::ToolCallStatus::InProgress => LogToolStatus::Created,
agent_client_protocol::ToolCallStatus::Completed => LogToolStatus::Success,
agent_client_protocol::ToolCallStatus::Failed => LogToolStatus::Failed,
}
}
});
}
struct PartialToolCallData {
index: usize,
id: agent_client_protocol::ToolCallId,
kind: agent_client_protocol::ToolKind,
title: String,
status: agent_client_protocol::ToolCallStatus,
path: Option<PathBuf>,
content: Vec<agent_client_protocol::ToolCallContent>,
raw_input: Option<serde_json::Value>,
raw_output: Option<serde_json::Value>,
}
impl PartialToolCallData {
fn extend(&mut self, tc: &agent_client_protocol::ToolCall, worktree_path: &Path) {
self.id = tc.id.clone();
if tc.kind != Default::default() {
self.kind = tc.kind;
}
if !tc.title.is_empty() {
self.title = tc.title.clone();
}
if tc.status != Default::default() {
self.status = tc.status;
}
if !tc.locations.is_empty() {
self.path = tc.locations.first().map(|l| {
PathBuf::from(workspace_utils::path::make_path_relative(
&l.path.to_string_lossy(),
&worktree_path.to_string_lossy(),
))
});
}
if !tc.content.is_empty() {
self.content = tc.content.clone();
}
if tc.raw_input.is_some() {
self.raw_input = tc.raw_input.clone();
}
if tc.raw_output.is_some() {
self.raw_output = tc.raw_output.clone();
}
}
}
impl Default for PartialToolCallData {
fn default() -> Self {
Self {
id: agent_client_protocol::ToolCallId(Default::default()),
index: 0,
kind: agent_client_protocol::ToolKind::default(),
title: String::new(),
status: Default::default(),
path: None,
content: Vec::new(),
raw_input: None,
raw_output: None,
}
}
}
struct AcpEventParser;
impl AcpEventParser {
/// Parse a line that may contain an ACP event
pub fn parse_line(line: &str) -> Option<AcpEvent> {
let trimmed = line.trim();
if let Ok(acp_event) = serde_json::from_str::<AcpEvent>(trimmed) {
return Some(acp_event);
}
debug!("Failed to parse ACP raw log {trimmed}");
None
}
/// Parse command from tool title (for execute tools)
pub fn parse_execute_command(title: &str) -> String {
title.split(" (").next().unwrap_or(title).trim().to_string()
}
}
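
For illustration, the parsing helpers behave as follows (shown as a standalone sketch; extract_tool_name_from_id is nested inside normalize_logs above, and the titles are made up):

// Sketch: title and id parsing for tool calls.
fn parsing_demo() {
    // Execute titles carry a trailing parenthesized description that is stripped.
    assert_eq!(
        AcpEventParser::parse_execute_command("cargo test (run the test suite)"),
        "cargo test"
    );
    assert_eq!(AcpEventParser::parse_execute_command("ls -la"), "ls -la");
    // Ids shaped like name-<digits> yield a friendlier tool name.
    assert_eq!(
        extract_tool_name_from_id("save_memory-1234"),
        Some("save_memory".to_string())
    );
    assert_eq!(extract_tool_name_from_id("no-digits-here"), None);
}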
/// Result of parsing a line
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ParsedLine {
SessionId(String),
Event(AcpEvent),
Error(String),
Done,
}
impl TryFrom<SessionNotification> for AcpEvent {
type Error = ();
fn try_from(notification: SessionNotification) -> Result<Self, ()> {
let event = match notification.update {
acp::SessionUpdate::AgentMessageChunk { content } => AcpEvent::Message(content),
acp::SessionUpdate::AgentThoughtChunk { content } => AcpEvent::Thought(content),
acp::SessionUpdate::ToolCall(tc) => AcpEvent::ToolCall(tc),
acp::SessionUpdate::ToolCallUpdate(update) => AcpEvent::ToolUpdate(update),
acp::SessionUpdate::Plan(plan) => AcpEvent::Plan(plan),
acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
AcpEvent::AvailableCommands(available_commands)
}
acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
AcpEvent::CurrentMode(current_mode_id)
}
_ => return Err(()),
};
Ok(event)
}
}
#[derive(Debug, Clone, Deserialize)]
struct SearchArgs {
query: String,
}
#[derive(Debug, Clone, Deserialize)]
struct FetchArgs {
url: String,
}
#[derive(Debug, Clone, Deserialize)]
struct ShellOutput {
#[serde(default)]
exit_code: Option<i32>,
#[serde(default)]
stdout: Option<String>,
#[serde(default)]
stderr: Option<String>,
}
#[derive(Debug, Clone, Default)]
struct StreamingState {
assistant_text: Option<StreamingText>,
thinking_text: Option<StreamingText>,
}
#[derive(Debug, Clone)]
struct StreamingText {
index: usize,
content: String,
}
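
The execute branch above expects Gemini's raw_output to deserialize into ShellOutput (defined at the bottom of this file). A sketch of that mapping with a made-up payload:

// Sketch: a raw_output payload as the execute branch consumes it.
fn shell_output_demo() {
    let raw = serde_json::json!({
        "exit_code": 0,
        "stdout": "Cargo.toml\nsrc\n",
        "stderr": null
    });
    let out: ShellOutput = serde_json::from_value(raw).expect("matches ShellOutput");
    // exit_code becomes CommandExitStatus::ExitCode { code: 0 };
    // stdout is preferred over stderr for the displayed output.
    assert_eq!(out.exit_code, Some(0));
    assert_eq!(out.stdout.as_deref(), Some("Cargo.toml\nsrc\n"));
}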

View File

@@ -0,0 +1,180 @@
use std::{
fs::{self, OpenOptions},
io::{self, Result, Write},
path::PathBuf,
str::FromStr,
};
use serde::{Deserialize, Serialize};
use crate::executors::acp::AcpEvent;
/// Manages session persistence and state for ACP interactions
pub struct SessionManager {
base_dir: PathBuf,
}
impl SessionManager {
/// Create a new session manager with the given namespace
pub fn new(namespace: impl Into<String>) -> Result<Self> {
let namespace = namespace.into();
let mut vk_dir = dirs::home_dir()
.ok_or_else(|| io::Error::other("Could not determine home directory"))?
.join(".vibe-kanban");
if cfg!(debug_assertions) {
vk_dir = vk_dir.join("dev");
}
let base_dir = vk_dir.join(&namespace);
fs::create_dir_all(&base_dir)?;
Ok(Self { base_dir })
}
/// Get the file path for a session
fn session_file_path(&self, session_id: &str) -> PathBuf {
self.base_dir.join(format!("{session_id}.jsonl"))
}
/// Append a raw JSON line to the session log
///
/// We normalize ACP payloads by:
/// - Removing top-level `sessionId`
/// - Unwrapping the `update` envelope (store its object directly)
/// - Dropping top-level `options` (permission menu). Note: `options` is
/// mutually exclusive with `update`, so when `update` is present we do not
/// perform any `options` stripping.
pub fn append_raw_line(&self, session_id: &str, raw_json: &str) -> Result<()> {
let Some(normalized) = Self::normalize_session_event(raw_json) else {
return Ok(());
};
let path = self.session_file_path(session_id);
let mut file = OpenOptions::new().create(true).append(true).open(path)?;
writeln!(file, "{normalized}")?;
Ok(())
}
/// Attempt to normalize a raw ACP JSON event into a cleaner shape.
/// Rules:
/// - Remove top-level `sessionId` always.
/// - If `update` is present with an object that has `sessionUpdate`, emit
/// a single-key object where key = camelCase(sessionUpdate) and value =
/// the `update` object minus `sessionUpdate`.
/// - If `update` is absent, remove only top-level `options`.
///
/// Returns None if the input is not a JSON object.
fn normalize_session_event(raw_json: &str) -> Option<String> {
let mut event = AcpEvent::from_str(raw_json).ok()?;
match event {
AcpEvent::SessionStart(..)
| AcpEvent::Error(..)
| AcpEvent::Done(..)
| AcpEvent::Other(..) => return None,
AcpEvent::User(..)
| AcpEvent::Message(..)
| AcpEvent::Thought(..)
| AcpEvent::ToolCall(..)
| AcpEvent::ToolUpdate(..)
| AcpEvent::Plan(..)
| AcpEvent::AvailableCommands(..)
| AcpEvent::CurrentMode(..) => {}
AcpEvent::RequestPermission(req) => event = AcpEvent::ToolUpdate(req.tool_call),
}
match event {
AcpEvent::User(prompt) => {
return serde_json::to_string(&serde_json::json!({"user": prompt})).ok();
}
AcpEvent::Message(ref content) | AcpEvent::Thought(ref content) => {
if let agent_client_protocol::ContentBlock::Text(text) = content {
// Special simplification for pure text messages
let key = if let AcpEvent::Message(_) = event {
"assistant"
} else {
"thinking"
};
return serde_json::to_string(&serde_json::json!({ key: text.text })).ok();
}
}
_ => {}
}
serde_json::to_string(&event).ok()
}
/// Read the raw JSONL content of a session
pub fn read_session_raw(&self, session_id: &str) -> Result<String> {
let path = self.session_file_path(session_id);
if !path.exists() {
return Ok(String::new());
}
fs::read_to_string(path)
}
/// Fork a session to create a new one with the same history
pub fn fork_session(&self, old_id: &str, new_id: &str) -> Result<()> {
let old_path = self.session_file_path(old_id);
let new_path = self.session_file_path(new_id);
if old_path.exists() {
fs::copy(&old_path, &new_path)?;
} else {
// Create empty new file if old doesn't exist
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&new_path)?;
}
Ok(())
}
/// Delete a session
pub fn delete_session(&self, session_id: &str) -> Result<()> {
let path = self.session_file_path(session_id);
if path.exists() {
fs::remove_file(path)?;
}
Ok(())
}
/// Generate a resume prompt from session history
pub fn generate_resume_prompt(&self, session_id: &str, current_prompt: &str) -> Result<String> {
let session_context = self.read_session_raw(session_id)?;
Ok(format!(
concat!(
"RESUME CONTEXT FOR CONTINUING TASK\n\n",
"=== EXECUTION HISTORY ===\n",
"The following is the conversation history from this session:\n",
"{}\n\n",
"=== CURRENT REQUEST ===\n",
"{}\n\n",
"=== INSTRUCTIONS ===\n",
"You are continuing work on the above task. The execution history shows ",
"the previous conversation in this session. Please continue from where ",
"the previous execution left off, taking into account all the context provided above."
),
session_context, current_prompt
))
}
}
/// Session metadata stored separately from events
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMetadata {
pub session_id: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub parent_session: Option<String>,
pub tags: Vec<String>,
}
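
A sketch of the follow-up flow the harness performs with this manager (session ids are illustrative):

// Sketch: fork a session, then fold its history into a resume prompt.
fn follow_up_demo() -> std::io::Result<()> {
    let sessions = SessionManager::new("gemini_sessions")?;
    // Copy the old session's JSONL history under a fresh UI id...
    sessions.fork_session("old-session-id", "new-ui-id")?;
    // ...then embed that history in the prompt sent to the new ACP session.
    let prompt = sessions.generate_resume_prompt("new-ui-id", "continue the refactor")?;
    assert!(prompt.contains("=== CURRENT REQUEST ==="));
    Ok(())
}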

View File

@@ -1,7 +1,7 @@
use std::{path::Path, process::Stdio, sync::Arc};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command};
@@ -11,7 +11,7 @@ use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{
AppendPrompt, ExecutorError, StandardCodingAgentExecutor,
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
claude::{ClaudeLogProcessor, HistoryStrategy},
},
logs::{stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider},
@@ -44,11 +44,7 @@ impl Amp {
#[async_trait]
impl StandardCodingAgentExecutor for Amp {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = self.build_command_builder().build_initial();
@@ -72,7 +68,7 @@ impl StandardCodingAgentExecutor for Amp {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
async fn spawn_follow_up(
@@ -80,7 +76,7 @@ impl StandardCodingAgentExecutor for Amp {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<SpawnedChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
@@ -142,7 +138,7 @@ impl StandardCodingAgentExecutor for Amp {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {

View File

@@ -3,7 +3,7 @@ use std::os::unix::fs::PermissionsExt;
use std::{path::Path, process::Stdio, sync::Arc};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use futures::StreamExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -21,7 +21,7 @@ use workspace_utils::{
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor},
executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor},
logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryType, TodoItem, ToolStatus,
stderr_processor::normalize_stderr_logs,
@@ -121,11 +121,7 @@ impl ClaudeCode {
#[async_trait]
impl StandardCodingAgentExecutor for ClaudeCode {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let command_builder = self.build_command_builder().await;
let mut base_command = command_builder.build_initial();
@@ -158,7 +154,7 @@ impl StandardCodingAgentExecutor for ClaudeCode {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
async fn spawn_follow_up(
@@ -166,7 +162,7 @@ impl StandardCodingAgentExecutor for ClaudeCode {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let command_builder = self.build_command_builder().await;
// Build follow-up command with --resume {session_id}
@@ -201,7 +197,7 @@ impl StandardCodingAgentExecutor for ClaudeCode {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {

View File

@@ -7,7 +7,7 @@ use std::{
};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use futures::StreamExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -24,7 +24,8 @@ use workspace_utils::{
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{
AppendPrompt, ExecutorError, StandardCodingAgentExecutor, codex::session::SessionHandler,
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
codex::session::SessionHandler,
},
logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryType, ToolStatus,
@@ -126,11 +127,7 @@ impl Codex {
#[async_trait]
impl StandardCodingAgentExecutor for Codex {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let codex_command = self.build_command_builder().build_initial();
@@ -156,7 +153,7 @@ impl StandardCodingAgentExecutor for Codex {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
async fn spawn_follow_up(
@@ -164,7 +161,7 @@ impl StandardCodingAgentExecutor for Codex {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<SpawnedChild, ExecutorError> {
// Fork rollout: copy and assign a new session id so each execution has a unique session
let (_rollout_file_path, new_session_id) = SessionHandler::fork_rollout_file(session_id)
.map_err(|e| ExecutorError::SpawnError(std::io::Error::other(e)))?;
@@ -196,7 +193,7 @@ impl StandardCodingAgentExecutor for Codex {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {

View File

@@ -2,7 +2,7 @@ use core::str;
use std::{path::Path, process::Stdio, sync::Arc, time::Duration};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use futures::StreamExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -20,7 +20,7 @@ use workspace_utils::{
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor},
executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor},
logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryType, TodoItem, ToolStatus,
plain_text_processor::PlainTextLogProcessor,
@@ -61,11 +61,7 @@ impl Cursor {
#[async_trait]
impl StandardCodingAgentExecutor for Cursor {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let agent_cmd = self.build_command_builder().build_initial();
@@ -88,7 +84,7 @@ impl StandardCodingAgentExecutor for Cursor {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
async fn spawn_follow_up(
@@ -96,7 +92,7 @@ impl StandardCodingAgentExecutor for Cursor {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let agent_cmd = self
.build_command_builder()
@@ -121,7 +117,7 @@ impl StandardCodingAgentExecutor for Cursor {
stdin.shutdown().await?;
}
Ok(child)
Ok(child.into())
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &Path) {

View File

@@ -1,30 +1,15 @@
use std::{
path::{Path, PathBuf},
process::Stdio,
sync::Arc,
};
use std::{path::Path, sync::Arc};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use futures::{StreamExt, stream::BoxStream};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{
fs::{self, OpenOptions},
io::AsyncWriteExt,
process::Command,
};
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use workspace_utils::msg_store::MsgStore;
pub use super::acp::AcpAgentHarness;
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor},
logs::{
NormalizedEntry, NormalizedEntryType, plain_text_processor::PlainTextLogProcessor,
stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider,
},
stdout_dup,
executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor},
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)]
@@ -69,352 +54,42 @@ impl Gemini {
builder = builder.extend_params(["--yolo"]);
}
builder = builder.extend_params(["--experimental-acp"]);
apply_overrides(builder, &self.cmd)
}
}
#[async_trait]
impl StandardCodingAgentExecutor for Gemini {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let gemini_command = self.build_command_builder().build_initial();
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let harness = AcpAgentHarness::new();
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(gemini_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
// Write prompt to stdin
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(combined_prompt.as_bytes()).await?;
stdin.shutdown().await?;
}
// Duplicate stdout for session logging
let duplicate_stdout = stdout_dup::duplicate_stdout(&mut child)?;
tokio::spawn(Self::record_session(
duplicate_stdout,
current_dir.to_path_buf(),
prompt.to_string(),
false,
));
Ok(child)
let gemini_command = self.build_command_builder().build_initial();
harness
.spawn_with_command(current_dir, combined_prompt, gemini_command)
.await
}
async fn spawn_follow_up(
&self,
current_dir: &Path,
prompt: &str,
_session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Build comprehensive prompt with session context
let followup_prompt = self.build_followup_prompt(current_dir, prompt).await?;
let (shell_cmd, shell_arg) = get_shell_command();
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let harness = AcpAgentHarness::new();
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let gemini_command = self.build_command_builder().build_follow_up(&[]);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(gemini_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
// Write comprehensive prompt to stdin
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(followup_prompt.as_bytes()).await?;
stdin.shutdown().await?;
harness
.spawn_follow_up_with_command(current_dir, combined_prompt, session_id, gemini_command)
.await
}
// Duplicate stdout for session logging (resume existing session)
let duplicate_stdout = stdout_dup::duplicate_stdout(&mut child)?;
tokio::spawn(Self::record_session(
duplicate_stdout,
current_dir.to_path_buf(),
prompt.to_string(),
true,
));
Ok(child)
}
/// Parses both stderr and stdout logs for Gemini executor using PlainTextLogProcessor.
///
/// - Stderr: uses the standard stderr log processor, which formats stderr output as ErrorMessage entries.
/// - Stdout: applies custom `format_chunk` to insert line breaks on period-to-capital transitions,
/// then creates assistant messages from the output.
///
/// Each entry is converted into an `AssistantMessage` or `ErrorMessage` and emitted as patches.
///
/// # Example
///
/// ```rust,ignore
/// gemini.normalize_logs(msg_store.clone(), &worktree_path);
/// ```
///
/// Subsequent queries to `msg_store` will receive JSON patches representing parsed log entries.
/// Sets up log normalization for the Gemini executor:
/// - stderr via [`normalize_stderr_logs`]
/// - stdout via [`PlainTextLogProcessor`] with Gemini-specific formatting and default heuristics
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &Path) {
let entry_index_counter = EntryIndexProvider::start_from(&msg_store);
normalize_stderr_logs(msg_store.clone(), entry_index_counter.clone());
// Send session ID to msg_store to enable follow-ups
msg_store.push_session_id(
worktree_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
);
// Normalize Agent logs
tokio::spawn(async move {
let mut stdout = msg_store.stdout_chunked_stream();
// Create a processor with Gemini-specific formatting
let mut processor = Self::create_gemini_style_processor(entry_index_counter);
while let Some(Ok(chunk)) = stdout.next().await {
for patch in processor.process(chunk) {
msg_store.push_patch(patch);
}
}
});
super::acp::normalize_logs(msg_store, worktree_path);
}
// MCP configuration methods
fn default_mcp_config_path(&self) -> Option<std::path::PathBuf> {
dirs::home_dir().map(|home| home.join(".gemini").join("settings.json"))
}
}
impl Gemini {
/// Creates a PlainTextLogProcessor that applies Gemini's sentence-break heuristics.
///
/// This processor formats chunks by inserting line breaks at period-to-capital transitions
/// and filters out Gemini CLI noise messages.
pub(crate) fn create_gemini_style_processor(
index_provider: EntryIndexProvider,
) -> PlainTextLogProcessor {
PlainTextLogProcessor::builder()
.normalized_entry_producer(Box::new(|content: String| NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::AssistantMessage,
content,
metadata: None,
}))
.format_chunk(Box::new(|partial, chunk| {
Self::format_stdout_chunk(&chunk, partial.unwrap_or(""))
}))
.transform_lines(Box::new(|lines: &mut Vec<String>| {
lines.retain(|l| l != "Data collection is disabled.\n");
}))
.index_provider(index_provider)
.build()
}
/// Make Gemini output more readable by inserting line breaks where periods are directly
/// followed by capital letters (common Gemini CLI formatting issue).
/// Handles both intra-chunk and cross-chunk period-to-capital transitions.
fn format_stdout_chunk(content: &str, accumulated_message: &str) -> String {
let mut result = String::with_capacity(content.len() + 100);
let chars: Vec<char> = content.chars().collect();
// Check for cross-chunk boundary: previous chunk ended with period, current starts with capital
if !accumulated_message.is_empty() && !content.is_empty() {
let ends_with_period = accumulated_message.ends_with('.');
let starts_with_capital = chars
.first()
.map(|&c| c.is_uppercase() && c.is_alphabetic())
.unwrap_or(false);
if ends_with_period && starts_with_capital {
result.push('\n');
}
}
// Handle intra-chunk period-to-capital transitions
for i in 0..chars.len() {
result.push(chars[i]);
// Check if current char is '.' and next char is uppercase letter (no space between)
if chars[i] == '.' && i + 1 < chars.len() {
let next_char = chars[i + 1];
if next_char.is_uppercase() && next_char.is_alphabetic() {
result.push('\n');
}
}
}
result
}
async fn record_session(
mut stdout_stream: BoxStream<'static, std::io::Result<String>>,
current_dir: PathBuf,
prompt: String,
resume_session: bool,
) {
let file_path = Self::get_session_file_path(&current_dir).await;
// Ensure the directory exists
if let Some(parent) = file_path.parent() {
let _ = fs::create_dir_all(parent).await;
}
// If not resuming session, delete the file first
if !resume_session {
let _ = fs::remove_file(&file_path).await;
}
// Always append from here on
let mut file = match OpenOptions::new()
.create(true)
.append(true)
.open(&file_path)
.await
{
Ok(file) => file,
Err(_) => {
tracing::error!("Failed to open session file: {:?}", file_path);
return;
}
};
// Write user message as normalized entry
let mut user_message_json = serde_json::to_string(&NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::UserMessage,
content: prompt,
metadata: None,
})
.unwrap_or_default();
user_message_json.push('\n');
let _ = file.write_all(user_message_json.as_bytes()).await;
// Read stdout incrementally and append assistant message
let mut stdout_content = String::new();
// Read stdout until the process finishes
while let Some(Ok(chunk)) = stdout_stream.next().await {
stdout_content.push_str(&chunk);
}
let mut assistant_message_json = serde_json::to_string(&NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::AssistantMessage,
content: stdout_content,
metadata: None,
})
.unwrap_or_default();
assistant_message_json.push('\n');
let _ = file.write_all(assistant_message_json.as_bytes()).await;
}
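// Note (assumed from the serialization above, not verified against a real session
// file): the session log is JSONL, one serialized NormalizedEntry per line, with the
// user prompt first and the buffered assistant output second. build_followup_prompt
// below replays this file verbatim as the "EXECUTION HISTORY" section.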
/// Build comprehensive prompt with session context for follow-up execution
async fn build_followup_prompt(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<String, ExecutorError> {
let session_file_path = Self::get_session_file_path(current_dir).await;
// Read existing session context
let session_context = fs::read_to_string(&session_file_path).await.map_err(|e| {
ExecutorError::FollowUpNotSupported(format!(
"No existing Gemini session found for this worktree. Session file not found at {session_file_path:?}: {e}"
))
})?;
Ok(format!(
r#"RESUME CONTEXT FOR CONTINUING TASK
=== EXECUTION HISTORY ===
The following is the conversation history from this session:
{session_context}
=== CURRENT REQUEST ===
{prompt}
=== INSTRUCTIONS ===
You are continuing work on the above task. The execution history shows the previous conversation in this session. Please continue from where the previous execution left off, taking into account all the context provided above.{}
"#,
self.append_prompt.get().unwrap_or_default(),
))
}
fn get_sessions_base_dir() -> PathBuf {
// Determine base directory under user's home
let home = dirs::home_dir().unwrap_or_else(std::env::temp_dir);
if cfg!(debug_assertions) {
home.join(".vibe-kanban")
.join("dev")
.join("gemini_sessions")
} else {
home.join(".vibe-kanban").join("gemini_sessions")
}
}
fn get_legacy_sessions_base_dir() -> PathBuf {
// Previous location was under the temp-based vibe-kanban dir
workspace_utils::path::get_vibe_kanban_temp_dir().join("gemini_sessions")
}
async fn get_session_file_path(current_dir: &Path) -> PathBuf {
let file_name = current_dir.file_name().unwrap_or_default();
let new_base = Self::get_sessions_base_dir();
let new_path = new_base.join(file_name);
// Ensure base directory exists
if let Some(parent) = new_path.parent() {
let _ = fs::create_dir_all(parent).await;
}
// If the new file doesn't exist yet, try to migrate from legacy location
let new_exists = fs::metadata(&new_path).await.is_ok();
if !new_exists {
let legacy_path = Self::get_legacy_sessions_base_dir().join(file_name);
if fs::metadata(&legacy_path).await.is_ok() {
if let Err(e) = fs::rename(&legacy_path, &new_path).await {
tracing::warn!(
"Failed to migrate Gemini session from {:?} to {:?}: {}",
legacy_path,
new_path,
e
);
} else {
tracing::info!(
"Migrated Gemini session file from legacy temp directory to persistent directory: {:?}",
new_path
);
}
}
}
new_path
}
}

View File

@@ -20,6 +20,7 @@ use crate::{
mcp_config::McpConfig,
};
pub mod acp;
pub mod amp;
pub mod claude;
pub mod codex;
@@ -125,7 +126,9 @@ impl CodingAgent {
Self::ClaudeCode(_) => vec![BaseAgentCapability::SessionFork],
Self::Amp(_) => vec![BaseAgentCapability::SessionFork],
Self::Codex(_) => vec![BaseAgentCapability::SessionFork],
Self::Gemini(_) | Self::Opencode(_) | Self::Cursor(_) | Self::QwenCode(_) => vec![],
Self::Gemini(_) => vec![BaseAgentCapability::SessionFork],
Self::QwenCode(_) => vec![BaseAgentCapability::SessionFork],
Self::Opencode(_) | Self::Cursor(_) => vec![],
}
}
}
@@ -133,17 +136,13 @@ impl CodingAgent {
#[async_trait]
#[enum_dispatch(CodingAgent)]
pub trait StandardCodingAgentExecutor {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError>;
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError>;
async fn spawn_follow_up(
&self,
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError>;
) -> Result<SpawnedChild, ExecutorError>;
fn normalize_logs(&self, _raw_logs_event_store: Arc<MsgStore>, _worktree_path: &Path);
// MCP configuration methods
@@ -156,6 +155,26 @@ pub trait StandardCodingAgentExecutor {
}
}
/// Optional exit notification from an executor.
/// When this receiver resolves, the container should gracefully stop the process
/// and mark it as successful (exit code 0).
pub type ExecutorExitSignal = tokio::sync::oneshot::Receiver<()>;
#[derive(Debug)]
pub struct SpawnedChild {
pub child: AsyncGroupChild,
pub exit_signal: Option<ExecutorExitSignal>,
}
impl From<AsyncGroupChild> for SpawnedChild {
fn from(child: AsyncGroupChild) -> Self {
Self {
child,
exit_signal: None,
}
}
}
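// Hedged sketch (illustration only, not part of this commit): how an executor whose
// agent process does not exit on its own might construct a SpawnedChild carrying the
// exit signal described above. `turn_finished` is a hypothetical stand-in for the
// executor's real completion future.
fn _example_spawn_with_exit_signal(
child: AsyncGroupChild,
turn_finished: impl std::future::Future<Output = ()> + Send + 'static,
) -> SpawnedChild {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
turn_finished.await; // resolves when the agent finishes the turn
let _ = tx.send(()); // the container then kills the group and records exit code 0
});
SpawnedChild {
child,
exit_signal: Some(rx),
}
}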
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)]
#[serde(transparent)]
#[schemars(

View File

@@ -7,7 +7,7 @@ use std::{
};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use command_group::AsyncCommandGroup;
use fork_stream::StreamExt as _;
use futures::{StreamExt, future::ready, stream::BoxStream};
use lazy_static::lazy_static;
@@ -21,7 +21,7 @@ use workspace_utils::{msg_store::MsgStore, path::make_path_relative, shell::get_
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{
AppendPrompt, ExecutorError, StandardCodingAgentExecutor,
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
opencode::share_bridge::Bridge as ShareBridge,
},
logs::{
@@ -128,11 +128,7 @@ impl Opencode {
#[async_trait]
impl StandardCodingAgentExecutor for Opencode {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
// Start a dedicated local share bridge bound to this opencode process
let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?;
let (shell_cmd, shell_arg) = get_shell_command();
@@ -189,7 +185,7 @@ impl StandardCodingAgentExecutor for Opencode {
tracing::debug!("Opencode process stdout closed");
bridge_for_shutdown.shutdown().await;
});
Ok(child)
Ok(child.into())
}
async fn spawn_follow_up(
@@ -197,7 +193,7 @@ impl StandardCodingAgentExecutor for Opencode {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<SpawnedChild, ExecutorError> {
// Start a dedicated local share bridge bound to this opencode process
let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?;
let (shell_cmd, shell_arg) = get_shell_command();
@@ -253,7 +249,7 @@ impl StandardCodingAgentExecutor for Opencode {
while let Some(_chunk) = dup_stream.next().await {}
bridge_for_shutdown.shutdown().await;
});
Ok(child)
Ok(child.into())
}
/// Normalize logs for OpenCode executor

View File

@@ -1,17 +1,17 @@
use std::{path::Path, process::Stdio, sync::Arc};
use std::{path::Path, sync::Arc};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command};
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use workspace_utils::msg_store::MsgStore;
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor, gemini::Gemini},
logs::{stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider},
executors::{
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
gemini::AcpAgentHarness,
},
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)]
@@ -31,42 +31,20 @@ impl QwenCode {
if self.yolo.unwrap_or(false) {
builder = builder.extend_params(["--yolo"]);
}
builder = builder.extend_params(["--experimental-acp"]);
apply_overrides(builder, &self.cmd)
}
}
#[async_trait]
impl StandardCodingAgentExecutor for QwenCode {
async fn spawn(
&self,
current_dir: &Path,
prompt: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let qwen_command = self.build_command_builder().build_initial();
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&qwen_command);
let mut child = command.group_spawn()?;
// Feed the prompt in, then close the pipe
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(combined_prompt.as_bytes()).await?;
stdin.shutdown().await?;
}
Ok(child)
let harness = AcpAgentHarness::with_session_namespace("qwen_sessions");
harness
.spawn_with_command(current_dir, combined_prompt, qwen_command)
.await
}
async fn spawn_follow_up(
@@ -74,64 +52,17 @@ impl StandardCodingAgentExecutor for QwenCode {
current_dir: &Path,
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let qwen_command = self
.build_command_builder()
.build_follow_up(&["--resume".to_string(), session_id.to_string()]);
) -> Result<SpawnedChild, ExecutorError> {
let qwen_command = self.build_command_builder().build_follow_up(&[]);
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&qwen_command);
let mut child = command.group_spawn()?;
// Feed the followup prompt in, then close the pipe
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(combined_prompt.as_bytes()).await?;
stdin.shutdown().await?;
let harness = AcpAgentHarness::with_session_namespace("qwen_sessions");
harness
.spawn_follow_up_with_command(current_dir, combined_prompt, session_id, qwen_command)
.await
}
Ok(child)
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {
// QwenCode has similar output format to Gemini CLI
// Use Gemini's proven sentence-break formatting instead of simple replace
let entry_index_counter = EntryIndexProvider::start_from(&msg_store);
normalize_stderr_logs(msg_store.clone(), entry_index_counter.clone());
// Send session ID to msg_store to enable follow-ups
msg_store.push_session_id(
current_dir
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
);
// Use Gemini's log processor for consistent formatting
tokio::spawn(async move {
use futures::StreamExt;
let mut stdout = msg_store.stdout_chunked_stream();
// Use Gemini's proven sentence-break heuristics
let mut processor = Gemini::create_gemini_style_processor(entry_index_counter);
while let Some(Ok(chunk)) = stdout.next().await {
for patch in processor.process(chunk) {
msg_store.push_patch(patch);
}
}
});
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &Path) {
crate::executors::acp::normalize_logs(msg_store, worktree_path);
}
// MCP configuration methods

View File

@@ -159,6 +159,25 @@ pub fn tee_stdout_with_appender(
))
}
/// Create a fresh stdout pipe for the child process and return an async writer
/// that writes directly to the child's new stdout.
///
/// This helper does not read or duplicate any existing stdout; it simply
/// replaces the child's stdout with a new pipe reader and returns the
/// corresponding async writer for the caller to write into.
pub fn create_stdout_pipe_writer<'b>(
child: &mut AsyncGroupChild,
) -> Result<impl AsyncWrite + 'b, ExecutorError> {
// Create replacement pipe and set as new child stdout
let (pipe_reader, pipe_writer) = os_pipe::pipe().map_err(|e| {
ExecutorError::Io(std::io::Error::other(format!("Failed to create pipe: {e}")))
})?;
child.inner().stdout = Some(wrap_fd_as_child_stdout(pipe_reader)?);
// Return async writer to the caller
wrap_fd_as_tokio_writer(pipe_writer)
}
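// Hedged usage sketch (illustration only, not part of this commit): bytes written to
// the returned writer become the child's observable stdout, because the original
// stdout handle has been replaced by the new pipe.
async fn _example_write_synthetic_stdout(
child: &mut AsyncGroupChild,
) -> Result<(), ExecutorError> {
use tokio::io::AsyncWriteExt;
// Pin in case the concrete writer type is not Unpin (assumption).
let mut writer = std::pin::pin!(create_stdout_pipe_writer(child)?);
writer
.write_all(b"synthetic assistant output\n")
.await
.map_err(ExecutorError::Io)?;
// Close the pipe so downstream readers observe EOF.
writer.shutdown().await.map_err(ExecutorError::Io)?;
Ok(())
}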
// =========================================
// OS file descriptor helper functions
// =========================================

View File

@@ -1,6 +1,7 @@
use std::{
collections::{HashMap, HashSet},
io,
os::unix::process::ExitStatusExt,
path::{Path, PathBuf},
sync::{
Arc,
@@ -40,7 +41,7 @@ use executors::{
},
},
};
use futures::{StreamExt, TryStreamExt, stream::select};
use futures::{FutureExt, StreamExt, TryStreamExt, stream::select};
use notify_debouncer_full::DebouncedEvent;
use serde_json::json;
use services::services::{
@@ -346,7 +347,11 @@ impl LocalContainerService {
/// Spawn a background task that polls the child process for completion and
/// cleans up the execution entry when it exits.
pub fn spawn_exit_monitor(&self, exec_id: &Uuid) -> JoinHandle<()> {
pub fn spawn_exit_monitor(
&self,
exec_id: &Uuid,
exit_signal: Option<tokio::sync::oneshot::Receiver<()>>,
) -> JoinHandle<()> {
let exec_id = *exec_id;
let child_store = self.child_store.clone();
let msg_stores = self.msg_stores.clone();
@@ -355,27 +360,36 @@ impl LocalContainerService {
let container = self.clone();
let analytics = self.analytics.clone();
let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);
tokio::spawn(async move {
loop {
let status_opt = {
let child_lock = {
let map = child_store.read().await;
map.get(&exec_id)
.cloned()
.unwrap_or_else(|| panic!("Child handle missing for {exec_id}"))
};
let mut exit_signal_future = exit_signal
.map(|rx| rx.map(|_| ()).boxed()) // wait for signal
.unwrap_or_else(|| std::future::pending::<()>().boxed()); // no signal, stall forever
let mut child_handler = child_lock.write().await;
match child_handler.try_wait() {
Ok(Some(status)) => Some(Ok(status)),
Ok(None) => None,
Err(e) => Some(Err(e)),
let status_result: std::io::Result<std::process::ExitStatus>;
// Wait for process to exit, or exit signal from executor
tokio::select! {
// Exit signal.
// Some coding agent processes do not exit on their own after handling the user request; instead,
// the executor signals when processing has finished so the container can stop the process gracefully.
_ = &mut exit_signal_future => {
// Executor signaled completion: kill group and remember to force Completed(0)
if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
let mut child = child_lock.write().await;
if let Err(err) = command::kill_process_group(&mut child).await {
tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
}
}
status_result = Ok(std::process::ExitStatus::from_raw(0));
}
// Process exit
exit_status_result = &mut process_exit_rx => {
status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
}
}
};
// Update execution process and cleanup if exit
if let Some(status_result) = status_opt {
// Update execution process record with completion info
let (exit_code, status) = match status_result {
Ok(exit_status) => {
let code = exit_status.code().unwrap_or(-1) as i64;
@@ -390,13 +404,8 @@ impl LocalContainerService {
};
if !ExecutionProcess::was_killed(&db.pool, exec_id).await
&& let Err(e) = ExecutionProcess::update_completion(
&db.pool,
exec_id,
status.clone(),
exit_code,
)
.await
&& let Err(e) =
ExecutionProcess::update_completion(&db.pool, exec_id, status, exit_code).await
{
tracing::error!("Failed to update execution process completion: {}", e);
}
@@ -407,8 +416,6 @@ impl LocalContainerService {
tracing::warn!("Failed to update executor session summary: {}", e);
}
// (moved) capture after-head commit occurs later, after commit/next-action handling
if matches!(
ctx.execution_process.status,
ExecutionProcessStatus::Completed
@@ -418,34 +425,25 @@ impl LocalContainerService {
let changes_committed = match container.try_commit_changes(&ctx).await {
Ok(committed) => committed,
Err(e) => {
tracing::error!(
"Failed to commit changes after execution: {}",
e
);
tracing::error!("Failed to commit changes after execution: {}", e);
// Treat commit failures as if changes were made to be safe
true
}
};
// Determine whether to start the next action based on execution context
let should_start_next = if matches!(
ctx.execution_process.run_reason,
ExecutionProcessRunReason::CodingAgent
) {
// Skip CleanupScript when CodingAgent produced no changes
changes_committed
} else {
// SetupScript always proceeds to CodingAgent
true
};
if should_start_next {
// If the process exited successfully, start the next action
if let Err(e) = container.try_start_next_action(&ctx).await {
tracing::error!(
"Failed to start next action after completion: {}",
e
);
tracing::error!("Failed to start next action after completion: {}", e);
}
} else {
tracing::info!(
@@ -470,7 +468,7 @@ impl LocalContainerService {
}
}
// Fire event when CodingAgent execution has finished
// Fire analytics event when CodingAgent execution has finished
if config.read().await.analytics_enabled == Some(true)
&& matches!(
&ctx.execution_process.run_reason,
@@ -493,16 +491,11 @@ impl LocalContainerService {
if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt);
if let Ok(head) = container.git().get_head_info(&worktree_dir)
&& let Err(e) = ExecutionProcess::update_after_head_commit(
&db.pool, exec_id, &head.oid,
)
&& let Err(e) =
ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
.await
{
tracing::warn!(
"Failed to update after_head_commit for {}: {}",
exec_id,
e
);
tracing::warn!("Failed to update after_head_commit for {}: {}", exec_id, e);
}
}
@@ -522,13 +515,44 @@ impl LocalContainerService {
// Cleanup child handle
child_store.write().await.remove(&exec_id);
break;
})
}
// still running, sleep and try again
pub fn spawn_os_exit_watcher(
&self,
exec_id: Uuid,
) -> tokio::sync::oneshot::Receiver<std::io::Result<std::process::ExitStatus>> {
let (tx, rx) = tokio::sync::oneshot::channel::<std::io::Result<std::process::ExitStatus>>();
let child_store = self.child_store.clone();
tokio::spawn(async move {
loop {
let child_lock = {
let map = child_store.read().await;
map.get(&exec_id).cloned()
};
if let Some(child_lock) = child_lock {
let mut child_handler = child_lock.write().await;
match child_handler.try_wait() {
Ok(Some(status)) => {
let _ = tx.send(Ok(status));
break;
}
Ok(None) => {}
Err(e) => {
let _ = tx.send(Err(e));
break;
}
}
} else {
let _ = tx.send(Err(io::Error::other(format!(
"Child handle missing for {exec_id}"
))));
break;
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
})
});
rx
}
pub fn dir_name_from_task_attempt(attempt_id: &Uuid, task_title: &str) -> String {
@@ -1043,15 +1067,16 @@ impl ContainerService for LocalContainerService {
let current_dir = PathBuf::from(container_ref);
// Create the child and stream, add to execution tracker
let mut child = executor_action.spawn(&current_dir).await?;
let mut spawned = executor_action.spawn(&current_dir).await?;
self.track_child_msgs_in_store(execution_process.id, &mut child)
self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
.await;
self.add_child_to_store(execution_process.id, child).await;
self.add_child_to_store(execution_process.id, spawned.child)
.await;
// Spawn exit monitor
let _hn = self.spawn_exit_monitor(&execution_process.id);
// Spawn unified exit monitor: watches OS exit and optional executor signal
let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal);
Ok(())
}