Prevent useConversationHistory conflicts (vibe-kanban) (#872)
* ## Summary

  Successfully implemented the `mergeIntoDisplayed` helper to prevent useConversationHistory conflicts! Here's what was accomplished:

  **Changes made to [`frontend/src/hooks/useConversationHistory.ts`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/5e29-prevent-useconve/frontend/src/hooks/useConversationHistory.ts):**

  1. **Added a `mergeIntoDisplayed` helper** that performs in-place mutations instead of reassigning the entire object
  2. **Refactored 4 direct-assignment locations**:
     - Stream updates in `loadRunningAndEmit`
     - Initial entries loading
     - Batch loading (with a function signature improvement)
     - Process removal logic
  3. **Maintained the deliberate reset** on attempt change (line 479)

  **Key improvements**:
  - Prevents race conditions between concurrent async operations
  - Eliminates data loss from process log overwrites
  - Maintains data integrity during streaming and batch loading
  - Passes type checking successfully

  The implementation follows the Oracle's recommendations for safe concurrent updates by always reading the latest state and mutating in-place rather than replacing the entire object reference.

* Cleanup script changes for task attempt 5e29a440-f572-406d-9f07-be5f11fd3775
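To make the race described above concrete, here is a minimal, self-contained TypeScript sketch, not the hook itself: `Store`, `ref`, `overwriteStyle`, `mergeIntoStore`, and `mergeStyle` are hypothetical stand-ins contrasting the old reassignment pattern with a `mergeIntoDisplayed`-style in-place merge.

```ts
// Minimal sketch of the overwrite race (simplified; not the actual hook code).
// `Store` stands in for ExecutionProcessStateStore keyed by process id.
type Store = Record<string, { entries: string[] }>;

const ref: { current: Store } = { current: {} };

// Old pattern: snapshot, modify a copy, reassign the whole object.
// A writer that finishes later clobbers whatever another writer stored meanwhile.
async function overwriteStyle(id: string, entries: string[]): Promise<void> {
  const snapshot = { ...ref.current }; // taken before the awaited work completes
  await Promise.resolve();             // stands in for a fetch / stream callback
  snapshot[id] = { entries };
  ref.current = snapshot;              // replaces the object reference
}

// mergeIntoDisplayed-style pattern: always read the latest state and mutate in place.
function mergeIntoStore(mutator: (state: Store) => void): void {
  mutator(ref.current);
}

async function mergeStyle(id: string, entries: string[]): Promise<void> {
  await Promise.resolve();
  mergeIntoStore((state) => {
    state[id] = { entries }; // other processes' entries stay untouched
  });
}

async function demo(): Promise<void> {
  ref.current = {};
  await Promise.all([overwriteStyle('proc-a', ['a1']), overwriteStyle('proc-b', ['b1'])]);
  console.log(Object.keys(ref.current)); // ['proc-b'] — proc-a's update was lost

  ref.current = {};
  await Promise.all([mergeStyle('proc-a', ['a1']), mergeStyle('proc-b', ['b1'])]);
  console.log(Object.keys(ref.current)); // ['proc-a', 'proc-b'] — both preserved
}

void demo();
```

With the reassignment pattern, whichever async writer finishes last replaces the ref's object wholesale and silently drops the other writer's entries; mutating the shared object in place preserves both, which is why the diff below routes every update through `mergeIntoDisplayed`.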
frontend/src/hooks/useConversationHistory.ts

@@ -61,6 +61,13 @@ export const useConversationHistory = ({
   const loadedInitialEntries = useRef(false);
   const lastRunningProcessId = useRef<string | null>(null);
   const onEntriesUpdatedRef = useRef<OnEntriesUpdated | null>(null);
+
+  const mergeIntoDisplayed = (
+    mutator: (state: ExecutionProcessStateStore) => void
+  ) => {
+    const state = displayedExecutionProcesses.current;
+    mutator(state);
+  };
   useEffect(() => {
     onEntriesUpdatedRef.current = onEntriesUpdated;
   }, [onEntriesUpdated]);
@@ -122,13 +129,13 @@ export const useConversationHistory = ({
           const patchesWithKey = entries.map((entry, index) =>
             patchWithKey(entry, executionProcess.id, index)
           );
-          const localEntries = displayedExecutionProcesses.current;
-          localEntries[executionProcess.id] = {
-            executionProcess,
-            entries: patchesWithKey,
-          };
-          displayedExecutionProcesses.current = localEntries;
-          emitEntries(localEntries, 'running', false);
+          mergeIntoDisplayed((state) => {
+            state[executionProcess.id] = {
+              executionProcess,
+              entries: patchesWithKey,
+            };
+          });
+          emitEntries(displayedExecutionProcesses.current, 'running', false);
         },
         onFinished: () => {
           emitEntries(displayedExecutionProcesses.current, 'running', false);
@@ -372,13 +379,13 @@ export const useConversationHistory = ({

   const loadRemainingEntriesInBatches = async (
     batchSize: number
-  ): Promise<ExecutionProcessStateStore | null> => {
-    const local = displayedExecutionProcesses.current; // keep ref if intentional
-    if (!executionProcesses?.current) return null;
+  ): Promise<boolean> => {
+    if (!executionProcesses?.current) return false;

     let anyUpdated = false;
     for (const executionProcess of [...executionProcesses.current].reverse()) {
-      if (local[executionProcess.id] || executionProcess.status === 'running')
+      const current = displayedExecutionProcesses.current;
+      if (current[executionProcess.id] || executionProcess.status === 'running')
         continue;

       const entries =
@@ -387,17 +394,22 @@ export const useConversationHistory = ({
         patchWithKey(e, executionProcess.id, idx)
       );

-      local[executionProcess.id] = {
-        executionProcess,
-        entries: entriesWithKey,
-      };
-      if (flattenEntries(local).length > batchSize) {
+      mergeIntoDisplayed((state) => {
+        state[executionProcess.id] = {
+          executionProcess,
+          entries: entriesWithKey,
+        };
+      });
+
+      if (
+        flattenEntries(displayedExecutionProcesses.current).length > batchSize
+      ) {
         anyUpdated = true;
         break;
       }
       anyUpdated = true;
     }
-    return anyUpdated ? local : null;
+    return anyUpdated;
   };

   const emitEntries = (
@@ -430,19 +442,18 @@ export const useConversationHistory = ({
       // Initial entries
       const allInitialEntries = await loadInitialEntries();
       if (cancelled) return;
-      displayedExecutionProcesses.current = allInitialEntries;
-      emitEntries(allInitialEntries, 'initial', false);
+      mergeIntoDisplayed((state) => {
+        Object.assign(state, allInitialEntries);
+      });
+      emitEntries(displayedExecutionProcesses.current, 'initial', false);
       loadedInitialEntries.current = true;

       // Then load the remaining in batches
-      let updatedEntries;
       while (
         !cancelled &&
-        (updatedEntries =
-          await loadRemainingEntriesInBatches(REMAINING_BATCH_SIZE))
+        (await loadRemainingEntriesInBatches(REMAINING_BATCH_SIZE))
       ) {
         if (cancelled) return;
-        displayedExecutionProcesses.current = updatedEntries;
       }
       await new Promise((resolve) => setTimeout(resolve, 100));
       emitEntries(displayedExecutionProcesses.current, 'historic', false);
@@ -469,9 +480,13 @@ export const useConversationHistory = ({
       displayedExecutionProcesses.current
     ).filter((id) => !executionProcessesRaw.some((p) => p.id === id));

-    removedProcessIds.forEach((id) => {
-      delete displayedExecutionProcesses.current[id];
-    });
+    if (removedProcessIds.length > 0) {
+      mergeIntoDisplayed((state) => {
+        removedProcessIds.forEach((id) => {
+          delete state[id];
+        });
+      });
+    }
   }, [attempt.id, idListKey]);

   // Reset state when attempt changes