Opencode ACP (#1471)

* Opencode ACP

switch opencode to ACP.

Simplifies the implementation and adds fork/retry support.

* display relative file path for read tool

* remove empty assitant messages
This commit is contained in:
Solomon
2025-12-09 11:29:19 +00:00
committed by GitHub
parent 1ee05ea862
commit aee6ac35b8
9 changed files with 142 additions and 1128 deletions

View File

@@ -38,7 +38,7 @@ convert_case = "0.6"
sqlx = "0.8.6"
axum = { workspace = true }
shlex = "1.3.0"
agent-client-protocol = "0.8"
agent-client-protocol = { version = "0.8", features = ["unstable"] }
codex-protocol = { git = "https://github.com/openai/codex.git", package = "codex-protocol", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }
codex-app-server-protocol = { git = "https://github.com/openai/codex.git", package = "codex-app-server-protocol", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }
codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "80d6a3868ef1414e0fb1c2e28a369f2ef4fa4dcc" }

View File

@@ -79,6 +79,11 @@
"OPENCODE": {
"DEFAULT": {
"OPENCODE": {}
},
"PLAN": {
"OPENCODE": {
"mode": "plan"
}
}
},
"QWEN_CODE": {

View File

@@ -26,6 +26,8 @@ use crate::{
/// Reusable harness for ACP-based conns (Gemini, Qwen, etc.)
pub struct AcpAgentHarness {
session_namespace: String,
model: Option<String>,
mode: Option<String>,
}
impl Default for AcpAgentHarness {
@@ -40,6 +42,8 @@ impl AcpAgentHarness {
pub fn new() -> Self {
Self {
session_namespace: "gemini_sessions".to_string(),
model: None,
mode: None,
}
}
@@ -47,9 +51,21 @@ impl AcpAgentHarness {
pub fn with_session_namespace(namespace: impl Into<String>) -> Self {
Self {
session_namespace: namespace.into(),
model: None,
mode: None,
}
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.model = Some(model.into());
self
}
pub fn with_mode(mut self, mode: impl Into<String>) -> Self {
self.mode = Some(mode.into());
self
}
pub async fn spawn_with_command(
&self,
current_dir: &Path,
@@ -83,6 +99,8 @@ impl AcpAgentHarness {
prompt,
Some(exit_tx),
self.session_namespace.clone(),
self.model.clone(),
self.mode.clone(),
)
.await?;
@@ -127,6 +145,8 @@ impl AcpAgentHarness {
prompt,
Some(exit_tx),
self.session_namespace.clone(),
self.model.clone(),
self.mode.clone(),
)
.await?;
@@ -137,6 +157,7 @@ impl AcpAgentHarness {
})
}
#[allow(clippy::too_many_arguments)]
async fn bootstrap_acp_connection(
child: &mut AsyncGroupChild,
cwd: PathBuf,
@@ -144,6 +165,8 @@ impl AcpAgentHarness {
prompt: String,
exit_signal: Option<tokio::sync::oneshot::Sender<ExecutorExitResult>>,
session_namespace: String,
model: Option<String>,
mode: Option<String>,
) -> Result<(), ExecutorError> {
// Take child's stdio for ACP wiring
let orig_stdout = child.inner().stdout.take().ok_or_else(|| {
@@ -329,6 +352,32 @@ impl AcpAgentHarness {
let _ = log_tx
.send(AcpEvent::SessionStart(display_session_id.clone()).to_string());
if let Some(model) = model.clone() {
match conn
.set_session_model(proto::SetSessionModelRequest::new(
proto::SessionId::new(acp_session_id.clone()),
model,
))
.await
{
Ok(_) => {}
Err(e) => error!("Failed to set session mode: {}", e),
}
}
if let Some(mode) = mode.clone() {
match conn
.set_session_mode(proto::SetSessionModeRequest::new(
proto::SessionId::new(acp_session_id.clone()),
mode,
))
.await
{
Ok(_) => {}
Err(e) => error!("Failed to set session mode: {}", e),
}
}
// Start raw event forwarder and persistence
let app_tx_clone = log_tx.clone();
let sess_id_for_writer = display_session_id.clone();

View File

@@ -8,14 +8,14 @@ use agent_client_protocol::{self as acp, SessionNotification};
use futures::StreamExt;
use regex::Regex;
use serde::Deserialize;
use tracing::debug;
use tracing::{debug, trace};
use workspace_utils::msg_store::MsgStore;
pub use super::AcpAgentHarness;
use super::AcpEvent;
use crate::logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryError, NormalizedEntryType, ToolResult,
ToolResultValueType, ToolStatus as LogToolStatus,
ActionType, FileChange, NormalizedEntry, NormalizedEntryError, NormalizedEntryType, TodoItem,
ToolResult, ToolResultValueType, ToolStatus as LogToolStatus,
stderr_processor::normalize_stderr_logs,
utils::{ConversationPatch, EntryIndexProvider},
};
@@ -38,7 +38,7 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
let mut stdout_lines = msg_store.stdout_lines_stream();
while let Some(Ok(line)) = stdout_lines.next().await {
if let Some(parsed) = AcpEventParser::parse_line(&line) {
debug!("Parsed ACP line: {:?}", parsed);
trace!("Parsed ACP line: {:?}", parsed);
match parsed {
AcpEvent::SessionStart(id) => {
if !stored_session_id {
@@ -67,6 +67,9 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
if let agent_client_protocol::ContentBlock::Text(text) = content {
let is_new = streaming.assistant_text.is_none();
if is_new {
if text.text == "\n" {
continue;
}
let idx = entry_index.next();
streaming.assistant_text = Some(StreamingText {
index: idx,
@@ -121,15 +124,33 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
AcpEvent::Plan(plan) => {
streaming.assistant_text = None;
streaming.thinking_text = None;
let mut body = String::from("Plan:\n");
for (i, e) in plan.entries.iter().enumerate() {
body.push_str(&format!("{}. {}\n", i + 1, e.content));
}
let todos: Vec<TodoItem> = plan
.entries
.iter()
.map(|e| TodoItem {
content: e.content.clone(),
status: serde_json::to_value(&e.status)
.ok()
.and_then(|v| v.as_str().map(|s| s.to_string()))
.unwrap_or_else(|| "unknown".to_string()),
priority: serde_json::to_value(&e.priority)
.ok()
.and_then(|v| v.as_str().map(|s| s.to_string())),
})
.collect();
let idx = entry_index.next();
let entry = NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: body,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "plan".to_string(),
action_type: ActionType::TodoManagement {
todos,
operation: "update".to_string(),
},
status: LogToolStatus::Success,
},
content: "Plan updated".to_string(),
metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry));
@@ -186,7 +207,7 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
.map(|s| s.title.clone())
.or_else(|| Some("".to_string()));
}
debug!("Got tool call update: {:?}", update);
trace!("Got tool call update: {:?}", update);
if let Ok(tc) = agent_client_protocol::ToolCall::try_from(update.clone()) {
handle_tool_call(
&tc,
@@ -282,10 +303,9 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
// Prefer structured raw_output, else fallback to aggregated text content
let completed =
matches!(tc.status, agent_client_protocol::ToolCallStatus::Completed);
tracing::debug!(
trace!(
"Mapping execute tool call, completed: {}, command: {}",
completed,
command
completed, command
);
let tc_exit_status = match tc.status {
agent_client_protocol::ToolCallStatus::Completed => {
@@ -463,7 +483,10 @@ pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
if tc.id.0.starts_with("read_many_files") {
"Read files".to_string()
} else {
tc.title.clone()
tc.path
.as_ref()
.map(|p| p.display().to_string())
.unwrap_or_else(|| tc.title.clone())
}
}
_ => tc.title.clone(),

View File

@@ -159,13 +159,14 @@ impl CodingAgent {
| Self::Amp(_)
| Self::Gemini(_)
| Self::QwenCode(_)
| Self::Droid(_) => vec![BaseAgentCapability::SessionFork],
| Self::Droid(_)
| Self::Opencode(_) => vec![BaseAgentCapability::SessionFork],
Self::Codex(_) => vec![
BaseAgentCapability::SessionFork,
BaseAgentCapability::SetupHelper,
],
Self::CursorAgent(_) => vec![BaseAgentCapability::SetupHelper],
Self::Opencode(_) | Self::Copilot(_) => vec![],
Self::Copilot(_) => vec![],
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,196 +0,0 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use axum::{
Json, Router, body::Bytes, extract::State, http::StatusCode, response::IntoResponse,
routing::post,
};
use serde::{Deserialize, Serialize};
use tokio::{
net::TcpListener,
sync::{Mutex, RwLock, broadcast},
task::JoinHandle,
};
/// Minimal subset of OpenCode share API that we need to ingest structured events locally.
///
/// We run a lightweight HTTP server on 127.0.0.1 with an ephemeral port and point
/// OpenCode to it by setting OPENCODE_API and enabling auto-share. The CLI then POSTs
/// tool/message updates to /share_sync which we rebroadcast to interested consumers.
#[derive(Debug)]
pub struct Bridge {
pub base_url: String,
tx: broadcast::Sender<ShareEvent>,
#[allow(dead_code)]
secrets: Arc<RwLock<HashMap<String, String>>>,
shutdown_tx: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
_server_task: JoinHandle<()>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShareCreateReq {
#[serde(rename = "sessionID")]
pub session_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShareCreateResp {
pub url: String,
pub secret: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShareSyncReq {
#[serde(rename = "sessionID")]
pub session_id: String,
pub secret: String,
pub key: String,
pub content: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmptyResp {}
#[derive(Debug, Clone)]
pub enum ShareEvent {
Sync(ShareSyncReq),
}
#[derive(Clone)]
struct AppState {
base_url: String,
tx: broadcast::Sender<ShareEvent>,
secrets: Arc<RwLock<HashMap<String, String>>>,
}
impl Bridge {
/// Start a new, isolated bridge server bound to localhost on an ephemeral port.
pub async fn start() -> std::io::Result<Arc<Bridge>> {
let (tx, _rx) = broadcast::channel(10_000);
let secrets = Arc::new(RwLock::new(HashMap::new()));
// Bind to localhost:0 to get an ephemeral port
let listener = TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0)).await?;
let addr: SocketAddr = listener.local_addr()?;
let base_url = format!("http://{}:{}", addr.ip(), addr.port());
tracing::debug!(
"OpenCode share bridge started: base_url={}, port={}",
base_url,
addr.port()
);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let shutdown_tx = Arc::new(Mutex::new(Some(shutdown_tx)));
let app_state = AppState {
base_url: base_url.clone(),
tx: tx.clone(),
secrets: secrets.clone(),
};
let server_task = tokio::spawn(async move {
let app = Router::new()
.route("/share_create", post(share_create))
.route("/share_delete", post(share_delete))
.route("/share_sync", post(share_sync))
.with_state(app_state);
// Serve with graceful shutdown
if let Err(e) = axum::serve(listener, app)
.with_graceful_shutdown(async move {
// wait for shutdown signal
let _ = shutdown_rx.await;
})
.await
{
tracing::error!("opencode share bridge server error: {}", e);
}
});
Ok(Arc::new(Bridge {
base_url,
tx,
secrets,
shutdown_tx,
_server_task: server_task,
}))
}
/// Subscribe to events from this bridge instance.
pub fn subscribe(&self) -> broadcast::Receiver<ShareEvent> {
self.tx.subscribe()
}
/// Trigger graceful shutdown of this bridge server.
pub async fn shutdown(&self) {
tracing::debug!("Shutting down OpenCode share bridge: {}", self.base_url);
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(());
}
}
}
async fn share_create(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
// accept JSON regardless of content-type
let payload: ShareCreateReq = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(_) => ShareCreateReq {
session_id: "".into(),
},
};
// Generate a simple secret and store against session id
let secret = uuid::Uuid::new_v4().to_string();
{
let mut map = state.secrets.write().await;
map.insert(payload.session_id.clone(), secret.clone());
}
(
StatusCode::OK,
Json(ShareCreateResp {
secret,
url: format!("{}/s/{}", state.base_url, short(&payload.session_id)),
}),
)
}
async fn share_delete(_state: State<AppState>, _body: Bytes) -> impl IntoResponse {
(StatusCode::OK, Json(EmptyResp {}))
}
async fn share_sync(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
let payload: ShareSyncReq = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(_) => {
return (StatusCode::BAD_REQUEST, Json(EmptyResp {}));
}
};
// Validate secret (best-effort)
let ok = {
let map = state.secrets.read().await;
map.get(&payload.session_id)
.map(|expected| expected == &payload.secret)
.unwrap_or(false)
};
if !ok {
// Still emit for debugging but warn
tracing::debug!(
"share_sync with invalid secret for session {}",
payload.session_id
);
}
// Broadcast event
let _ = state.tx.send(ShareEvent::Sync(payload));
(StatusCode::OK, Json(EmptyResp {}))
}
fn short(id: &str) -> String {
id.chars()
.rev()
.take(8)
.collect::<String>()
.chars()
.rev()
.collect()
}