diff --git a/crates/executors/Cargo.toml b/crates/executors/Cargo.toml
index 46af093b..ea10cfc9 100644
--- a/crates/executors/Cargo.toml
+++ b/crates/executors/Cargo.toml
@@ -38,7 +38,7 @@ convert_case = "0.6"
 sqlx = "0.8.6"
 axum = { workspace = true }
 shlex = "1.3.0"
-agent-client-protocol = "0.8"
+agent-client-protocol = { version = "0.8", features = ["unstable"] }
 codex-protocol = { git = "https://github.com/openai/codex.git", package = "codex-protocol", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }
 codex-app-server-protocol = { git = "https://github.com/openai/codex.git", package = "codex-app-server-protocol", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }
 codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }
diff --git a/crates/executors/default_profiles.json b/crates/executors/default_profiles.json
index 27ccc802..481e5044 100644
--- a/crates/executors/default_profiles.json
+++ b/crates/executors/default_profiles.json
@@ -79,6 +79,11 @@
   "OPENCODE": {
     "DEFAULT": {
       "OPENCODE": {}
+    },
+    "PLAN": {
+      "OPENCODE": {
+        "mode": "plan"
+      }
     }
   },
   "QWEN_CODE": {
diff --git a/crates/executors/src/executors/acp/harness.rs b/crates/executors/src/executors/acp/harness.rs
index 95beec3a..f987d832 100644
--- a/crates/executors/src/executors/acp/harness.rs
+++ b/crates/executors/src/executors/acp/harness.rs
@@ -26,6 +26,8 @@ use crate::{
 /// Reusable harness for ACP-based conns (Gemini, Qwen, etc.)
 pub struct AcpAgentHarness {
     session_namespace: String,
+    model: Option<String>,
+    mode: Option<String>,
 }
 
 impl Default for AcpAgentHarness {
@@ -40,6 +42,8 @@ impl AcpAgentHarness {
     pub fn new() -> Self {
         Self {
             session_namespace: "gemini_sessions".to_string(),
+            model: None,
+            mode: None,
         }
     }
 
@@ -47,9 +51,21 @@ impl AcpAgentHarness {
     pub fn with_session_namespace(namespace: impl Into<String>) -> Self {
         Self {
             session_namespace: namespace.into(),
+            model: None,
+            mode: None,
         }
     }
 
+    pub fn with_model(mut self, model: impl Into<String>) -> Self {
+        self.model = Some(model.into());
+        self
+    }
+
+    pub fn with_mode(mut self, mode: impl Into<String>) -> Self {
+        self.mode = Some(mode.into());
+        self
+    }
+
     pub async fn spawn_with_command(
         &self,
         current_dir: &Path,
@@ -83,6 +99,8 @@ impl AcpAgentHarness {
             prompt,
             Some(exit_tx),
             self.session_namespace.clone(),
+            self.model.clone(),
+            self.mode.clone(),
         )
         .await?;
 
@@ -127,6 +145,8 @@ impl AcpAgentHarness {
             prompt,
             Some(exit_tx),
             self.session_namespace.clone(),
+            self.model.clone(),
+            self.mode.clone(),
         )
         .await?;
 
@@ -137,6 +157,7 @@ impl AcpAgentHarness {
         })
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn bootstrap_acp_connection(
         child: &mut AsyncGroupChild,
         cwd: PathBuf,
@@ -144,6 +165,8 @@ impl AcpAgentHarness {
         prompt: String,
         exit_signal: Option>,
         session_namespace: String,
+        model: Option<String>,
+        mode: Option<String>,
     ) -> Result<(), ExecutorError> {
         // Take child's stdio for ACP wiring
         let orig_stdout = child.inner().stdout.take().ok_or_else(|| {
@@ -329,6 +352,32 @@ impl AcpAgentHarness {
                 let _ = log_tx
                     .send(AcpEvent::SessionStart(display_session_id.clone()).to_string());
 
+                if let Some(model) = model.clone() {
+                    match conn
+                        .set_session_model(proto::SetSessionModelRequest::new(
+                            proto::SessionId::new(acp_session_id.clone()),
+                            model,
+                        ))
+                        .await
+                    {
+                        Ok(_) => {}
+                        Err(e) => error!("Failed to set session model: {}", e),
+                    }
+                }
+
+                if let Some(mode) = mode.clone() {
+                    match conn
+                        .set_session_mode(proto::SetSessionModeRequest::new(
+                            proto::SessionId::new(acp_session_id.clone()),
+                            mode,
+                        ))
+                        .await
+                    {
+                        Ok(_) => {}
+                        Err(e) => error!("Failed to set session mode: {}", e),
+                    }
+                }
+
                 // Start raw event forwarder and persistence
                 let app_tx_clone = log_tx.clone();
                 let sess_id_for_writer = display_session_id.clone();
diff --git a/crates/executors/src/executors/acp/normalize_logs.rs b/crates/executors/src/executors/acp/normalize_logs.rs
index fbfcda88..d954d482 100644
--- a/crates/executors/src/executors/acp/normalize_logs.rs
+++ b/crates/executors/src/executors/acp/normalize_logs.rs
@@ -8,14 +8,14 @@ use agent_client_protocol::{self as acp, SessionNotification};
 use futures::StreamExt;
 use regex::Regex;
 use serde::Deserialize;
-use tracing::debug;
+use tracing::{debug, trace};
 use workspace_utils::msg_store::MsgStore;
 
 pub use super::AcpAgentHarness;
 use super::AcpEvent;
 use crate::logs::{
-    ActionType, FileChange, NormalizedEntry, NormalizedEntryError, NormalizedEntryType, ToolResult,
-    ToolResultValueType, ToolStatus as LogToolStatus,
+    ActionType, FileChange, NormalizedEntry, NormalizedEntryError, NormalizedEntryType, TodoItem,
+    ToolResult, ToolResultValueType, ToolStatus as LogToolStatus,
     stderr_processor::normalize_stderr_logs,
     utils::{ConversationPatch, EntryIndexProvider},
 };
@@ -38,7 +38,7 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
     let mut stdout_lines = msg_store.stdout_lines_stream();
     while let Some(Ok(line)) = stdout_lines.next().await {
         if let Some(parsed) = AcpEventParser::parse_line(&line) {
-            debug!("Parsed ACP line: {:?}", parsed);
+            trace!("Parsed ACP line: {:?}", parsed);
             match parsed {
                 AcpEvent::SessionStart(id) => {
                     if !stored_session_id {
@@ -67,6 +67,9 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
                 if let agent_client_protocol::ContentBlock::Text(text) = content {
                     let is_new = streaming.assistant_text.is_none();
                     if is_new {
+                        if text.text == "\n" {
+                            continue;
+                        }
                         let idx = entry_index.next();
                         streaming.assistant_text = Some(StreamingText {
                             index: idx,
@@ -121,15 +124,33 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
             AcpEvent::Plan(plan) => {
                 streaming.assistant_text = None;
                 streaming.thinking_text = None;
-                let mut body = String::from("Plan:\n");
-                for (i, e) in plan.entries.iter().enumerate() {
{}\n", i + 1, e.content)); - } + let todos: Vec = plan + .entries + .iter() + .map(|e| TodoItem { + content: e.content.clone(), + status: serde_json::to_value(&e.status) + .ok() + .and_then(|v| v.as_str().map(|s| s.to_string())) + .unwrap_or_else(|| "unknown".to_string()), + priority: serde_json::to_value(&e.priority) + .ok() + .and_then(|v| v.as_str().map(|s| s.to_string())), + }) + .collect(); + let idx = entry_index.next(); let entry = NormalizedEntry { timestamp: None, - entry_type: NormalizedEntryType::SystemMessage, - content: body, + entry_type: NormalizedEntryType::ToolUse { + tool_name: "plan".to_string(), + action_type: ActionType::TodoManagement { + todos, + operation: "update".to_string(), + }, + status: LogToolStatus::Success, + }, + content: "Plan updated".to_string(), metadata: None, }; msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry)); @@ -186,7 +207,7 @@ pub fn normalize_logs(msg_store: Arc, worktree_path: &Path) { .map(|s| s.title.clone()) .or_else(|| Some("".to_string())); } - debug!("Got tool call update: {:?}", update); + trace!("Got tool call update: {:?}", update); if let Ok(tc) = agent_client_protocol::ToolCall::try_from(update.clone()) { handle_tool_call( &tc, @@ -282,10 +303,9 @@ pub fn normalize_logs(msg_store: Arc, worktree_path: &Path) { // Prefer structured raw_output, else fallback to aggregated text content let completed = matches!(tc.status, agent_client_protocol::ToolCallStatus::Completed); - tracing::debug!( + trace!( "Mapping execute tool call, completed: {}, command: {}", - completed, - command + completed, command ); let tc_exit_status = match tc.status { agent_client_protocol::ToolCallStatus::Completed => { @@ -463,7 +483,10 @@ pub fn normalize_logs(msg_store: Arc, worktree_path: &Path) { if tc.id.0.starts_with("read_many_files") { "Read files".to_string() } else { - tc.title.clone() + tc.path + .as_ref() + .map(|p| p.display().to_string()) + .unwrap_or_else(|| tc.title.clone()) } } _ => tc.title.clone(), diff --git a/crates/executors/src/executors/mod.rs b/crates/executors/src/executors/mod.rs index 4dbbc8c7..6aababc7 100644 --- a/crates/executors/src/executors/mod.rs +++ b/crates/executors/src/executors/mod.rs @@ -159,13 +159,14 @@ impl CodingAgent { | Self::Amp(_) | Self::Gemini(_) | Self::QwenCode(_) - | Self::Droid(_) => vec![BaseAgentCapability::SessionFork], + | Self::Droid(_) + | Self::Opencode(_) => vec![BaseAgentCapability::SessionFork], Self::Codex(_) => vec![ BaseAgentCapability::SessionFork, BaseAgentCapability::SetupHelper, ], Self::CursorAgent(_) => vec![BaseAgentCapability::SetupHelper], - Self::Opencode(_) | Self::Copilot(_) => vec![], + Self::Copilot(_) => vec![], } } } diff --git a/crates/executors/src/executors/opencode.rs b/crates/executors/src/executors/opencode.rs index 3fb3cabd..2c2c402b 100644 --- a/crates/executors/src/executors/opencode.rs +++ b/crates/executors/src/executors/opencode.rs @@ -1,129 +1,41 @@ -mod share_bridge; - -use std::{ - path::{Path, PathBuf}, - process::Stdio, - sync::{Arc, LazyLock}, -}; +use std::{path::Path, sync::Arc}; use async_trait::async_trait; -use command_group::AsyncCommandGroup; -use fork_stream::StreamExt as _; -use futures::{StreamExt, future::ready, stream::BoxStream}; -use regex::Regex; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, path::make_path_relative}; +use workspace_utils::msg_store::MsgStore; use crate::{ command::{CmdOverrides, 
CommandBuilder, apply_overrides}, env::ExecutionEnv, executors::{ AppendPrompt, AvailabilityInfo, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, - opencode::share_bridge::Bridge as ShareBridge, + acp::AcpAgentHarness, }, - logs::{ - ActionType, FileChange, NormalizedEntry, NormalizedEntryError, NormalizedEntryType, - TodoItem, ToolStatus, utils::EntryIndexProvider, - }, - stdout_dup, }; -// Typed structures for oc-share tool state -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -struct OcToolInput { - #[serde(rename = "filePath", default)] - file_path: Option, - #[serde(default)] - path: Option, - #[serde(default)] - include: Option, - #[serde(default)] - pattern: Option, - #[serde(default)] - command: Option, - #[serde(default)] - description: Option, - #[serde(default)] - url: Option, - #[serde(default)] - format: Option, - #[serde(default)] - timeout: Option, - #[serde(rename = "oldString", default)] - old_string: Option, - #[serde(rename = "newString", default)] - new_string: Option, - #[serde(rename = "replaceAll", default)] - replace_all: Option, - #[serde(default)] - content: Option, - #[serde(default)] - todos: Option>, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -struct OcToolMetadata { - #[serde(default)] - description: Option, - #[serde(default)] - exit: Option, - #[serde(default)] - diff: Option, - #[serde(default)] - count: Option, - #[serde(default)] - truncated: Option, - #[serde(default)] - preview: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -struct OcToolState { - #[serde(default)] - input: Option, - #[serde(default)] - metadata: Option, - #[serde(default)] - output: Option, - #[serde(default)] - status: Option, - #[serde(default)] - title: Option, -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] pub struct Opencode { #[serde(default)] pub append_prompt: AppendPrompt, #[serde(default, skip_serializing_if = "Option::is_none")] pub model: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub agent: Option, + #[serde(default, skip_serializing_if = "Option::is_none", alias = "agent")] + pub mode: Option, #[serde(flatten)] pub cmd: CmdOverrides, } impl Opencode { fn build_command_builder(&self) -> CommandBuilder { - let mut builder = CommandBuilder::new("npx -y opencode-ai@1.0.68 run").params([ - "--print-logs", - "--log-level", - "ERROR", - ]); - - if let Some(model) = &self.model { - builder = builder.extend_params(["--model", model]); - } - - if let Some(agent) = &self.agent { - builder = builder.extend_params(["--agent", agent]); - } - + let builder = CommandBuilder::new("npx -y opencode-ai@1.0.134 acp"); apply_overrides(builder, &self.cmd) } + + fn harness() -> AcpAgentHarness { + AcpAgentHarness::with_session_namespace("opencode_sessions") + } } #[async_trait] @@ -134,66 +46,25 @@ impl StandardCodingAgentExecutor for Opencode { prompt: &str, env: &ExecutionEnv, ) -> Result { - // Start a dedicated local share bridge bound to this opencode process - let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; - let command_parts = self.build_command_builder().build_initial()?; - let (program_path, args) = command_parts.into_resolved().await?; - let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(program_path); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) // Keep stdout but we won't use it - .stderr(Stdio::piped()) - .current_dir(current_dir) - .args(&args) - 
.env("NODE_NO_WARNINGS", "1") - .env("OPENCODE_AUTO_SHARE", "1") - .env("OPENCODE_API", bridge.base_url.clone()); - - env.clone() - .with_profile(&self.cmd) - .apply_to_command(&mut command); - - let mut child = match command.group_spawn() { - Ok(c) => c, - Err(e) => { - // If opencode fails to start, shut down the bridge to free the port - bridge.shutdown().await; - return Err(ExecutorError::SpawnError(e)); - } - }; - - // Write prompt to stdin - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; + let mut harness = Self::harness(); + if let Some(model) = &self.model { + harness = harness.with_model(model); } - // Transfer share events as lines for normalization through stdout - let (mut dup_stream, appender) = stdout_dup::tee_stdout_with_appender(&mut child)?; - let mut rx = bridge.subscribe(); - tokio::spawn(async move { - while let Ok(crate::executors::opencode::share_bridge::ShareEvent::Sync(mut req)) = - rx.recv().await - { - req.secret.clear(); - if let Ok(json) = serde_json::to_string(&req) { - appender.append_line(format!("{}{}", Opencode::SHARE_PREFIX, json)); - } - } - }); - - // Monitor child's stdout end; when it closes, shut down the bridge to release the port - let bridge_for_shutdown = bridge.clone(); - tokio::spawn(async move { - use futures::StreamExt; - while let Some(_chunk) = dup_stream.next().await {} - tracing::debug!("Opencode process stdout closed"); - bridge_for_shutdown.shutdown().await; - }); - Ok(child.into()) + if let Some(agent) = &self.mode { + harness = harness.with_mode(agent); + } + let opencode_command = self.build_command_builder().build_initial()?; + harness + .spawn_with_command( + current_dir, + combined_prompt, + opencode_command, + env, + &self.cmd, + ) + .await } async fn spawn_follow_up( @@ -203,117 +74,31 @@ impl StandardCodingAgentExecutor for Opencode { session_id: &str, env: &ExecutionEnv, ) -> Result { - // Start a dedicated local share bridge bound to this opencode process - let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; - let command_parts = self - .build_command_builder() - .build_follow_up(&["--session".to_string(), session_id.to_string()])?; - let (program_path, args) = command_parts.into_resolved().await?; - let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(program_path); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) // Keep stdout but we won't use it - .stderr(Stdio::piped()) - .current_dir(current_dir) - .args(&args) - .env("NODE_NO_WARNINGS", "1") - .env("OPENCODE_AUTO_SHARE", "1") - .env("OPENCODE_API", bridge.base_url.clone()); - - env.clone() - .with_profile(&self.cmd) - .apply_to_command(&mut command); - - let mut child = match command.group_spawn() { - Ok(c) => c, - Err(e) => { - bridge.shutdown().await; - return Err(ExecutorError::SpawnError(e)); - } - }; - - // Write prompt to stdin - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; + let mut harness = Self::harness(); + if let Some(model) = &self.model { + harness = harness.with_model(model); } - // Transfer share events as lines for normalization through stdout - let (mut dup_stream, appender) = stdout_dup::tee_stdout_with_appender(&mut child)?; - let mut rx = bridge.subscribe(); - tokio::spawn(async move { - while let Ok(crate::executors::opencode::share_bridge::ShareEvent::Sync(mut req)) = - 
rx.recv().await - { - req.secret.clear(); - if let Ok(json) = serde_json::to_string(&req) { - appender.append_line(format!("{}{}", Opencode::SHARE_PREFIX, json)); - } - } - }); - - let bridge_for_shutdown = bridge.clone(); - tokio::spawn(async move { - use futures::StreamExt; - while let Some(_chunk) = dup_stream.next().await {} - bridge_for_shutdown.shutdown().await; - }); - Ok(child.into()) + if let Some(agent) = &self.mode { + harness = harness.with_mode(agent); + } + let opencode_command = self.build_command_builder().build_follow_up(&[])?; + harness + .spawn_follow_up_with_command( + current_dir, + combined_prompt, + session_id, + opencode_command, + env, + &self.cmd, + ) + .await } - /// Normalize logs for OpenCode executor - /// - /// This implementation uses three separate threads: - /// 1. Session ID thread: read by line, search for session ID format, store it. - /// 2. Error log recognition thread: read by line, identify error log lines, store them as error messages. - /// 3. Main normalizer thread: read stderr by line, filter out log lines, send lines (with '\n' appended) to plain text normalizer, - /// then define predicate for split and create appropriate normalized entry (either assistant or tool call). fn normalize_logs(&self, msg_store: Arc, worktree_path: &Path) { - let entry_index_counter = EntryIndexProvider::start_from(&msg_store); - - let stderr_lines = msg_store - .stderr_lines_stream() - .filter_map(|res| ready(res.ok())) - .map(|line| strip_ansi_escapes::strip_str(&line)) - .fork(); - - // Log line: INFO 2025-08-05T10:17:26 +1ms service=session id=ses_786439b6dffe4bLqNBS4fGd7mJ - // error line: ! some error message - let log_lines = stderr_lines - .clone() - .filter(|line| { - ready(OPENCODE_LOG_REGEX.is_match(line) || LogUtils::is_error_line(line)) - }) - .boxed(); - - // Process log lines, which contain error messages. We now source session ID - // from the oc-share stream instead of stderr. 
- tokio::spawn(Self::process_opencode_log_lines( - log_lines, - msg_store.clone(), - entry_index_counter.clone(), - worktree_path.to_path_buf(), - )); - - // Also parse share events from stdout - let share_events = msg_store - .stdout_lines_stream() - .filter_map(|res| ready(res.ok())) - .filter(|line| ready(line.starts_with(Opencode::SHARE_PREFIX))) - .map(|line| line[Opencode::SHARE_PREFIX.len()..].to_string()) - .boxed(); - tokio::spawn(Self::process_share_events( - share_events, - worktree_path.to_path_buf(), - entry_index_counter.clone(), - msg_store.clone(), - )); + crate::executors::acp::normalize_logs(msg_store, worktree_path); } - // MCP configuration methods fn default_mcp_config_path(&self) -> Option { #[cfg(unix)] { @@ -342,656 +127,3 @@ impl StandardCodingAgentExecutor for Opencode { } } } -impl Opencode { - const SHARE_PREFIX: &'static str = "[oc-share] "; - async fn process_opencode_log_lines( - mut log_lines: BoxStream<'_, String>, - msg_store: Arc, - entry_index_counter: EntryIndexProvider, - _worktree_path: PathBuf, - ) { - while let Some(line) = log_lines.next().await { - if line.starts_with("ERROR") || LogUtils::is_error_line(&line) { - let entry = NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::ErrorMessage { - error_type: NormalizedEntryError::Other, - }, - content: line.clone(), - metadata: None, - }; - - // Create a patch for this single entry - let patch = crate::logs::utils::ConversationPatch::add_normalized_entry( - entry_index_counter.next(), - entry, - ); - msg_store.push_patch(patch); - } - } - } -} - -impl Opencode { - /// Parse share events and emit normalized patches - async fn process_share_events( - mut lines: BoxStream<'_, String>, - worktree_path: PathBuf, - entry_index_counter: EntryIndexProvider, - msg_store: Arc, - ) { - use std::collections::HashMap; - - use serde::Deserialize; - - use crate::logs::utils::ConversationPatch; - - #[derive(Debug, Clone, Deserialize)] - #[allow(dead_code)] - struct TimeObj { - #[serde(default)] - start: Option, - #[serde(default)] - end: Option, - } - - // Tool input/state structures are defined at module level (OcToolInput/OcToolState) - - #[derive(Debug, Clone, Deserialize)] - #[serde(tag = "type", rename_all = "lowercase")] - #[allow(clippy::large_enum_variant)] - #[allow(dead_code)] - enum ShareContent { - Text { - id: String, - #[serde(rename = "messageID")] - message_id: String, - #[serde(rename = "sessionID")] - session_id: String, - #[serde(default)] - text: Option, - #[serde(default)] - time: Option, - }, - Tool { - id: String, - #[serde(rename = "messageID")] - message_id: String, - #[serde(rename = "sessionID")] - session_id: String, - #[serde(rename = "callID", default)] - call_id: Option, - tool: String, - #[serde(default)] - state: Box>, - }, - } - - #[derive(Debug, Clone, Deserialize)] - #[allow(dead_code)] - struct ShareSyncEnvelope { - #[serde(rename = "sessionID")] - session_id: String, - secret: String, - key: String, - content: serde_json::Value, - } - - let mut index_by_part: HashMap = HashMap::new(); - // For text aggregation across parts under the same message (scoped by session) - // Key format: "{sessionID}:{messageID}" - let mut index_by_message: HashMap = HashMap::new(); - let mut message_parts_order: HashMap> = HashMap::new(); - let mut message_part_texts: HashMap> = HashMap::new(); - let mut message_aggregated: HashMap = HashMap::new(); - // Segment tracking per message to force splits after tool events - // base_key = "{sessionID}:{messageID}", seg_key = 
"{base_key}#" - let mut message_segment: HashMap = HashMap::new(); - let mut message_pending_break: HashMap = HashMap::new(); - let mut message_roles: HashMap = HashMap::new(); - let mut session_id_set = false; - - use std::collections::hash_map::Entry; - let mut upsert_by_part = |entry: NormalizedEntry, part_id: String| { - let (idx, is_new) = match index_by_part.entry(part_id) { - Entry::Occupied(o) => (*o.get(), false), - Entry::Vacant(v) => { - let i = entry_index_counter.next(); - v.insert(i); - (i, true) - } - }; - if is_new { - ConversationPatch::add_normalized_entry(idx, entry) - } else { - ConversationPatch::replace(idx, entry) - } - }; - - while let Some(line) = lines.next().await { - let Ok(env) = serde_json::from_str::(&line) else { - continue; - }; - // Record session id once from stream - if !session_id_set { - msg_store.push_session_id(env.session_id.clone()); - session_id_set = true; - } - - // Capture message role metadata from session/message events - if env.key.starts_with("session/message/") { - #[derive(Deserialize)] - struct MessageMeta { - id: String, - #[serde(default)] - role: Option, - } - if let Ok(meta) = serde_json::from_value::(env.content.clone()) - && let Some(role) = meta.role - { - message_roles.insert(meta.id.clone(), role.clone()); - - // If we have aggregated text already for this message, create or update the entry now - let base_key = format!("{}:{}", env.session_id, meta.id); - let seg = *message_segment.get(&base_key).unwrap_or(&0); - let seg_key = format!("{base_key}#{seg}"); - if let Some(content) = message_aggregated.get(&seg_key).cloned() { - // Skip emitting user role messages entirely - if role == "user" { - // Do not emit user text messages - } else { - let entry_type = match role.as_str() { - "system" => NormalizedEntryType::SystemMessage, - _ => NormalizedEntryType::AssistantMessage, - }; - use std::collections::hash_map::Entry as HmEntry; - match index_by_message.entry(seg_key) { - HmEntry::Occupied(o) => { - let idx = *o.get(); - let entry = NormalizedEntry { - timestamp: None, - entry_type, - content, - metadata: None, - }; - msg_store.push_patch(ConversationPatch::replace(idx, entry)); - } - HmEntry::Vacant(v) => { - let idx = entry_index_counter.next(); - v.insert(idx); - let entry = NormalizedEntry { - timestamp: None, - entry_type, - content, - metadata: None, - }; - msg_store.push_patch(ConversationPatch::add_normalized_entry( - idx, entry, - )); - } - } - } - } - } - continue; - } - - if !env.key.starts_with("session/part/") { - continue; - } - - match serde_json::from_value::(env.content.clone()) { - Ok(ShareContent::Text { - id, - message_id, - text, - .. 
- }) => { - let text = text.unwrap_or_default(); - // Scope aggregation by sessionID and segment to avoid cross-session and enforce breaks - let base_key = format!("{}:{}", env.session_id, message_id); - if message_pending_break.remove(&base_key).unwrap_or(false) { - let e = message_segment.entry(base_key.clone()).or_insert(0); - *e += 1; - } - let seg = *message_segment.get(&base_key).unwrap_or(&0); - let msg_key = format!("{base_key}#{seg}"); - - // Track parts order for this message - let parts_order = message_parts_order.entry(msg_key.clone()).or_default(); - if !parts_order.iter().any(|p| p == &id) { - parts_order.push(id.clone()); - } - - // Update latest text for this part under the message - let part_texts = message_part_texts.entry(msg_key.clone()).or_default(); - part_texts.insert(id.clone(), text); - - // Rebuild aggregated message text by concatenating parts in stable order - let aggregated = parts_order - .iter() - .filter_map(|pid| part_texts.get(pid)) - .cloned() - .collect::>() - .join(""); - message_aggregated.insert(msg_key.clone(), aggregated.clone()); - - // Determine role; if unknown yet, wait until message metadata arrives. - match message_roles.get(&message_id).map(|s| s.as_str()) { - Some("user") => { - // Do not emit user text messages - } - Some(role) => { - // Upsert by message id to keep a single entry per message - let (idx, is_new) = match index_by_message.entry(msg_key) { - Entry::Occupied(o) => (*o.get(), false), - Entry::Vacant(v) => { - let i = entry_index_counter.next(); - v.insert(i); - (i, true) - } - }; - let entry_type = match role { - "system" => NormalizedEntryType::SystemMessage, - _ => NormalizedEntryType::AssistantMessage, - }; - let entry = NormalizedEntry { - timestamp: None, - entry_type, - content: aggregated, - metadata: None, - }; - let patch = if is_new { - ConversationPatch::add_normalized_entry(idx, entry) - } else { - ConversationPatch::replace(idx, entry) - }; - msg_store.push_patch(patch); - } - None => { - // Role unknown; accumulate but don't emit yet - } - } - } - Ok(ShareContent::Tool { - id, - tool, - state, - message_id, - .. 
- }) => { - // If there is pending text in the current segment, mark to break before next text - let base_key = format!("{}:{}", env.session_id, message_id); - let seg = *message_segment.get(&base_key).unwrap_or(&0); - let seg_key = format!("{base_key}#{seg}"); - if message_aggregated - .get(&seg_key) - .map(|s| !s.is_empty()) - .unwrap_or(false) - { - message_pending_break.insert(base_key.clone(), true); - } - let state = (*state).unwrap_or_default(); - let status = state.status.as_deref().unwrap_or(""); - - let exit_status = state - .metadata - .as_ref() - .and_then(|m| m.exit) - .map(|code| crate::logs::CommandExitStatus::ExitCode { code }); - - let (result, mut content_text) = match status { - "completed" => { - let output = state.output.as_deref().unwrap_or(""); - let title = state.title.as_deref().unwrap_or(""); - let header = if title.is_empty() { - format!("{tool} completed") - } else { - format!("{tool}: {title}") - }; - ( - Some(crate::logs::ToolResult { - r#type: crate::logs::ToolResultValueType::Markdown, - value: serde_json::Value::String(output.to_string()), - }), - format!("{header}\n"), - ) - } - "error" => { - let err = state - .metadata - .as_ref() - .and_then(|m| m.description.as_deref()) - .unwrap_or(""); - ( - Some(crate::logs::ToolResult { - r#type: crate::logs::ToolResultValueType::Markdown, - value: serde_json::Value::String(format!("Error: {err}")), - }), - format!("{tool} error: {err}\n"), - ) - } - "running" => (None, format!("{tool} started\n")), - _ => (None, String::new()), - }; - - // Compute concise normalized summary for known tools using a typed mapping - let worktree = worktree_path.to_string_lossy(); - #[derive(Deserialize)] - #[serde(tag = "tool", rename_all = "lowercase")] - #[allow(dead_code)] - enum TypedTool { - #[serde(rename = "read")] - Read { - #[serde(default)] - input: OcToolInput, - }, - #[serde(rename = "list")] - List { - #[serde(default)] - input: OcToolInput, - }, - #[serde(rename = "grep")] - Grep { - #[serde(default)] - input: OcToolInput, - }, - #[serde(rename = "glob")] - Glob { - #[serde(default)] - input: OcToolInput, - }, - #[serde(rename = "webfetch")] - Webfetch { - #[serde(default)] - input: OcToolInput, - }, - #[serde(other)] - Other, - } - if let Ok(v) = serde_json::to_value(&state.input).and_then(|input| { - serde_json::from_value::(serde_json::json!({ - "tool": tool, - "input": input - })) - }) { - match v { - TypedTool::Read { input } => { - let p = input.file_path.as_deref().unwrap_or(""); - content_text = format!("`{}`", make_path_relative(p, &worktree)); - } - TypedTool::List { input } => { - let p = input.path.as_deref().unwrap_or("."); - content_text = format!( - "List directory: `{}`", - make_path_relative(p, &worktree) - ); - } - TypedTool::Grep { input } => { - let pat = input.pattern.as_deref().unwrap_or(""); - let p = input.path.as_deref().unwrap_or("."); - let rel = make_path_relative(p, &worktree); - if let Some(inc) = input.include.as_deref() { - content_text = format!("`{pat}` in `{rel}` ({inc})"); - } else { - content_text = format!("`{pat}` in `{rel}`"); - } - } - TypedTool::Glob { input } => { - let pat = input.pattern.as_deref().unwrap_or(""); - let p = input.path.as_deref().unwrap_or("."); - let rel = make_path_relative(p, &worktree); - content_text = format!("glob `{pat}` in `{rel}`"); - } - TypedTool::Webfetch { input } => { - let url = input.url.as_deref().unwrap_or(""); - content_text = format!("fetch `{url}`"); - } - TypedTool::Other => {} - } - } - - // Prepare normalized arguments for potential 
Tool action (fallback only) - let args_json = if let Some(input) = state.input.as_ref() { - let mut map = serde_json::Map::new(); - if let Some(p) = input.file_path.as_deref() { - map.insert( - "filePath".into(), - serde_json::Value::String(make_path_relative( - p, - &worktree_path.to_string_lossy(), - )), - ); - } - if let Some(p) = input.path.as_deref() { - map.insert( - "path".into(), - serde_json::Value::String(make_path_relative( - p, - &worktree_path.to_string_lossy(), - )), - ); - } - if let Some(v) = input.include.as_ref() { - map.insert("include".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.pattern.as_ref() { - map.insert("pattern".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.command.as_ref() { - map.insert("command".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.description.as_ref() { - map.insert("description".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.url.as_ref() { - map.insert("url".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.format.as_ref() { - map.insert("format".into(), serde_json::Value::String(v.clone())); - } - if let Some(v) = input.timeout.as_ref() { - map.insert("timeout".into(), serde_json::Value::from(*v)); - } - serde_json::Value::Object(map) - } else { - serde_json::Value::Null - }; - - // Derive ActionType and attach command results if applicable - let action_type = Self::derive_action_type(&tool, &state, &worktree_path); - let resolved_action_type = match action_type { - Some(mut at) => match (&mut at, &result) { - (ActionType::CommandRun { result: r, .. }, Some(res)) => { - *r = Some(crate::logs::CommandRunResult { - exit_status: exit_status.clone(), - output: res.value.as_str().map(|s| s.to_owned()), - }); - at - } - _ => at, - }, - None => ActionType::Tool { - tool_name: tool.clone(), - arguments: Some(args_json.clone()), - result: result.clone(), - }, - }; - - let entry = NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::ToolUse { - tool_name: tool.clone(), - action_type: resolved_action_type, - status: ToolStatus::Success, - }, - content: content_text, - metadata: None, - }; - - let patch = upsert_by_part(entry, id); - msg_store.push_patch(patch); - } - Err(_) => {} - } - } - } - - /// Map tool name and state to a rich ActionType used by frontend renderers. 
- fn derive_action_type( - tool_name: &str, - state: &OcToolState, - worktree_path: &Path, - ) -> Option { - // Deserialize "tool" + typed input into a tagged enum to avoid stringly logic - #[derive(Deserialize)] - #[serde(tag = "tool", rename_all = "lowercase")] - #[allow(dead_code)] - enum ActionTool { - Read { - input: OcToolInput, - }, - Write { - input: OcToolInput, - }, - Edit { - input: OcToolInput, - }, - Bash { - input: OcToolInput, - }, - Grep { - input: OcToolInput, - }, - Glob { - input: OcToolInput, - }, - Webfetch { - input: OcToolInput, - }, - Task { - input: OcToolInput, - }, - Todowrite { - input: OcToolInput, - }, - Todoread, - List { - input: OcToolInput, - }, - #[serde(other)] - Other, - } - - let input_json = serde_json::to_value(state.input.clone().unwrap_or_default()) - .unwrap_or(serde_json::Value::Null); - let v = serde_json::json!({ "tool": tool_name, "input": input_json }); - let parsed: ActionTool = serde_json::from_value(v).unwrap_or(ActionTool::Other); - match parsed { - ActionTool::Read { input } => { - let path = input.file_path.as_deref().unwrap_or(""); - Some(ActionType::FileRead { - path: make_path_relative(path, &worktree_path.to_string_lossy()), - }) - } - ActionTool::Write { input } => { - let path = input.file_path.as_deref().unwrap_or(""); - let content = input.content.unwrap_or_default(); - Some(ActionType::FileEdit { - path: make_path_relative(path, &worktree_path.to_string_lossy()), - changes: vec![FileChange::Write { content }], - }) - } - ActionTool::Edit { input } => { - let path = input.file_path.as_deref().unwrap_or(""); - let diff = state - .metadata - .as_ref() - .and_then(|m| m.diff.as_deref()) - .unwrap_or(""); - if diff.is_empty() { - return None; - } - Some(ActionType::FileEdit { - path: make_path_relative(path, &worktree_path.to_string_lossy()), - changes: vec![FileChange::Edit { - unified_diff: diff.to_string(), - has_line_numbers: false, - }], - }) - } - ActionTool::Bash { input } => { - let command = input.command.unwrap_or_default(); - Some(ActionType::CommandRun { - command, - result: None, - }) - } - ActionTool::Grep { input } => { - let query = input.pattern.unwrap_or_default(); - Some(ActionType::Search { query }) - } - ActionTool::Glob { input } => { - let query = input.pattern.unwrap_or_default(); - Some(ActionType::Search { query }) - } - ActionTool::Webfetch { input } => { - let url = input.url.unwrap_or_default(); - Some(ActionType::WebFetch { url }) - } - ActionTool::Todowrite { input } => { - let todos = input - .todos - .unwrap_or_default() - .into_iter() - .map(|t| TodoItem { - content: t.content, - status: t.status, - priority: t.priority, - }) - .collect::>(); - Some(ActionType::TodoManagement { - todos, - operation: "write".into(), - }) - } - ActionTool::Todoread => Some(ActionType::TodoManagement { - todos: vec![], - operation: "read".into(), - }), - ActionTool::List { .. } | ActionTool::Task { .. 
} | ActionTool::Other => None, - } - } -} - -// ============================================================================= -// TOOL DEFINITIONS -// ============================================================================= - -/// TODO information structure -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] -pub struct TodoInfo { - pub content: String, - pub status: String, - #[serde(default)] - pub priority: Option, -} - -// ============================================================================= -// Log interpretation UTILITIES -// ============================================================================= - -// Accurate regex for OpenCode log lines: LEVEL timestamp +ms ... -static OPENCODE_LOG_REGEX: LazyLock = LazyLock::new(|| { - Regex::new(r"^(INFO|DEBUG|WARN|ERROR)\s+\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\s+\+\d+\s*ms.*") - .unwrap() -}); - -/// Log utilities for OpenCode processing -pub struct LogUtils; - -impl LogUtils { - pub fn is_error_line(line: &str) -> bool { - line.starts_with("! ") - } -} diff --git a/crates/executors/src/executors/opencode/share_bridge.rs b/crates/executors/src/executors/opencode/share_bridge.rs deleted file mode 100644 index 44ff8ab0..00000000 --- a/crates/executors/src/executors/opencode/share_bridge.rs +++ /dev/null @@ -1,196 +0,0 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; - -use axum::{ - Json, Router, body::Bytes, extract::State, http::StatusCode, response::IntoResponse, - routing::post, -}; -use serde::{Deserialize, Serialize}; -use tokio::{ - net::TcpListener, - sync::{Mutex, RwLock, broadcast}, - task::JoinHandle, -}; - -/// Minimal subset of OpenCode share API that we need to ingest structured events locally. -/// -/// We run a lightweight HTTP server on 127.0.0.1 with an ephemeral port and point -/// OpenCode to it by setting OPENCODE_API and enabling auto-share. The CLI then POSTs -/// tool/message updates to /share_sync which we rebroadcast to interested consumers. - -#[derive(Debug)] -pub struct Bridge { - pub base_url: String, - tx: broadcast::Sender, - #[allow(dead_code)] - secrets: Arc>>, - shutdown_tx: Arc>>>, - _server_task: JoinHandle<()>, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ShareCreateReq { - #[serde(rename = "sessionID")] - pub session_id: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ShareCreateResp { - pub url: String, - pub secret: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ShareSyncReq { - #[serde(rename = "sessionID")] - pub session_id: String, - pub secret: String, - pub key: String, - pub content: serde_json::Value, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EmptyResp {} - -#[derive(Debug, Clone)] -pub enum ShareEvent { - Sync(ShareSyncReq), -} - -#[derive(Clone)] -struct AppState { - base_url: String, - tx: broadcast::Sender, - secrets: Arc>>, -} - -impl Bridge { - /// Start a new, isolated bridge server bound to localhost on an ephemeral port. 
- pub async fn start() -> std::io::Result> { - let (tx, _rx) = broadcast::channel(10_000); - let secrets = Arc::new(RwLock::new(HashMap::new())); - - // Bind to localhost:0 to get an ephemeral port - let listener = TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?; - let addr: SocketAddr = listener.local_addr()?; - let base_url = format!("http://{}:{}", addr.ip(), addr.port()); - tracing::debug!( - "OpenCode share bridge started: base_url={}, port={}", - base_url, - addr.port() - ); - - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let shutdown_tx = Arc::new(Mutex::new(Some(shutdown_tx))); - - let app_state = AppState { - base_url: base_url.clone(), - tx: tx.clone(), - secrets: secrets.clone(), - }; - - let server_task = tokio::spawn(async move { - let app = Router::new() - .route("/share_create", post(share_create)) - .route("/share_delete", post(share_delete)) - .route("/share_sync", post(share_sync)) - .with_state(app_state); - - // Serve with graceful shutdown - if let Err(e) = axum::serve(listener, app) - .with_graceful_shutdown(async move { - // wait for shutdown signal - let _ = shutdown_rx.await; - }) - .await - { - tracing::error!("opencode share bridge server error: {}", e); - } - }); - - Ok(Arc::new(Bridge { - base_url, - tx, - secrets, - shutdown_tx, - _server_task: server_task, - })) - } - - /// Subscribe to events from this bridge instance. - pub fn subscribe(&self) -> broadcast::Receiver { - self.tx.subscribe() - } - - /// Trigger graceful shutdown of this bridge server. - pub async fn shutdown(&self) { - tracing::debug!("Shutting down OpenCode share bridge: {}", self.base_url); - if let Some(tx) = self.shutdown_tx.lock().await.take() { - let _ = tx.send(()); - } - } -} - -async fn share_create(State(state): State, body: Bytes) -> impl IntoResponse { - // accept JSON regardless of content-type - let payload: ShareCreateReq = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(_) => ShareCreateReq { - session_id: "".into(), - }, - }; - // Generate a simple secret and store against session id - let secret = uuid::Uuid::new_v4().to_string(); - { - let mut map = state.secrets.write().await; - map.insert(payload.session_id.clone(), secret.clone()); - } - ( - StatusCode::OK, - Json(ShareCreateResp { - secret, - url: format!("{}/s/{}", state.base_url, short(&payload.session_id)), - }), - ) -} - -async fn share_delete(_state: State, _body: Bytes) -> impl IntoResponse { - (StatusCode::OK, Json(EmptyResp {})) -} - -async fn share_sync(State(state): State, body: Bytes) -> impl IntoResponse { - let payload: ShareSyncReq = match serde_json::from_slice(&body) { - Ok(v) => v, - Err(_) => { - return (StatusCode::BAD_REQUEST, Json(EmptyResp {})); - } - }; - // Validate secret (best-effort) - let ok = { - let map = state.secrets.read().await; - map.get(&payload.session_id) - .map(|expected| expected == &payload.secret) - .unwrap_or(false) - }; - - if !ok { - // Still emit for debugging but warn - tracing::debug!( - "share_sync with invalid secret for session {}", - payload.session_id - ); - } - - // Broadcast event - let _ = state.tx.send(ShareEvent::Sync(payload)); - (StatusCode::OK, Json(EmptyResp {})) -} - -fn short(id: &str) -> String { - id.chars() - .rev() - .take(8) - .collect::() - .chars() - .rev() - .collect() -} diff --git a/shared/schemas/opencode.json b/shared/schemas/opencode.json index f22120a5..0e43cd5c 100644 --- a/shared/schemas/opencode.json +++ b/shared/schemas/opencode.json @@ -17,7 +17,7 @@ "null" ] }, - "agent": { + "mode": { 
"type": [ "string", "null" diff --git a/shared/types.ts b/shared/types.ts index c4324a34..85e30a85 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -378,7 +378,7 @@ export type CursorAgent = { append_prompt: AppendPrompt, force?: boolean | null, export type Copilot = { append_prompt: AppendPrompt, model?: string | null, allow_all_tools?: boolean | null, allow_tool?: string | null, deny_tool?: string | null, add_dir?: Array | null, disable_mcp_server?: Array | null, base_command_override?: string | null, additional_params?: Array | null, env?: { [key in string]?: string } | null, }; -export type Opencode = { append_prompt: AppendPrompt, model?: string | null, agent?: string | null, base_command_override?: string | null, additional_params?: Array | null, env?: { [key in string]?: string } | null, }; +export type Opencode = { append_prompt: AppendPrompt, model?: string | null, mode?: string | null, base_command_override?: string | null, additional_params?: Array | null, env?: { [key in string]?: string } | null, }; export type QwenCode = { append_prompt: AppendPrompt, yolo?: boolean | null, base_command_override?: string | null, additional_params?: Array | null, env?: { [key in string]?: string } | null, };