From 89beac2daea0f107e43a19afa6995de16b9a69ae Mon Sep 17 00:00:00 2001 From: Solomon Date: Wed, 10 Sep 2025 13:14:07 +0100 Subject: [PATCH] Collect richer logs from opencode (#655) --- crates/executors/Cargo.toml | 1 + crates/executors/src/executors/opencode.rs | 1458 +++++++++-------- .../src/executors/opencode/share_bridge.rs | 196 +++ crates/executors/src/stdout_dup.rs | 83 + crates/utils/src/path.rs | 9 +- 5 files changed, 1016 insertions(+), 731 deletions(-) create mode 100644 crates/executors/src/executors/opencode/share_bridge.rs diff --git a/crates/executors/Cargo.toml b/crates/executors/Cargo.toml index 069312d5..3598a34b 100644 --- a/crates/executors/Cargo.toml +++ b/crates/executors/Cargo.toml @@ -40,3 +40,4 @@ strum = "0.27.2" strum_macros = "0.27.2" convert_case = "0.6" sqlx = "0.8.6" +axum = { workspace = true } diff --git a/crates/executors/src/executors/opencode.rs b/crates/executors/src/executors/opencode.rs index 41fc36c2..3c4003a4 100644 --- a/crates/executors/src/executors/opencode.rs +++ b/crates/executors/src/executors/opencode.rs @@ -1,5 +1,6 @@ +mod share_bridge; + use std::{ - fmt, path::{Path, PathBuf}, process::Stdio, sync::Arc, @@ -15,21 +16,84 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use utils::{ - diff::create_unified_diff, msg_store::MsgStore, path::make_path_relative, - shell::get_shell_command, -}; +use utils::{msg_store::MsgStore, path::make_path_relative, shell::get_shell_command}; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, - executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor}, + executors::{ + AppendPrompt, ExecutorError, StandardCodingAgentExecutor, + opencode::share_bridge::Bridge as ShareBridge, + }, logs::{ ActionType, FileChange, NormalizedEntry, NormalizedEntryType, TodoItem, - plain_text_processor::{MessageBoundary, PlainTextLogProcessor}, utils::EntryIndexProvider, }, + stdout_dup, }; +// Typed structures for oc-share tool state +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct OcToolInput { + #[serde(rename = "filePath", default)] + file_path: Option, + #[serde(default)] + path: Option, + #[serde(default)] + include: Option, + #[serde(default)] + pattern: Option, + #[serde(default)] + command: Option, + #[serde(default)] + description: Option, + #[serde(default)] + url: Option, + #[serde(default)] + format: Option, + #[serde(default)] + timeout: Option, + #[serde(rename = "oldString", default)] + old_string: Option, + #[serde(rename = "newString", default)] + new_string: Option, + #[serde(rename = "replaceAll", default)] + replace_all: Option, + #[serde(default)] + content: Option, + #[serde(default)] + todos: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct OcToolMetadata { + #[serde(default)] + description: Option, + #[serde(default)] + exit: Option, + #[serde(default)] + diff: Option, + #[serde(default)] + count: Option, + #[serde(default)] + truncated: Option, + #[serde(default)] + preview: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct OcToolState { + #[serde(default)] + input: Option, + #[serde(default)] + metadata: Option, + #[serde(default)] + output: Option, + #[serde(default)] + status: Option, + #[serde(default)] + title: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] pub struct Opencode { #[serde(default)] @@ -44,8 +108,11 @@ pub struct Opencode { impl Opencode { fn build_command_builder(&self) -> CommandBuilder { - let mut builder = - CommandBuilder::new("npx -y opencode-ai@latest run").params(["--print-logs"]); + let mut builder = CommandBuilder::new("npx -y opencode-ai@latest run").params([ + "--print-logs", + "--log-level", + "ERROR", + ]); if let Some(model) = &self.model { builder = builder.extend_params(["--model", model]); @@ -66,6 +133,8 @@ impl StandardCodingAgentExecutor for Opencode { current_dir: &Path, prompt: &str, ) -> Result { + // Start a dedicated local share bridge bound to this opencode process + let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; let (shell_cmd, shell_arg) = get_shell_command(); let opencode_command = self.build_command_builder().build_initial(); @@ -80,16 +149,46 @@ impl StandardCodingAgentExecutor for Opencode { .current_dir(current_dir) .arg(shell_arg) .arg(opencode_command) - .env("NODE_NO_WARNINGS", "1"); + .env("NODE_NO_WARNINGS", "1") + .env("OPENCODE_AUTO_SHARE", "1") + .env("OPENCODE_API", bridge.base_url.clone()); - let mut child = command.group_spawn()?; + let mut child = match command.group_spawn() { + Ok(c) => c, + Err(e) => { + // If opencode fails to start, shut down the bridge to free the port + bridge.shutdown().await; + return Err(ExecutorError::SpawnError(e)); + } + }; // Write prompt to stdin if let Some(mut stdin) = child.inner().stdin.take() { stdin.write_all(combined_prompt.as_bytes()).await?; stdin.shutdown().await?; } + // Transfer share events as lines for normalization through stdout + let (mut dup_stream, appender) = stdout_dup::tee_stdout_with_appender(&mut child)?; + let mut rx = bridge.subscribe(); + tokio::spawn(async move { + while let Ok(crate::executors::opencode::share_bridge::ShareEvent::Sync(mut req)) = + rx.recv().await + { + req.secret.clear(); + if let Ok(json) = serde_json::to_string(&req) { + appender.append_line(format!("{}{}", Opencode::SHARE_PREFIX, json)); + } + } + }); + // Monitor child's stdout end; when it closes, shut down the bridge to release the port + let bridge_for_shutdown = bridge.clone(); + tokio::spawn(async move { + use futures::StreamExt; + while let Some(_chunk) = dup_stream.next().await {} + tracing::debug!("Opencode process stdout closed"); + bridge_for_shutdown.shutdown().await; + }); Ok(child) } @@ -99,6 +198,8 @@ impl StandardCodingAgentExecutor for Opencode { prompt: &str, session_id: &str, ) -> Result { + // Start a dedicated local share bridge bound to this opencode process + let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; let (shell_cmd, shell_arg) = get_shell_command(); let opencode_command = self .build_command_builder() @@ -115,16 +216,43 @@ impl StandardCodingAgentExecutor for Opencode { .current_dir(current_dir) .arg(shell_arg) .arg(&opencode_command) - .env("NODE_NO_WARNINGS", "1"); + .env("NODE_NO_WARNINGS", "1") + .env("OPENCODE_AUTO_SHARE", "1") + .env("OPENCODE_API", bridge.base_url.clone()); - let mut child = command.group_spawn()?; + let mut child = match command.group_spawn() { + Ok(c) => c, + Err(e) => { + bridge.shutdown().await; + return Err(ExecutorError::SpawnError(e)); + } + }; // Write prompt to stdin if let Some(mut stdin) = child.inner().stdin.take() { stdin.write_all(combined_prompt.as_bytes()).await?; stdin.shutdown().await?; } + // Transfer share events as lines for normalization through stdout + let (mut dup_stream, appender) = stdout_dup::tee_stdout_with_appender(&mut child)?; + let mut rx = bridge.subscribe(); + tokio::spawn(async move { + while let Ok(crate::executors::opencode::share_bridge::ShareEvent::Sync(mut req)) = + rx.recv().await + { + req.secret.clear(); + if let Ok(json) = serde_json::to_string(&req) { + appender.append_line(format!("{}{}", Opencode::SHARE_PREFIX, json)); + } + } + }); + let bridge_for_shutdown = bridge.clone(); + tokio::spawn(async move { + use futures::StreamExt; + while let Some(_chunk) = dup_stream.next().await {} + bridge_for_shutdown.shutdown().await; + }); Ok(child) } @@ -153,29 +281,27 @@ impl StandardCodingAgentExecutor for Opencode { }) .boxed(); - // Process log lines, which contain error messages and session ID + // Process log lines, which contain error messages. We now source session ID + // from the oc-share stream instead of stderr. tokio::spawn(Self::process_opencode_log_lines( log_lines, msg_store.clone(), entry_index_counter.clone(), + worktree_path.to_path_buf(), )); - let agent_logs = stderr_lines - .filter(|line| { - ready( - !LogUtils::is_noise(line) - && !OPENCODE_LOG_REGEX.is_match(line) - && !LogUtils::is_error_line(line), - ) - }) + // Also parse share events from stdout + let share_events = msg_store + .stdout_lines_stream() + .filter_map(|res| ready(res.ok())) + .filter(|line| ready(line.starts_with(Opencode::SHARE_PREFIX))) + .map(|line| line[Opencode::SHARE_PREFIX.len()..].to_string()) .boxed(); - - // Normalize agent logs - tokio::spawn(Self::process_agent_logs( - agent_logs, + tokio::spawn(Self::process_share_events( + share_events, worktree_path.to_path_buf(), - entry_index_counter, - msg_store, + entry_index_counter.clone(), + msg_store.clone(), )); } @@ -192,17 +318,15 @@ impl StandardCodingAgentExecutor for Opencode { } } impl Opencode { + const SHARE_PREFIX: &'static str = "[oc-share] "; async fn process_opencode_log_lines( mut log_lines: BoxStream<'_, String>, msg_store: Arc, entry_index_counter: EntryIndexProvider, + _worktree_path: PathBuf, ) { - let mut session_id_extracted = false; while let Some(line) = log_lines.next().await { - if line.starts_with("ERROR") - || line.starts_with("WARN") - || LogUtils::is_error_line(&line) - { + if line.starts_with("ERROR") || LogUtils::is_error_line(&line) { let entry = NormalizedEntry { timestamp: None, entry_type: NormalizedEntryType::ErrorMessage, @@ -216,84 +340,599 @@ impl Opencode { entry, ); msg_store.push_patch(patch); - } else if !session_id_extracted - && let Some(session_id) = LogUtils::parse_session_id_from_line(&line) - { - msg_store.push_session_id(session_id); - session_id_extracted = true; } } } +} - async fn process_agent_logs( - mut agent_logs: BoxStream<'_, String>, +impl Opencode { + /// Parse share events and emit normalized patches + async fn process_share_events( + mut lines: BoxStream<'_, String>, worktree_path: PathBuf, entry_index_counter: EntryIndexProvider, msg_store: Arc, ) { - // Create processor for stderr content - let mut processor = PlainTextLogProcessor::builder() - .normalized_entry_producer(Box::new(move |content: String| { - Self::create_normalized_entry(content, &worktree_path.clone()) - })) - .message_boundary_predicate(Box::new(|lines: &[String]| Self::detect_tool_call(lines))) - .index_provider(entry_index_counter.clone()) - .build(); + use std::collections::HashMap; - while let Some(line) = agent_logs.next().await { - debug_assert!(!line.ends_with('\n')); + use serde::Deserialize; - // Process the line through the plain text processor - for patch in processor.process(line + "\n") { - msg_store.push_patch(patch); - } - } - } + use crate::logs::utils::ConversationPatch; - /// Create normalized entry from content - pub fn create_normalized_entry(content: String, worktree_path: &Path) -> NormalizedEntry { - // Check if this is a tool call - if let Some(tool_call) = ToolCall::parse(&content) { - let tool_name = tool_call.tool.name(); - let action_type = - ToolUtils::determine_action_type(&tool_call.tool, &worktree_path.to_string_lossy()); - let tool_content = - ToolUtils::generate_tool_content(&tool_call.tool, &worktree_path.to_string_lossy()); - - return NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::ToolUse { - tool_name, - action_type, - }, - content: tool_content, - metadata: None, - }; + #[derive(Debug, Clone, Deserialize)] + #[allow(dead_code)] + struct TimeObj { + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, } - // Default to assistant message - NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::AssistantMessage, - content, - metadata: None, - } - } + // Tool input/state structures are defined at module level (OcToolInput/OcToolState) - /// Detect message boundaries for tool calls and other content using serde deserialization - pub fn detect_tool_call(lines: &[String]) -> Option { - for (i, line) in lines.iter().enumerate() { - if ToolCall::is_tool_line(line) { - if i == 0 { - // separate tool call from subsequent content - return Some(MessageBoundary::Split(1)); - } else { - // separate tool call from previous content - return Some(MessageBoundary::Split(i)); + #[derive(Debug, Clone, Deserialize)] + #[serde(tag = "type", rename_all = "lowercase")] + #[allow(clippy::large_enum_variant)] + #[allow(dead_code)] + enum ShareContent { + Text { + id: String, + #[serde(rename = "messageID")] + message_id: String, + #[serde(rename = "sessionID")] + session_id: String, + #[serde(default)] + text: Option, + #[serde(default)] + time: Option, + }, + Tool { + id: String, + #[serde(rename = "messageID")] + message_id: String, + #[serde(rename = "sessionID")] + session_id: String, + #[serde(rename = "callID", default)] + call_id: Option, + tool: String, + #[serde(default)] + state: Box>, + }, + } + + #[derive(Debug, Clone, Deserialize)] + #[allow(dead_code)] + struct ShareSyncEnvelope { + #[serde(rename = "sessionID")] + session_id: String, + secret: String, + key: String, + content: serde_json::Value, + } + + let mut index_by_part: HashMap = HashMap::new(); + // For text aggregation across parts under the same message (scoped by session) + // Key format: "{sessionID}:{messageID}" + let mut index_by_message: HashMap = HashMap::new(); + let mut message_parts_order: HashMap> = HashMap::new(); + let mut message_part_texts: HashMap> = HashMap::new(); + let mut message_aggregated: HashMap = HashMap::new(); + // Segment tracking per message to force splits after tool events + // base_key = "{sessionID}:{messageID}", seg_key = "{base_key}#" + let mut message_segment: HashMap = HashMap::new(); + let mut message_pending_break: HashMap = HashMap::new(); + let mut message_roles: HashMap = HashMap::new(); + let mut session_id_set = false; + + use std::collections::hash_map::Entry; + let mut upsert_by_part = |entry: NormalizedEntry, part_id: String| { + let (idx, is_new) = match index_by_part.entry(part_id) { + Entry::Occupied(o) => (*o.get(), false), + Entry::Vacant(v) => { + let i = entry_index_counter.next(); + v.insert(i); + (i, true) } + }; + if is_new { + ConversationPatch::add_normalized_entry(idx, entry) + } else { + ConversationPatch::replace(idx, entry) + } + }; + + while let Some(line) = lines.next().await { + let Ok(env) = serde_json::from_str::(&line) else { + continue; + }; + // Record session id once from stream + if !session_id_set { + msg_store.push_session_id(env.session_id.clone()); + session_id_set = true; + } + + // Capture message role metadata from session/message events + if env.key.starts_with("session/message/") { + #[derive(Deserialize)] + struct MessageMeta { + id: String, + #[serde(default)] + role: Option, + } + if let Ok(meta) = serde_json::from_value::(env.content.clone()) + && let Some(role) = meta.role + { + message_roles.insert(meta.id.clone(), role.clone()); + + // If we have aggregated text already for this message, create or update the entry now + let base_key = format!("{}:{}", env.session_id, meta.id); + let seg = *message_segment.get(&base_key).unwrap_or(&0); + let seg_key = format!("{base_key}#{seg}"); + if let Some(content) = message_aggregated.get(&seg_key).cloned() { + // Skip emitting user role messages entirely + if role == "user" { + // Do not emit user text messages + } else { + let entry_type = match role.as_str() { + "system" => NormalizedEntryType::SystemMessage, + _ => NormalizedEntryType::AssistantMessage, + }; + use std::collections::hash_map::Entry as HmEntry; + match index_by_message.entry(seg_key) { + HmEntry::Occupied(o) => { + let idx = *o.get(); + let entry = NormalizedEntry { + timestamp: None, + entry_type, + content, + metadata: None, + }; + msg_store.push_patch(ConversationPatch::replace(idx, entry)); + } + HmEntry::Vacant(v) => { + let idx = entry_index_counter.next(); + v.insert(idx); + let entry = NormalizedEntry { + timestamp: None, + entry_type, + content, + metadata: None, + }; + msg_store.push_patch(ConversationPatch::add_normalized_entry( + idx, entry, + )); + } + } + } + } + } + continue; + } + + if !env.key.starts_with("session/part/") { + continue; + } + + match serde_json::from_value::(env.content.clone()) { + Ok(ShareContent::Text { + id, + message_id, + text, + .. + }) => { + let text = text.unwrap_or_default(); + // Scope aggregation by sessionID and segment to avoid cross-session and enforce breaks + let base_key = format!("{}:{}", env.session_id, message_id); + if message_pending_break.remove(&base_key).unwrap_or(false) { + let e = message_segment.entry(base_key.clone()).or_insert(0); + *e += 1; + } + let seg = *message_segment.get(&base_key).unwrap_or(&0); + let msg_key = format!("{base_key}#{seg}"); + + // Track parts order for this message + let parts_order = message_parts_order.entry(msg_key.clone()).or_default(); + if !parts_order.iter().any(|p| p == &id) { + parts_order.push(id.clone()); + } + + // Update latest text for this part under the message + let part_texts = message_part_texts.entry(msg_key.clone()).or_default(); + part_texts.insert(id.clone(), text); + + // Rebuild aggregated message text by concatenating parts in stable order + let aggregated = parts_order + .iter() + .filter_map(|pid| part_texts.get(pid)) + .cloned() + .collect::>() + .join(""); + message_aggregated.insert(msg_key.clone(), aggregated.clone()); + + // Determine role; if unknown yet, wait until message metadata arrives. + match message_roles.get(&message_id).map(|s| s.as_str()) { + Some("user") => { + // Do not emit user text messages + } + Some(role) => { + // Upsert by message id to keep a single entry per message + let (idx, is_new) = match index_by_message.entry(msg_key) { + Entry::Occupied(o) => (*o.get(), false), + Entry::Vacant(v) => { + let i = entry_index_counter.next(); + v.insert(i); + (i, true) + } + }; + let entry_type = match role { + "system" => NormalizedEntryType::SystemMessage, + _ => NormalizedEntryType::AssistantMessage, + }; + let entry = NormalizedEntry { + timestamp: None, + entry_type, + content: aggregated, + metadata: None, + }; + let patch = if is_new { + ConversationPatch::add_normalized_entry(idx, entry) + } else { + ConversationPatch::replace(idx, entry) + }; + msg_store.push_patch(patch); + } + None => { + // Role unknown; accumulate but don't emit yet + } + } + } + Ok(ShareContent::Tool { + id, + tool, + state, + message_id, + .. + }) => { + // If there is pending text in the current segment, mark to break before next text + let base_key = format!("{}:{}", env.session_id, message_id); + let seg = *message_segment.get(&base_key).unwrap_or(&0); + let seg_key = format!("{base_key}#{seg}"); + if message_aggregated + .get(&seg_key) + .map(|s| !s.is_empty()) + .unwrap_or(false) + { + message_pending_break.insert(base_key.clone(), true); + } + let state = (*state).unwrap_or_default(); + let status = state.status.as_deref().unwrap_or(""); + + let exit_status = state + .metadata + .as_ref() + .and_then(|m| m.exit) + .map(|code| crate::logs::CommandExitStatus::ExitCode { code }); + + let (result, mut content_text) = match status { + "completed" => { + let output = state.output.as_deref().unwrap_or(""); + let title = state.title.as_deref().unwrap_or(""); + let header = if title.is_empty() { + format!("{tool} completed") + } else { + format!("{tool}: {title}") + }; + ( + Some(crate::logs::ToolResult { + r#type: crate::logs::ToolResultValueType::Markdown, + value: serde_json::Value::String(output.to_string()), + }), + format!("{header}\n"), + ) + } + "error" => { + let err = state + .metadata + .as_ref() + .and_then(|m| m.description.as_deref()) + .unwrap_or(""); + ( + Some(crate::logs::ToolResult { + r#type: crate::logs::ToolResultValueType::Markdown, + value: serde_json::Value::String(format!("Error: {err}")), + }), + format!("{tool} error: {err}\n"), + ) + } + "running" => (None, format!("{tool} started\n")), + _ => (None, String::new()), + }; + + // Compute concise normalized summary for known tools using a typed mapping + let worktree = worktree_path.to_string_lossy(); + #[derive(Deserialize)] + #[serde(tag = "tool", rename_all = "lowercase")] + #[allow(dead_code)] + enum TypedTool { + #[serde(rename = "read")] + Read { + #[serde(default)] + input: OcToolInput, + }, + #[serde(rename = "list")] + List { + #[serde(default)] + input: OcToolInput, + }, + #[serde(rename = "grep")] + Grep { + #[serde(default)] + input: OcToolInput, + }, + #[serde(rename = "glob")] + Glob { + #[serde(default)] + input: OcToolInput, + }, + #[serde(rename = "webfetch")] + Webfetch { + #[serde(default)] + input: OcToolInput, + }, + #[serde(other)] + Other, + } + if let Ok(v) = serde_json::to_value(&state.input).and_then(|input| { + serde_json::from_value::(serde_json::json!({ + "tool": tool, + "input": input + })) + }) { + match v { + TypedTool::Read { input } => { + let p = input.file_path.as_deref().unwrap_or(""); + content_text = format!("`{}`", make_path_relative(p, &worktree)); + } + TypedTool::List { input } => { + let p = input.path.as_deref().unwrap_or("."); + content_text = format!( + "List directory: `{}`", + make_path_relative(p, &worktree) + ); + } + TypedTool::Grep { input } => { + let pat = input.pattern.as_deref().unwrap_or(""); + let p = input.path.as_deref().unwrap_or("."); + let rel = make_path_relative(p, &worktree); + if let Some(inc) = input.include.as_deref() { + content_text = format!("`{pat}` in `{rel}` ({inc})"); + } else { + content_text = format!("`{pat}` in `{rel}`"); + } + } + TypedTool::Glob { input } => { + let pat = input.pattern.as_deref().unwrap_or(""); + let p = input.path.as_deref().unwrap_or("."); + let rel = make_path_relative(p, &worktree); + content_text = format!("glob `{pat}` in `{rel}`"); + } + TypedTool::Webfetch { input } => { + let url = input.url.as_deref().unwrap_or(""); + content_text = format!("fetch `{url}`"); + } + TypedTool::Other => {} + } + } + + // Prepare normalized arguments for potential Tool action (fallback only) + let args_json = if let Some(input) = state.input.as_ref() { + let mut map = serde_json::Map::new(); + if let Some(p) = input.file_path.as_deref() { + map.insert( + "filePath".into(), + serde_json::Value::String(make_path_relative( + p, + &worktree_path.to_string_lossy(), + )), + ); + } + if let Some(p) = input.path.as_deref() { + map.insert( + "path".into(), + serde_json::Value::String(make_path_relative( + p, + &worktree_path.to_string_lossy(), + )), + ); + } + if let Some(v) = input.include.as_ref() { + map.insert("include".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.pattern.as_ref() { + map.insert("pattern".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.command.as_ref() { + map.insert("command".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.description.as_ref() { + map.insert("description".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.url.as_ref() { + map.insert("url".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.format.as_ref() { + map.insert("format".into(), serde_json::Value::String(v.clone())); + } + if let Some(v) = input.timeout.as_ref() { + map.insert("timeout".into(), serde_json::Value::from(*v)); + } + serde_json::Value::Object(map) + } else { + serde_json::Value::Null + }; + + // Derive ActionType and attach command results if applicable + let action_type = Self::derive_action_type(&tool, &state, &worktree_path); + let resolved_action_type = match action_type { + Some(mut at) => match (&mut at, &result) { + (ActionType::CommandRun { result: r, .. }, Some(res)) => { + *r = Some(crate::logs::CommandRunResult { + exit_status: exit_status.clone(), + output: res.value.as_str().map(|s| s.to_owned()), + }); + at + } + _ => at, + }, + None => ActionType::Tool { + tool_name: tool.clone(), + arguments: Some(args_json.clone()), + result: result.clone(), + }, + }; + + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::ToolUse { + tool_name: tool.clone(), + action_type: resolved_action_type, + }, + content: content_text, + metadata: None, + }; + + let patch = upsert_by_part(entry, id); + msg_store.push_patch(patch); + } + Err(_) => {} } } - None + } + + /// Map tool name and state to a rich ActionType used by frontend renderers. + fn derive_action_type( + tool_name: &str, + state: &OcToolState, + worktree_path: &Path, + ) -> Option { + // Deserialize "tool" + typed input into a tagged enum to avoid stringly logic + #[derive(Deserialize)] + #[serde(tag = "tool", rename_all = "lowercase")] + #[allow(dead_code)] + enum ActionTool { + Read { + input: OcToolInput, + }, + Write { + input: OcToolInput, + }, + Edit { + input: OcToolInput, + }, + Bash { + input: OcToolInput, + }, + Grep { + input: OcToolInput, + }, + Glob { + input: OcToolInput, + }, + Webfetch { + input: OcToolInput, + }, + Task { + input: OcToolInput, + }, + Todowrite { + input: OcToolInput, + }, + Todoread, + List { + input: OcToolInput, + }, + #[serde(other)] + Other, + } + + let input_json = serde_json::to_value(state.input.clone().unwrap_or_default()) + .unwrap_or(serde_json::Value::Null); + let v = serde_json::json!({ "tool": tool_name, "input": input_json }); + let parsed: ActionTool = serde_json::from_value(v).unwrap_or(ActionTool::Other); + match parsed { + ActionTool::Read { input } => { + let path = input.file_path.as_deref().unwrap_or(""); + Some(ActionType::FileRead { + path: make_path_relative(path, &worktree_path.to_string_lossy()), + }) + } + ActionTool::Write { input } => { + let path = input.file_path.as_deref().unwrap_or(""); + let content = input.content.unwrap_or_default(); + Some(ActionType::FileEdit { + path: make_path_relative(path, &worktree_path.to_string_lossy()), + changes: vec![FileChange::Write { content }], + }) + } + ActionTool::Edit { input } => { + let path = input.file_path.as_deref().unwrap_or(""); + let diff = state + .metadata + .as_ref() + .and_then(|m| m.diff.as_deref()) + .unwrap_or(""); + if diff.is_empty() { + return None; + } + Some(ActionType::FileEdit { + path: make_path_relative(path, &worktree_path.to_string_lossy()), + changes: vec![FileChange::Edit { + unified_diff: diff.to_string(), + has_line_numbers: false, + }], + }) + } + ActionTool::Bash { input } => { + let command = input.command.unwrap_or_default(); + Some(ActionType::CommandRun { + command, + result: None, + }) + } + ActionTool::Grep { input } => { + let query = input.pattern.unwrap_or_default(); + Some(ActionType::Search { query }) + } + ActionTool::Glob { input } => { + let query = input.pattern.unwrap_or_default(); + Some(ActionType::Search { query }) + } + ActionTool::Webfetch { input } => { + let url = input.url.unwrap_or_default(); + Some(ActionType::WebFetch { url }) + } + ActionTool::Todowrite { input } => { + let todos = input + .todos + .unwrap_or_default() + .into_iter() + .map(|t| TodoItem { + content: t.content, + status: t.status, + priority: t.priority, + }) + .collect::>(); + Some(ActionType::TodoManagement { + todos, + operation: "write".into(), + }) + } + ActionTool::Todoread => Some(ActionType::TodoManagement { + todos: vec![], + operation: "read".into(), + }), + ActionTool::List { .. } | ActionTool::Task { .. } | ActionTool::Other => None, + } } } @@ -301,87 +940,6 @@ impl Opencode { // TOOL DEFINITIONS // ============================================================================= -/// Represents different types of tools that can be called by OpenCode -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(tag = "tool_name", content = "arguments")] -pub enum Tool { - #[serde(rename = "read")] - Read { - #[serde(rename = "filePath")] - file_path: String, - #[serde(default)] - offset: Option, - #[serde(default)] - limit: Option, - }, - #[serde(rename = "write")] - Write { - #[serde(rename = "filePath")] - file_path: String, - #[serde(default)] - content: Option, - }, - #[serde(rename = "edit")] - Edit { - #[serde(rename = "filePath")] - file_path: String, - #[serde(rename = "oldString", default)] - old_string: Option, - #[serde(rename = "newString", default)] - new_string: Option, - #[serde(rename = "replaceAll", default)] - replace_all: Option, - }, - #[serde(rename = "bash")] - Bash { - command: String, - #[serde(default)] - timeout: Option, - #[serde(default)] - description: Option, - }, - #[serde(rename = "grep")] - Grep { - pattern: String, - #[serde(default)] - path: Option, - #[serde(default)] - include: Option, - }, - #[serde(rename = "glob")] - Glob { - pattern: String, - #[serde(default)] - path: Option, - }, - #[serde(rename = "todowrite")] - TodoWrite { todos: Vec }, - #[serde(rename = "todoread")] - TodoRead, - #[serde(rename = "list")] - List { - #[serde(default)] - path: Option, - #[serde(default)] - ignore: Option>, - }, - #[serde(rename = "webfetch")] - WebFetch { - url: String, - #[serde(default)] - format: Option, - #[serde(default)] - timeout: Option, - }, - #[serde(rename = "task")] - Task { description: String }, - /// Catch-all for unknown tools (including MCP tools) - Other { - tool_name: String, - arguments: serde_json::Value, - }, -} - /// TODO information structure #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] pub struct TodoInfo { @@ -391,477 +949,6 @@ pub struct TodoInfo { pub priority: Option, } -/// Web fetch format options -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] -#[serde(rename_all = "lowercase")] -pub enum WebFetchFormat { - Text, - Markdown, - Html, -} - -impl Tool { - /// Get the tool name as a string - pub fn name(&self) -> String { - match self { - Tool::Read { .. } => "read".to_string(), - Tool::Write { .. } => "write".to_string(), - Tool::Edit { .. } => "edit".to_string(), - Tool::Bash { .. } => "bash".to_string(), - Tool::Grep { .. } => "grep".to_string(), - Tool::Glob { .. } => "glob".to_string(), - Tool::TodoWrite { .. } => "todowrite".to_string(), - Tool::TodoRead => "todoread".to_string(), - Tool::List { .. } => "list".to_string(), - Tool::WebFetch { .. } => "webfetch".to_string(), - Tool::Task { .. } => "task".to_string(), - Tool::Other { tool_name, .. } => tool_name.clone(), - } - } - - /// Get the tool arguments as JSON value - pub fn arguments(&self) -> serde_json::Value { - match self { - Tool::Read { - file_path, - offset, - limit, - } => { - let mut args = serde_json::json!({ "filePath": file_path }); - if let Some(offset) = offset { - args["offset"] = (*offset).into(); - } - if let Some(limit) = limit { - args["limit"] = (*limit).into(); - } - args - } - Tool::Write { file_path, content } => { - let mut args = serde_json::json!({ "filePath": file_path }); - if let Some(content) = content { - args["content"] = content.clone().into(); - } - args - } - Tool::Edit { - file_path, - old_string, - new_string, - replace_all, - } => { - let mut args = serde_json::json!({ - "filePath": file_path - }); - if let Some(old_string) = old_string { - args["oldString"] = old_string.clone().into(); - } - if let Some(new_string) = new_string { - args["newString"] = new_string.clone().into(); - } - if let Some(replace_all) = replace_all { - args["replaceAll"] = (*replace_all).into(); - } - args - } - Tool::Bash { - command, - timeout, - description, - } => { - let mut args = serde_json::json!({ "command": command }); - if let Some(timeout) = timeout { - args["timeout"] = (*timeout).into(); - } - if let Some(description) = description { - args["description"] = description.clone().into(); - } - args - } - Tool::Grep { - pattern, - path, - include, - } => { - let mut args = serde_json::json!({ "pattern": pattern }); - if let Some(path) = path { - args["path"] = path.clone().into(); - } - if let Some(include) = include { - args["include"] = include.clone().into(); - } - args - } - Tool::Glob { pattern, path } => { - let mut args = serde_json::json!({ "pattern": pattern }); - if let Some(path) = path { - args["path"] = path.clone().into(); - } - args - } - Tool::TodoWrite { todos } => { - serde_json::json!({ "todos": todos }) - } - Tool::TodoRead => serde_json::Value::Null, - Tool::List { path, ignore } => { - let mut args = serde_json::Value::Object(serde_json::Map::new()); - if let Some(path) = path { - args["path"] = path.clone().into(); - } - if let Some(ignore) = ignore { - args["ignore"] = ignore.clone().into(); - } - args - } - Tool::WebFetch { - url, - format, - timeout, - } => { - let mut args = serde_json::json!({ "url": url }); - if let Some(format) = format { - args["format"] = match format { - WebFetchFormat::Text => "text".into(), - WebFetchFormat::Markdown => "markdown".into(), - WebFetchFormat::Html => "html".into(), - }; - } - if let Some(timeout) = timeout { - args["timeout"] = (*timeout).into(); - } - args - } - Tool::Task { description } => { - serde_json::json!({ "description": description }) - } - Tool::Other { arguments, .. } => arguments.clone(), - } - } -} - -// ============================================================================= -// TOOL CALL PARSING -// ============================================================================= - -/// Represents a parsed tool call line from OpenCode output -#[derive(Debug, Clone, PartialEq)] -pub struct ToolCall { - pub tool: Tool, -} - -impl ToolCall { - /// Parse a tool call from a string that starts with | - /// - /// Supports both legacy JSON argument format and new simplified formats, e.g.: - /// | Write drill.md - /// | Read drill.md - /// | Edit drill.md - /// | List {"path":"/path","ignore":["node_modules"]} - /// | Glob {"pattern":"*.md"} - /// | Grep pattern here - /// | Bash echo "cmd" - /// | webfetch https://example.com (application/json) - /// | Todo 2 todos - /// | task Some description - pub fn parse(line: &str) -> Option { - let line = line.trim_end(); - if !line.starts_with('|') { - return None; - } - - // Remove the leading '|' and trim surrounding whitespace - let content = line[1..].trim(); - if content.is_empty() { - return None; - } - - // First token is the tool name, remainder are arguments - let mut parts = content.split_whitespace(); - let raw_tool = parts.next()?; - let tool_name = raw_tool.to_lowercase(); - - // Compute the remainder (preserve original spacing after tool name) - let rest = content.get(raw_tool.len()..).unwrap_or("").trim_start(); - - // JSON tool arguments - if rest.starts_with('{') - && let Ok(arguments) = serde_json::from_str::(rest) - { - let tool_json = serde_json::json!({ - "tool_name": tool_name, - "arguments": arguments - }); - - return match serde_json::from_value::(tool_json) { - Ok(tool) => Some(ToolCall { tool }), - Err(_) => Some(ToolCall { - tool: Tool::Other { - tool_name, - arguments, - }, - }), - }; - } - - // Simplified tool argument summary - let tool = match tool_name.as_str() { - "read" => Tool::Read { - file_path: rest.to_string(), - offset: None, - limit: None, - }, - "write" => Tool::Write { - file_path: rest.to_string(), - // Simplified logs omit content; set to None - content: None, - }, - "edit" => { - // Simplified logs provide only file path; set strings to None - Tool::Edit { - file_path: rest.to_string(), - old_string: None, - new_string: None, - replace_all: None, - } - } - "bash" => Tool::Bash { - command: rest.to_string(), - timeout: None, - description: None, - }, - "grep" => Tool::Grep { - // Treat the remainder as the pattern if not JSON - pattern: rest.to_string(), - path: None, - include: None, - }, - "glob" => Tool::Glob { - pattern: rest.to_string(), - path: None, - }, - "list" => { - if rest.is_empty() { - Tool::List { - path: None, - ignore: None, - } - } else { - Tool::List { - path: Some(rest.to_string()), - ignore: None, - } - } - } - "webfetch" => { - // Extract the first token as URL, ignore trailing "(...)" content-type hints - let url = rest.split_whitespace().next().unwrap_or(rest).to_string(); - Tool::WebFetch { - url, - format: None, - timeout: None, - } - } - "todo" => Tool::TodoRead, - "task" => { - // Use the rest as the task description - Tool::Task { - description: rest.to_string(), - } - } - other => { - let arguments = if rest.is_empty() { - serde_json::Value::Null - } else { - serde_json::json!({ "content": rest }) - }; - Tool::Other { - tool_name: other.to_string(), - arguments, - } - } - }; - - Some(ToolCall { tool }) - } - - /// Check if a line is a valid tool line - pub fn is_tool_line(line: &str) -> bool { - Self::parse(line).is_some() - } - - /// Get the tool name - pub fn tool_name(&self) -> String { - self.tool.name() - } - - /// Get the tool arguments as JSON - pub fn arguments(&self) -> serde_json::Value { - self.tool.arguments() - } -} - -impl fmt::Display for ToolCall { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "| {} {}", self.tool.name(), self.tool.arguments()) - } -} - -// ============================================================================= -// TOOL UTILITIES -// ============================================================================= - -/// Utilities for processing tool calls -pub struct ToolUtils; - -impl ToolUtils { - pub fn normalize_tool_name(tool_name: &str) -> String { - tool_name.to_lowercase() - } - - /// Helper function to determine action type for tool usage - pub fn determine_action_type(tool: &Tool, worktree_path: &str) -> ActionType { - match tool { - Tool::Read { file_path, .. } => ActionType::FileRead { - path: make_path_relative(file_path, worktree_path), - }, - Tool::Write { - file_path, content, .. - } => { - let changes = if let Some(content) = content.clone() { - vec![FileChange::Write { content }] - } else { - vec![] - }; - ActionType::FileEdit { - path: make_path_relative(file_path, worktree_path), - changes, - } - } - Tool::Edit { - file_path, - old_string, - new_string, - .. - } => { - let changes = match (old_string, new_string) { - (Some(old), Some(new)) => vec![FileChange::Edit { - unified_diff: create_unified_diff(file_path, old, new), - has_line_numbers: false, - }], - _ => Vec::new(), - }; - ActionType::FileEdit { - path: make_path_relative(file_path, worktree_path), - changes, - } - } - Tool::Bash { command, .. } => ActionType::CommandRun { - command: command.clone(), - result: None, - }, - Tool::Grep { pattern, .. } => ActionType::Search { - query: pattern.clone(), - }, - Tool::Glob { pattern, .. } => ActionType::Search { - query: format!("glob: {pattern}"), - }, - Tool::List { .. } => ActionType::Other { - description: "Directory listing".to_string(), - }, - Tool::WebFetch { url, .. } => ActionType::Other { - description: format!("Web fetch: {url}"), - }, - Tool::TodoWrite { todos } => ActionType::TodoManagement { - todos: todos - .iter() - .map(|t| TodoItem { - content: t.content.clone(), - status: t.status.clone(), - priority: t.priority.clone(), - }) - .collect(), - operation: "write".to_string(), - }, - Tool::TodoRead => ActionType::TodoManagement { - todos: vec![], - operation: "read".to_string(), - }, - Tool::Task { description } => ActionType::Other { - description: format!("Task: {description}"), - }, - Tool::Other { tool_name, .. } => { - // Handle MCP tools (format: client_name_tool_name) - if tool_name.contains('_') { - ActionType::Other { - description: format!("MCP tool: {tool_name}"), - } - } else { - ActionType::Other { - description: format!("Tool: {tool_name}"), - } - } - } - } - } - - /// Helper function to generate concise content for tool usage - pub fn generate_tool_content(tool: &Tool, worktree_path: &str) -> String { - match tool { - Tool::Read { file_path, .. } => { - format!("`{}`", make_path_relative(file_path, worktree_path)) - } - Tool::Write { file_path, .. } | Tool::Edit { file_path, .. } => { - format!("`{}`", make_path_relative(file_path, worktree_path)) - } - Tool::Bash { command, .. } => { - format!("`{command}`") - } - Tool::Grep { - pattern, - path, - include, - } => { - let search_path = path.as_deref().unwrap_or("."); - match include { - Some(include_pattern) => { - format!("`{pattern}` in `{search_path}` ({include_pattern})") - } - None => format!("`{pattern}` in `{search_path}`"), - } - } - Tool::Glob { pattern, path } => { - let search_path = path.as_deref().unwrap_or("."); - format!("glob `{pattern}` in `{search_path}`") - } - Tool::List { path, .. } => { - if let Some(path) = path { - format!( - "List directory: `{}`", - make_path_relative(path, worktree_path) - ) - } else { - "List directory".to_string() - } - } - Tool::WebFetch { url, .. } => { - format!("fetch `{url}`") - } - Tool::Task { description } => { - format!("Task: `{description}`") - } - Tool::TodoWrite { .. } => "TODO list updated".to_string(), - Tool::TodoRead => "TODO list read".to_string(), - Tool::Other { tool_name, .. } => { - // Handle MCP tools (format: client_name_tool_name) - if tool_name.contains('_') { - format!("MCP: `{tool_name}`") - } else { - format!("`{tool_name}`") - } - } - } - } -} - // ============================================================================= // Log interpretation UTILITIES // ============================================================================= @@ -869,92 +956,13 @@ impl ToolUtils { lazy_static! { // Accurate regex for OpenCode log lines: LEVEL timestamp +ms ... static ref OPENCODE_LOG_REGEX: Regex = Regex::new(r"^(INFO|DEBUG|WARN|ERROR)\s+\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\s+\+\d+\s*ms.*").unwrap(); - static ref SESSION_ID_REGEX: Regex = Regex::new(r".*\b(id|session|sessionID)=([^ ]+)").unwrap(); - static ref NPM_WARN_REGEX: Regex = Regex::new(r"^npm warn .*").unwrap(); - static ref CWD_GIT_LOG_NOISE: Regex = Regex::new(r"^ cwd=.* git=.*/snapshots tracking$").unwrap(); } /// Log utilities for OpenCode processing pub struct LogUtils; impl LogUtils { - /// Check if a line should be skipped as noise - pub fn is_noise(line: &str) -> bool { - // Empty lines are noise - if line.is_empty() { - return true; - } - - if CWD_GIT_LOG_NOISE.is_match(line) { - return true; - } - - let line = line.trim(); - - if NPM_WARN_REGEX.is_match(line) { - return true; - } - - // Spinner glyphs - if line.len() == 1 && "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏".contains(line) { - return true; - } - - // Banner lines containing block glyphs (Unicode Block Elements range) - if line - .chars() - .take(1) - .any(|c| ('\u{2580}'..='\u{259F}').contains(&c)) - { - return true; - } - - // UI/stats frames using Box Drawing glyphs (U+2500-257F) - if line - .chars() - .take(1) - .any(|c| ('\u{2500}'..='\u{257F}').contains(&c)) - { - return true; - } - - // Model banner (@ with spaces) - if line.starts_with("@ ") { - return true; - } - - // Share link - if line.starts_with("~ https://opencode.ai/s/") { - return true; - } - - // Everything else is NOT noise - false - } - - /// Detect if a line is an OpenCode log line format using regex - pub fn is_opencode_log_line(line: &str) -> bool { - OPENCODE_LOG_REGEX.is_match(line) - } - pub fn is_error_line(line: &str) -> bool { line.starts_with("! ") } - - /// Parse session_id from OpenCode log lines - pub fn parse_session_id_from_line(line: &str) -> Option { - // Only apply to OpenCode log lines - if !Self::is_opencode_log_line(line) { - return None; - } - - // Try regex for session ID extraction from service=session logs - if let Some(captures) = SESSION_ID_REGEX.captures(line) - && let Some(id) = captures.get(2) - { - return Some(id.as_str().to_string()); - } - - None - } } diff --git a/crates/executors/src/executors/opencode/share_bridge.rs b/crates/executors/src/executors/opencode/share_bridge.rs new file mode 100644 index 00000000..44ff8ab0 --- /dev/null +++ b/crates/executors/src/executors/opencode/share_bridge.rs @@ -0,0 +1,196 @@ +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; + +use axum::{ + Json, Router, body::Bytes, extract::State, http::StatusCode, response::IntoResponse, + routing::post, +}; +use serde::{Deserialize, Serialize}; +use tokio::{ + net::TcpListener, + sync::{Mutex, RwLock, broadcast}, + task::JoinHandle, +}; + +/// Minimal subset of OpenCode share API that we need to ingest structured events locally. +/// +/// We run a lightweight HTTP server on 127.0.0.1 with an ephemeral port and point +/// OpenCode to it by setting OPENCODE_API and enabling auto-share. The CLI then POSTs +/// tool/message updates to /share_sync which we rebroadcast to interested consumers. + +#[derive(Debug)] +pub struct Bridge { + pub base_url: String, + tx: broadcast::Sender, + #[allow(dead_code)] + secrets: Arc>>, + shutdown_tx: Arc>>>, + _server_task: JoinHandle<()>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShareCreateReq { + #[serde(rename = "sessionID")] + pub session_id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShareCreateResp { + pub url: String, + pub secret: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShareSyncReq { + #[serde(rename = "sessionID")] + pub session_id: String, + pub secret: String, + pub key: String, + pub content: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmptyResp {} + +#[derive(Debug, Clone)] +pub enum ShareEvent { + Sync(ShareSyncReq), +} + +#[derive(Clone)] +struct AppState { + base_url: String, + tx: broadcast::Sender, + secrets: Arc>>, +} + +impl Bridge { + /// Start a new, isolated bridge server bound to localhost on an ephemeral port. + pub async fn start() -> std::io::Result> { + let (tx, _rx) = broadcast::channel(10_000); + let secrets = Arc::new(RwLock::new(HashMap::new())); + + // Bind to localhost:0 to get an ephemeral port + let listener = TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?; + let addr: SocketAddr = listener.local_addr()?; + let base_url = format!("http://{}:{}", addr.ip(), addr.port()); + tracing::debug!( + "OpenCode share bridge started: base_url={}, port={}", + base_url, + addr.port() + ); + + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let shutdown_tx = Arc::new(Mutex::new(Some(shutdown_tx))); + + let app_state = AppState { + base_url: base_url.clone(), + tx: tx.clone(), + secrets: secrets.clone(), + }; + + let server_task = tokio::spawn(async move { + let app = Router::new() + .route("/share_create", post(share_create)) + .route("/share_delete", post(share_delete)) + .route("/share_sync", post(share_sync)) + .with_state(app_state); + + // Serve with graceful shutdown + if let Err(e) = axum::serve(listener, app) + .with_graceful_shutdown(async move { + // wait for shutdown signal + let _ = shutdown_rx.await; + }) + .await + { + tracing::error!("opencode share bridge server error: {}", e); + } + }); + + Ok(Arc::new(Bridge { + base_url, + tx, + secrets, + shutdown_tx, + _server_task: server_task, + })) + } + + /// Subscribe to events from this bridge instance. + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + /// Trigger graceful shutdown of this bridge server. + pub async fn shutdown(&self) { + tracing::debug!("Shutting down OpenCode share bridge: {}", self.base_url); + if let Some(tx) = self.shutdown_tx.lock().await.take() { + let _ = tx.send(()); + } + } +} + +async fn share_create(State(state): State, body: Bytes) -> impl IntoResponse { + // accept JSON regardless of content-type + let payload: ShareCreateReq = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(_) => ShareCreateReq { + session_id: "".into(), + }, + }; + // Generate a simple secret and store against session id + let secret = uuid::Uuid::new_v4().to_string(); + { + let mut map = state.secrets.write().await; + map.insert(payload.session_id.clone(), secret.clone()); + } + ( + StatusCode::OK, + Json(ShareCreateResp { + secret, + url: format!("{}/s/{}", state.base_url, short(&payload.session_id)), + }), + ) +} + +async fn share_delete(_state: State, _body: Bytes) -> impl IntoResponse { + (StatusCode::OK, Json(EmptyResp {})) +} + +async fn share_sync(State(state): State, body: Bytes) -> impl IntoResponse { + let payload: ShareSyncReq = match serde_json::from_slice(&body) { + Ok(v) => v, + Err(_) => { + return (StatusCode::BAD_REQUEST, Json(EmptyResp {})); + } + }; + // Validate secret (best-effort) + let ok = { + let map = state.secrets.read().await; + map.get(&payload.session_id) + .map(|expected| expected == &payload.secret) + .unwrap_or(false) + }; + + if !ok { + // Still emit for debugging but warn + tracing::debug!( + "share_sync with invalid secret for session {}", + payload.session_id + ); + } + + // Broadcast event + let _ = state.tx.send(ShareEvent::Sync(payload)); + (StatusCode::OK, Json(EmptyResp {})) +} + +fn short(id: &str) -> String { + id.chars() + .rev() + .take(8) + .collect::() + .chars() + .rev() + .collect() +} diff --git a/crates/executors/src/stdout_dup.rs b/crates/executors/src/stdout_dup.rs index dbc512b4..0f07efb0 100644 --- a/crates/executors/src/stdout_dup.rs +++ b/crates/executors/src/stdout_dup.rs @@ -76,6 +76,89 @@ pub fn duplicate_stdout( Ok(Box::pin(UnboundedReceiverStream::new(dup_reader))) } +/// Handle to append additional lines into the child's stdout stream. +pub struct StdoutAppender { + tx: tokio::sync::mpsc::UnboundedSender, +} + +impl StdoutAppender { + pub fn append_line>(&self, line: S) { + // Best-effort; ignore send errors if writer task ended + let _ = self.tx.send(line.into()); + } +} + +/// Tee the child's stdout and provide both a duplicate stream and an appender to write additional +/// lines into the child's stdout. This keeps the original stdout functional and mirrors output to +/// the returned duplicate stream. +pub fn tee_stdout_with_appender( + child: &mut AsyncGroupChild, +) -> Result<(BoxStream<'static, std::io::Result>, StdoutAppender), ExecutorError> { + // Take original stdout + let original_stdout = child.inner().stdout.take().ok_or_else(|| { + ExecutorError::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Child process has no stdout", + )) + })?; + + // Create replacement pipe and set as new child stdout + let (pipe_reader, pipe_writer) = os_pipe::pipe().map_err(|e| { + ExecutorError::Io(std::io::Error::other(format!("Failed to create pipe: {e}"))) + })?; + child.inner().stdout = Some(wrap_fd_as_child_stdout(pipe_reader)?); + + // Single shared writer for both original stdout forwarding and injected lines + let writer = wrap_fd_as_tokio_writer(pipe_writer)?; + let shared_writer = std::sync::Arc::new(tokio::sync::Mutex::new(writer)); + + // Create duplicate stream publisher + let (dup_tx, dup_rx) = tokio::sync::mpsc::unbounded_channel::>(); + // Create injector channel + let (inj_tx, mut inj_rx) = tokio::sync::mpsc::unbounded_channel::(); + + // Task 1: forward original stdout to child stdout and duplicate stream + { + let shared_writer = shared_writer.clone(); + tokio::spawn(async move { + let mut stdout_stream = ReaderStream::new(original_stdout); + while let Some(res) = stdout_stream.next().await { + match res { + Ok(data) => { + // forward to child stdout + let mut w = shared_writer.lock().await; + let _ = w.write_all(&data).await; + // publish duplicate + let string_chunk = String::from_utf8_lossy(&data).into_owned(); + let _ = dup_tx.send(Ok(string_chunk)); + } + Err(err) => { + let _ = dup_tx.send(Err(err)); + } + } + } + }); + } + + // Task 2: write injected lines to child stdout + { + let shared_writer = shared_writer.clone(); + tokio::spawn(async move { + while let Some(line) = inj_rx.recv().await { + let mut data = line.into_bytes(); + data.push(b'\n'); + let mut w = shared_writer.lock().await; + let _ = w.write_all(&data).await; + } + }); + } + + Ok(( + Box::pin(UnboundedReceiverStream::new(dup_rx)), + StdoutAppender { tx: inj_tx }, + )) +} + // ========================================= // OS file descriptor helper functions // ========================================= diff --git a/crates/utils/src/path.rs b/crates/utils/src/path.rs index 39099752..5e74ec69 100644 --- a/crates/utils/src/path.rs +++ b/crates/utils/src/path.rs @@ -6,19 +6,16 @@ pub const VIBE_IMAGES_DIR: &str = ".vibe-images"; /// Convert absolute paths to relative paths based on worktree path /// This is a robust implementation that handles symlinks and edge cases pub fn make_path_relative(path: &str, worktree_path: &str) -> String { - let path_obj = Path::new(path); - let worktree_path_obj = Path::new(worktree_path); - tracing::debug!("Making path relative: {} -> {}", path, worktree_path); + let path_obj = normalize_macos_private_alias(Path::new(&path)); + let worktree_path_obj = normalize_macos_private_alias(Path::new(worktree_path)); + // If path is already relative, return as is if path_obj.is_relative() { return path.to_string(); } - let path_obj = normalize_macos_private_alias(path_obj); - let worktree_path_obj = normalize_macos_private_alias(worktree_path_obj); - if let Ok(relative_path) = path_obj.strip_prefix(&worktree_path_obj) { let result = relative_path.to_string_lossy().to_string(); tracing::debug!("Successfully made relative: '{}' -> '{}'", path, result);