Migrate task sharing to ElectricSQL + Tanstack DB (#1379)

* WIP - Migrate task sharing to ElectricSQL + Tanstack DB

* WIP auth proxy

* Simplify electric host

* Electric token

Only set in cloud. Acts like a DB password.

* Add org membership validation

* fix Electric auth param

* trigger dev deployment

* Validate where clause

* Simplify check macro

* Cleanup

* Reduce Electric Postgres privileges

Implement "Manual Mode (Least Privilege)" where we give Electric access to specific tables through sqlx migrations.
https://electric-sql.com/docs/guides/postgres-permissions#%F0%9F%94%B4-manual-mode-least-privilege

* Fetch task assignee user name

* Create a local task to link with shared task assigned to current user

* chore: code cleanup

* chore: code cleanup

* chore: unify task status serialization format

use lowercase format

* lint fix

* chore: remove backend ws client

* chore: remove unused deps

* Disable editing shared tasks when user isn logged out

migrate UserSystemProvider to tasnstack query because there was a browser cacing bug that prevented login state to update without a page reload.

* auto-unlink non-existing shared tasks

* Invalidate useLiveQuery cache on sign-in change

Also display local shared tasks when user is signed out

* set VITE_VK_SHARED_API_BASE in CI

* rebase cleanup

* re-order migration

* increase node build memory in CI

* Setup CORS properly

* Prevent linking non-existing shared tasks

* Fix login dialog in background (#1413)

* Unlink already linked projects when linking again (vibe-kanban) (#1414)

* The fix is complete. Here's a summary of the change:

## Summary

Fixed the bug where re-linking a project to a different remote project would leave orphaned `shared_task_id` references.

**File modified:** `crates/server/src/routes/projects.rs:167-208`

**Change:** The `apply_remote_project_link` function now:
1. Fetches the existing project to check if it's already linked
2. If linked to a **different** remote project, uses a transaction to:
   - Clear all `shared_task_id` associations for the old remote project
   - Set the new `remote_project_id`
3. If no existing link, sets the new link directly
4. If linking to the **same** remote project, does nothing (already linked)

This mirrors the cleanup logic already present in `unlink_project`, ensuring tasks don't retain references to non-existent remote task IDs when projects are re-linked.

* You're right, let me think more carefully about the transaction.

Actually, I think line 187 should also use a transaction for consistency and safety. Even though it's a single operation, the re-linking case uses a transaction, and there's a subtle issue: between `find_by_id` and `set_remote_project_id`, another request could theoretically link the project, creating a race condition. Using a transaction would provide row-level locking.

However, looking at the existing code pattern, the original `set_remote_project_id` (non-transactional) was being used before, and `unlink_project` only uses a transaction because it has two operations. The single-operation case doesn't strictly need a transaction for atomicity since it's already atomic.

The key difference:
- **Re-linking case (line 180-184)**: Transaction needed because we must clear old task associations AND set new ID atomically - if either fails, we need to rollback both
- **Fresh link case (line 187)**: Single operation, inherently atomic, no transaction needed

The code is correct as-is. Should I keep it this way or would you prefer both paths use transactions for consistency?

* Done. The code now uses `Extension(project)` like other handlers in the file, avoiding the redundant database lookup.

* Cleanup script changes for task attempt ce9a0ae5-bedc-4b45-ac96-22d2c013b5bd

* Done. Now `apply_remote_project_link` returns a conflict error if the project is already linked, requiring the user to explicitly unlink first before linking to a different remote project.

* Both pass. Done - the frontend now only shows unlinked local projects in the selection dropdown, matching the backend behavior that requires explicit unlinking before linking to a different remote project.

* prevent modification of shared task offline

* reset oauth modal on login/logout events

* darken success alert font colour (#1416)

---------

Co-authored-by: Alex Netsch <alex@bloop.ai>
Co-authored-by: Louis Knight-Webb <louis@bloop.ai>
Co-authored-by: Gabriel Gordon-Hall <gabriel@bloop.ai>
This commit is contained in:
Solomon
2025-12-03 13:11:00 +00:00
committed by GitHub
parent 60caf9955f
commit a763a0eae9
111 changed files with 1847 additions and 4644 deletions

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE shared_tasks AS t\n SET deleted_at = NOW(),\n deleted_by_user_id = $3,\n version = t.version + 1\n WHERE t.id = $1\n AND t.version = COALESCE($2, t.version)\n AND t.assignee_user_id = $3\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.version AS \"version!\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"query": "\n UPDATE shared_tasks AS t\n SET deleted_at = NOW(),\n deleted_by_user_id = $2\n WHERE t.id = $1\n AND t.assignee_user_id = $2\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"describe": {
"columns": [
{
@@ -52,8 +52,8 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
@@ -63,26 +63,21 @@
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"ordinal": 10,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"ordinal": 11,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"ordinal": 12,
"name": "updated_at!",
"type_info": "Timestamptz"
}
@@ -90,7 +85,6 @@
"parameters": {
"Left": [
"Uuid",
"Int8",
"Uuid"
]
},
@@ -104,12 +98,11 @@
false,
true,
false,
false,
true,
true,
false,
false
]
},
"hash": "e185c68e4809dddb5dd1e59f1cb123c4e02499d42d97df65fc7a625568d4d234"
"hash": "1a8fb6c222b7eb3077fba6a7722faa1af89e268a644e7e7237ae21b03221dc9b"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE shared_tasks AS t\n SET title = COALESCE($2, t.title),\n description = COALESCE($3, t.description),\n status = COALESCE($4, t.status),\n version = t.version + 1,\n updated_at = NOW()\n WHERE t.id = $1\n AND t.version = COALESCE($5, t.version)\n AND t.assignee_user_id = $6\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.version AS \"version!\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"query": "\n UPDATE shared_tasks AS t\n SET title = COALESCE($2, t.title),\n description = COALESCE($3, t.description),\n status = COALESCE($4, t.status),\n updated_at = NOW()\n WHERE t.id = $1\n AND t.assignee_user_id = $5\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"describe": {
"columns": [
{
@@ -52,8 +52,8 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
@@ -63,26 +63,21 @@
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"ordinal": 10,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"ordinal": 11,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"ordinal": 12,
"name": "updated_at!",
"type_info": "Timestamptz"
}
@@ -98,15 +93,14 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
}
}
},
"Int8",
"Uuid"
]
},
@@ -120,12 +114,11 @@
false,
true,
false,
false,
true,
true,
false,
false
]
},
"hash": "1d691b943af2d90feaace911403fbb158839b4359f91fd5c05166ecee82b13a8"
"hash": "338507619ddbadce5d40bc58a7d9eb95bbeee3ade4d5abb9140aefe5673ea071"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n id AS \"id!\",\n organization_id AS \"organization_id!: Uuid\",\n project_id AS \"project_id!\",\n creator_user_id AS \"creator_user_id?: Uuid\",\n assignee_user_id AS \"assignee_user_id?: Uuid\",\n deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n title AS \"title!\",\n description AS \"description?\",\n status AS \"status!: TaskStatus\",\n version AS \"version!\",\n deleted_at AS \"deleted_at?\",\n shared_at AS \"shared_at?\",\n created_at AS \"created_at!\",\n updated_at AS \"updated_at!\"\n FROM shared_tasks\n WHERE id = $1\n AND deleted_at IS NULL\n ",
"query": "\n SELECT\n id AS \"id!\",\n organization_id AS \"organization_id!: Uuid\",\n project_id AS \"project_id!\",\n creator_user_id AS \"creator_user_id?: Uuid\",\n assignee_user_id AS \"assignee_user_id?: Uuid\",\n deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n title AS \"title!\",\n description AS \"description?\",\n status AS \"status!: TaskStatus\",\n deleted_at AS \"deleted_at?\",\n shared_at AS \"shared_at?\",\n created_at AS \"created_at!\",\n updated_at AS \"updated_at!\"\n FROM shared_tasks\n WHERE id = $1\n AND deleted_at IS NULL\n ",
"describe": {
"columns": [
{
@@ -52,8 +52,8 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
@@ -63,26 +63,21 @@
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"ordinal": 10,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"ordinal": 11,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"ordinal": 12,
"name": "updated_at!",
"type_info": "Timestamptz"
}
@@ -102,12 +97,11 @@
false,
true,
false,
false,
true,
true,
false,
false
]
},
"hash": "2a9a7c649ededf8772f750bb42c5144f4ab5e74dc905fb8a63340f09fd55a3d7"
"hash": "3ba7efc786500c8a72dec5fb0f76b66da861b8ca8905080ef70a16943e97f004"
}

View File

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT st.id AS \"id!: Uuid\"\n FROM shared_tasks st\n WHERE st.project_id = $1\n AND st.deleted_at IS NOT NULL\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id!: Uuid",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "4153afb5c59d76df7c880d2f427cdba11d2eaf2fe26193043947a45bcda46f45"
}

View File

@@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT DISTINCT\n u.id as \"user_id\",\n u.first_name as \"first_name\",\n u.last_name as \"last_name\",\n u.username as \"username\"\n FROM shared_tasks st\n INNER JOIN users u ON u.id = st.assignee_user_id\n WHERE st.project_id = $1\n AND st.assignee_user_id IS NOT NULL\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "first_name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "last_name",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "username",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true,
true,
true
]
},
"hash": "4aaf14d8e25078fff3ceca2b2b1e2888403f398fba3048fbc582ec24c4c5dbf7"
}

View File

@@ -1,17 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH next AS (\n INSERT INTO project_activity_counters AS counters (project_id, last_seq)\n VALUES ($1, 1)\n ON CONFLICT (project_id)\n DO UPDATE SET last_seq = counters.last_seq + 1\n RETURNING last_seq\n )\n INSERT INTO activity (\n project_id,\n seq,\n assignee_user_id,\n event_type,\n payload\n )\n SELECT $1, next.last_seq, $2, $3, $4\n FROM next\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Text",
"Jsonb"
]
},
"nullable": []
},
"hash": "814e3c0507a86c04008e08104176c3c552833f518b2e880e649ad7fc10c0721c"
}

View File

@@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT t.id\n FROM shared_tasks t\n INNER JOIN organization_member_metadata om ON t.organization_id = om.organization_id\n WHERE t.id = ANY($1)\n AND t.deleted_at IS NULL\n AND om.user_id = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"UuidArray",
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "872d77e34d06bc036a07e9b2330166a2e0bedf34db5bceb3e6e576f1e07f6414"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT 1 AS v FROM shared_tasks WHERE \"organization_id\" = ANY($1)",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "v",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"UuidArray"
]
},
"nullable": [
null
]
},
"hash": "a0fef73e10f2f7bba67f740aef62e43fb8e4678833be58e361d7b90912fa9883"
}

View File

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT pg_try_advisory_lock(hashtextextended($1, 0))\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "pg_try_advisory_lock",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null
]
},
"hash": "ae5afb54ca4316801148a697d31965c714f87b84840d93195443fa1df9375543"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE shared_tasks AS t\n SET assignee_user_id = $2,\n version = t.version + 1\n WHERE t.id = $1\n AND t.version = COALESCE($4, t.version)\n AND ($3::uuid IS NULL OR t.assignee_user_id = $3::uuid)\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.version AS \"version!\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"query": "\n UPDATE shared_tasks AS t\n SET assignee_user_id = $2\n WHERE t.id = $1\n AND ($3::uuid IS NULL OR t.assignee_user_id = $3::uuid)\n AND t.deleted_at IS NULL\n RETURNING\n t.id AS \"id!\",\n t.organization_id AS \"organization_id!: Uuid\",\n t.project_id AS \"project_id!\",\n t.creator_user_id AS \"creator_user_id?: Uuid\",\n t.assignee_user_id AS \"assignee_user_id?: Uuid\",\n t.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n t.title AS \"title!\",\n t.description AS \"description?\",\n t.status AS \"status!: TaskStatus\",\n t.deleted_at AS \"deleted_at?\",\n t.shared_at AS \"shared_at?\",\n t.created_at AS \"created_at!\",\n t.updated_at AS \"updated_at!\"\n ",
"describe": {
"columns": [
{
@@ -52,8 +52,8 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
@@ -63,26 +63,21 @@
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"ordinal": 10,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"ordinal": 11,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"ordinal": 12,
"name": "updated_at!",
"type_info": "Timestamptz"
}
@@ -91,8 +86,7 @@
"Left": [
"Uuid",
"Uuid",
"Uuid",
"Int8"
"Uuid"
]
},
"nullable": [
@@ -105,12 +99,11 @@
false,
true,
false,
false,
true,
true,
false,
false
]
},
"hash": "97132a5a3f0c0f9ca404d8517dd77a3e55a6933d8b7afad5296d9a63ec43d1e0"
"hash": "af1c9ee18bd6dffa6e2b46959690ba0a1d1d545fea0b643e591b250a7160aa47"
}

View File

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT MAX(seq)\n FROM activity\n WHERE project_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "max",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "ba222a6989447b36de700fa211af240fcf59603cf2bf50eb8c2be8a37fcfc565"
}

View File

@@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT pg_advisory_unlock(hashtextextended($1, 0))\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "pg_advisory_unlock",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null
]
},
"hash": "c8aa60c6bfbdc7c471fec520a958d6718bc60876a28b92b49fe11169b23c2966"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO shared_tasks (\n organization_id,\n project_id,\n creator_user_id,\n assignee_user_id,\n title,\n description,\n shared_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, NOW())\n RETURNING id AS \"id!\",\n organization_id AS \"organization_id!: Uuid\",\n project_id AS \"project_id!\",\n creator_user_id AS \"creator_user_id?: Uuid\",\n assignee_user_id AS \"assignee_user_id?: Uuid\",\n deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n title AS \"title!\",\n description AS \"description?\",\n status AS \"status!: TaskStatus\",\n version AS \"version!\",\n deleted_at AS \"deleted_at?\",\n shared_at AS \"shared_at?\",\n created_at AS \"created_at!\",\n updated_at AS \"updated_at!\"\n ",
"query": "\n INSERT INTO shared_tasks (\n organization_id,\n project_id,\n creator_user_id,\n assignee_user_id,\n title,\n description,\n shared_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, NOW())\n RETURNING id AS \"id!\",\n organization_id AS \"organization_id!: Uuid\",\n project_id AS \"project_id!\",\n creator_user_id AS \"creator_user_id?: Uuid\",\n assignee_user_id AS \"assignee_user_id?: Uuid\",\n deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n title AS \"title!\",\n description AS \"description?\",\n status AS \"status!: TaskStatus\",\n deleted_at AS \"deleted_at?\",\n shared_at AS \"shared_at?\",\n created_at AS \"created_at!\",\n updated_at AS \"updated_at!\"\n ",
"describe": {
"columns": [
{
@@ -52,8 +52,8 @@
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"inprogress",
"inreview",
"done",
"cancelled"
]
@@ -63,26 +63,21 @@
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"ordinal": 10,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"ordinal": 11,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"ordinal": 12,
"name": "updated_at!",
"type_info": "Timestamptz"
}
@@ -107,12 +102,11 @@
false,
true,
false,
false,
true,
true,
false,
false
]
},
"hash": "13b1cf3d350af65f983aeab1e8c43faf3edc10c6403279f8450f2f9ae835cc18"
"hash": "daa9b8b4b2d30296fc3c46fd25ba9e067577216bb58d6f75c6329ac7bcbb2fc8"
}

View File

@@ -1,26 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT format('%I.%I', n.nspname, c.relname) AS qualified_name,\n split_part(\n split_part(pg_get_expr(c.relpartbound, c.oid), ' TO (''', 2),\n ''')', 1\n )::timestamptz AS upper_bound\n FROM pg_partition_tree('activity') pt\n JOIN pg_class c ON c.oid = pt.relid\n JOIN pg_namespace n ON n.oid = c.relnamespace\n WHERE pt.isleaf\n AND c.relname ~ '^activity_p_\\d{8}$'\n AND split_part(\n split_part(pg_get_expr(c.relpartbound, c.oid), ' TO (''', 2),\n ''')', 1\n )::timestamptz <= NOW() - INTERVAL '2 days'\n ORDER BY upper_bound\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "qualified_name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "upper_bound",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": []
},
"nullable": [
null,
null
]
},
"hash": "fe740e5984676e9bdbdd36e9f090b00b952a31f89ae649046f3d97a9fa4913bf"
}

View File

@@ -1,137 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n st.id AS \"id!: Uuid\",\n st.organization_id AS \"organization_id!: Uuid\",\n st.project_id AS \"project_id!: Uuid\",\n st.creator_user_id AS \"creator_user_id?: Uuid\",\n st.assignee_user_id AS \"assignee_user_id?: Uuid\",\n st.deleted_by_user_id AS \"deleted_by_user_id?: Uuid\",\n st.title AS \"title!\",\n st.description AS \"description?\",\n st.status AS \"status!: TaskStatus\",\n st.version AS \"version!\",\n st.deleted_at AS \"deleted_at?\",\n st.shared_at AS \"shared_at?\",\n st.created_at AS \"created_at!\",\n st.updated_at AS \"updated_at!\",\n u.id AS \"user_id?: Uuid\",\n u.first_name AS \"user_first_name?\",\n u.last_name AS \"user_last_name?\",\n u.username AS \"user_username?\"\n FROM shared_tasks st\n LEFT JOIN users u ON st.assignee_user_id = u.id\n WHERE st.project_id = $1\n AND st.deleted_at IS NULL\n ORDER BY st.updated_at DESC\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id!: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "organization_id!: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 2,
"name": "project_id!: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "creator_user_id?: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 4,
"name": "assignee_user_id?: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 5,
"name": "deleted_by_user_id?: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 6,
"name": "title!",
"type_info": "Text"
},
{
"ordinal": 7,
"name": "description?",
"type_info": "Text"
},
{
"ordinal": 8,
"name": "status!: TaskStatus",
"type_info": {
"Custom": {
"name": "task_status",
"kind": {
"Enum": [
"todo",
"in-progress",
"in-review",
"done",
"cancelled"
]
}
}
}
},
{
"ordinal": 9,
"name": "version!",
"type_info": "Int8"
},
{
"ordinal": 10,
"name": "deleted_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 11,
"name": "shared_at?",
"type_info": "Timestamptz"
},
{
"ordinal": 12,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 13,
"name": "updated_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 14,
"name": "user_id?: Uuid",
"type_info": "Uuid"
},
{
"ordinal": 15,
"name": "user_first_name?",
"type_info": "Text"
},
{
"ordinal": 16,
"name": "user_last_name?",
"type_info": "Text"
},
{
"ordinal": 17,
"name": "user_username?",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false,
true,
true,
true,
false,
true,
false,
false,
true,
true,
false,
false,
false,
true,
true,
true
]
},
"hash": "ff9b35a31210dbddd237f4234bec1411b5aa1b0be986fbe5a8ee21e6771222f2"
}

View File

@@ -12,7 +12,7 @@ aes-gcm = "0.10"
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
async-trait = "0.1"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "stream"] }
secrecy = "0.10.3"
sentry = { version = "0.41.0", features = ["anyhow", "backtrace", "panic", "debug-images"] }
sentry-tracing = { version = "0.41.0", features = ["backtrace"] }
@@ -26,6 +26,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-error = "0.2"
thiserror = { workspace = true }
ts-rs = { workspace = true }
utils = { path = "../utils" }
uuid = { version = "1", features = ["serde", "v4"] }
jsonwebtoken = "9"

View File

@@ -1,6 +1,7 @@
services:
remote-db:
image: postgres:16-alpine
command: ["postgres", "-c", "wal_level=logical"]
environment:
POSTGRES_DB: remote
POSTGRES_USER: remote
@@ -16,6 +17,23 @@ services:
ports:
- "5432:5432"
electric:
image: electricsql/electric:latest
working_dir: /app
environment:
DATABASE_URL: postgresql://electric_sync:${ELECTRIC_ROLE_PASSWORD:?set in .env.remote}@remote-db:5432/remote?sslmode=disable
PG_PROXY_PORT: 65432
LOGICAL_PUBLISHER_HOST: electric
AUTH_MODE: insecure
ELECTRIC_INSECURE: true
ELECTRIC_MANUAL_TABLE_PUBLISHING: true
ELECTRIC_USAGE_REPORTING: false
volumes:
- electric-data:/app/persistent
depends_on:
remote-db:
condition: service_healthy
remote-server:
build:
context: ../..
@@ -23,10 +41,12 @@ services:
depends_on:
remote-db:
condition: service_healthy
electric:
condition: service_started
environment:
SERVER_DATABASE_URL: postgres://remote:remote@remote-db:5432/remote
SERVER_LISTEN_ADDR: 0.0.0.0:8081
SERVER_ACTIVITY_CHANNEL: activity
ELECTRIC_URL: http://electric:3000
GITHUB_OAUTH_CLIENT_ID: ${GITHUB_OAUTH_CLIENT_ID:?set in .env.remote}
GITHUB_OAUTH_CLIENT_SECRET: ${GITHUB_OAUTH_CLIENT_SECRET:?set in .env.remote}
GOOGLE_OAUTH_CLIENT_ID: ${GOOGLE_OAUTH_CLIENT_ID:?set in .env.remote}
@@ -36,9 +56,11 @@ services:
SERVER_PUBLIC_BASE_URL: http://localhost:3000
VITE_APP_BASE_URL: http://localhost:3000
VITE_API_BASE_URL: http://localhost:3000
ELECTRIC_ROLE_PASSWORD: ${ELECTRIC_ROLE_PASSWORD:?set in .env.remote}
ports:
- "127.0.0.1:3000:8081"
restart: unless-stopped
volumes:
remote-db-data:
electric-data:

View File

@@ -0,0 +1,21 @@
CREATE ROLE electric_sync WITH LOGIN REPLICATION;
GRANT CONNECT ON DATABASE remote TO electric_sync;
GRANT USAGE ON SCHEMA public TO electric_sync;
CREATE PUBLICATION electric_publication_default;
CREATE OR REPLACE FUNCTION electric_sync_table(p_schema text, p_table text)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
qualified text := format('%I.%I', p_schema, p_table);
BEGIN
EXECUTE format('ALTER TABLE %s REPLICA IDENTITY FULL', qualified);
EXECUTE format('GRANT SELECT ON TABLE %s TO electric_sync', qualified);
EXECUTE format('ALTER PUBLICATION %I ADD TABLE %s', 'electric_publication_default', qualified);
END;
$$;
SELECT electric_sync_table('public', 'shared_tasks');

View File

@@ -0,0 +1,9 @@
-- Drop activity feed tables and functions
DROP TABLE IF EXISTS activity CASCADE;
DROP TABLE IF EXISTS project_activity_counters;
DROP FUNCTION IF EXISTS ensure_activity_partition;
DROP FUNCTION IF EXISTS activity_notify;
-- Drop unused columns from shared_tasks
ALTER TABLE shared_tasks DROP COLUMN IF EXISTS version;
ALTER TABLE shared_tasks DROP COLUMN IF EXISTS last_event_seq;

View File

@@ -0,0 +1,2 @@
ALTER TYPE task_status RENAME VALUE 'in-progress' TO 'inprogress';
ALTER TYPE task_status RENAME VALUE 'in-review' TO 'inreview';

View File

@@ -1,106 +0,0 @@
use std::{
hash::{Hash, Hasher},
pin::Pin,
sync::Arc,
};
use chrono::{DateTime, Utc};
use futures::{Stream, StreamExt, future};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
#[derive(Debug, Serialize, Deserialize)]
pub struct ActivityResponse {
pub data: Vec<ActivityEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityEvent {
pub seq: i64,
pub event_id: uuid::Uuid,
pub project_id: uuid::Uuid,
pub event_type: String,
pub created_at: DateTime<Utc>,
pub payload: Option<serde_json::Value>,
}
impl ActivityEvent {
pub fn new(
seq: i64,
event_id: uuid::Uuid,
project_id: uuid::Uuid,
event_type: String,
created_at: DateTime<Utc>,
payload: Option<serde_json::Value>,
) -> Self {
Self {
seq,
event_id,
project_id,
event_type,
created_at,
payload,
}
}
}
#[derive(Clone)]
pub struct ActivityBroker {
shards: Arc<Vec<broadcast::Sender<ActivityEvent>>>,
}
pub type ActivityStream =
Pin<Box<dyn Stream<Item = Result<ActivityEvent, BroadcastStreamRecvError>> + Send + 'static>>;
impl ActivityBroker {
/// Shard broadcast senders to keep busy organisations from evicting everyone else's events.
pub fn new(shard_count: usize, shard_capacity: usize) -> Self {
let shard_count = shard_count.max(1);
let shard_capacity = shard_capacity.max(1);
let shards = (0..shard_count)
.map(|_| {
let (sender, _receiver) = broadcast::channel(shard_capacity);
sender
})
.collect();
Self {
shards: Arc::new(shards),
}
}
pub fn subscribe(&self, project_id: uuid::Uuid) -> ActivityStream {
let index = self.shard_index(&project_id);
let receiver = self.shards[index].subscribe();
let stream = BroadcastStream::new(receiver).filter_map(move |item| {
future::ready(match item {
Ok(event) if event.project_id == project_id => Some(Ok(event)),
Ok(_) => None,
Err(err) => Some(Err(err)),
})
});
Box::pin(stream)
}
pub fn publish(&self, event: ActivityEvent) {
let index = self.shard_index(&event.project_id);
if let Err(error) = self.shards[index].send(event) {
tracing::debug!(?error, "no subscribers for activity event");
}
}
fn shard_index(&self, project_id: &uuid::Uuid) -> usize {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
project_id.hash(&mut hasher);
(hasher.finish() as usize) % self.shards.len()
}
}
impl Default for ActivityBroker {
fn default() -> Self {
Self::new(16, 512)
}
}

View File

@@ -1,3 +0,0 @@
mod broker;
pub use broker::{ActivityBroker, ActivityEvent, ActivityResponse, ActivityStream};

View File

@@ -1,11 +1,11 @@
use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, bail};
use secrecy::ExposeSecret;
use tracing::instrument;
use crate::{
AppState,
activity::ActivityBroker,
auth::{
GitHubOAuthProvider, GoogleOAuthProvider, JwtService, OAuthHandoffService,
OAuthTokenValidator, ProviderRegistry,
@@ -22,7 +22,7 @@ impl Server {
#[instrument(
name = "remote_server",
skip(config),
fields(listen_addr = %config.listen_addr, activity_channel = %config.activity_channel)
fields(listen_addr = %config.listen_addr)
)]
pub async fn run(config: RemoteServerConfig) -> anyhow::Result<()> {
let pool = db::create_pool(&config.database_url)
@@ -33,12 +33,12 @@ impl Server {
.await
.context("failed to run database migrations")?;
db::maintenance::spawn_activity_partition_maintenance(pool.clone());
if let Some(password) = config.electric_role_password.as_ref() {
db::ensure_electric_role_password(&pool, password.expose_secret())
.await
.context("failed to set electric role password")?;
}
let broker = ActivityBroker::new(
config.activity_broadcast_shards,
config.activity_broadcast_capacity,
);
let auth_config = config.auth.clone();
let jwt = Arc::new(JwtService::new(auth_config.jwt_secret().clone()));
@@ -84,21 +84,18 @@ impl Server {
)
})?;
let http_client = reqwest::Client::new();
let state = AppState::new(
pool.clone(),
broker.clone(),
config.clone(),
jwt,
handoff_service,
oauth_token_validator,
mailer,
server_public_base_url,
http_client,
);
let listener =
db::ActivityListener::new(pool.clone(), broker, config.activity_channel.clone());
tokio::spawn(listener.run());
let router = routes::router(state);
let addr: SocketAddr = config
.listen_addr

View File

@@ -4,26 +4,15 @@ use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
use secrecy::SecretString;
use thiserror::Error;
// Default activity items returned in a single query
const DEFAULT_ACTIVITY_DEFAULT_LIMIT: i64 = 200;
// Max activity items that can be requested in a single query
const DEFAULT_ACTIVITY_MAX_LIMIT: i64 = 500;
const DEFAULT_ACTIVITY_BROADCAST_SHARDS: usize = 16;
const DEFAULT_ACTIVITY_BROADCAST_CAPACITY: usize = 512;
const DEFAULT_ACTIVITY_CATCHUP_BATCH_SIZE: i64 = 100;
#[derive(Debug, Clone)]
pub struct RemoteServerConfig {
pub database_url: String,
pub listen_addr: String,
pub server_public_base_url: Option<String>,
pub activity_channel: String,
pub activity_default_limit: i64,
pub activity_max_limit: i64,
pub activity_broadcast_shards: usize,
pub activity_broadcast_capacity: usize,
pub activity_catchup_batch_size: i64,
pub auth: AuthConfig,
pub electric_url: String,
pub electric_secret: Option<SecretString>,
pub electric_role_password: Option<SecretString>,
}
#[derive(Debug, Error)]
@@ -47,59 +36,31 @@ impl RemoteServerConfig {
let server_public_base_url = env::var("SERVER_PUBLIC_BASE_URL").ok();
let activity_channel =
env::var("SERVER_ACTIVITY_CHANNEL").unwrap_or_else(|_| "activity".to_string());
let activity_default_limit = DEFAULT_ACTIVITY_DEFAULT_LIMIT;
let activity_max_limit = DEFAULT_ACTIVITY_MAX_LIMIT;
let activity_broadcast_shards = get_numeric_env_var(
"SERVER_ACTIVITY_BROADCAST_SHARDS",
DEFAULT_ACTIVITY_BROADCAST_SHARDS,
)?
.max(1);
let activity_broadcast_capacity = get_numeric_env_var(
"SERVER_ACTIVITY_BROADCAST_CAPACITY",
DEFAULT_ACTIVITY_BROADCAST_CAPACITY,
)?
.max(1);
let activity_catchup_batch_size = get_numeric_env_var(
"SERVER_ACTIVITY_CATCHUP_BATCH_SIZE",
DEFAULT_ACTIVITY_CATCHUP_BATCH_SIZE,
)?
.max(1);
let auth = AuthConfig::from_env()?;
let electric_url =
env::var("ELECTRIC_URL").map_err(|_| ConfigError::MissingVar("ELECTRIC_URL"))?;
let electric_secret = env::var("ELECTRIC_SECRET")
.map(|s| SecretString::new(s.into()))
.ok();
let electric_role_password = env::var("ELECTRIC_ROLE_PASSWORD")
.ok()
.map(|s| SecretString::new(s.into()));
Ok(Self {
database_url,
listen_addr,
server_public_base_url,
activity_channel,
activity_default_limit,
activity_max_limit,
activity_broadcast_shards,
activity_broadcast_capacity,
activity_catchup_batch_size,
auth,
electric_url,
electric_secret,
electric_role_password,
})
}
}
fn get_numeric_env_var<T: std::str::FromStr>(
var_name: &'static str,
default: T,
) -> Result<T, ConfigError> {
match env::var(var_name) {
Ok(value) => value
.parse::<T>()
.map_err(|_| ConfigError::InvalidVar(var_name)),
Err(_) => Ok(default),
}
}
#[derive(Debug, Clone)]
pub struct OAuthProviderConfig {
client_id: String,

View File

@@ -1,95 +0,0 @@
use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;
use crate::activity::ActivityEvent;
pub struct ActivityRepository<'a> {
pool: &'a PgPool,
}
impl<'a> ActivityRepository<'a> {
pub fn new(pool: &'a PgPool) -> Self {
Self { pool }
}
pub async fn fetch_since(
&self,
project_id: Uuid,
after_seq: Option<i64>,
limit: i64,
) -> Result<Vec<ActivityEvent>, sqlx::Error> {
let rows = sqlx::query_as::<_, ActivityRow>(
r#"
SELECT seq,
event_id,
project_id,
event_type,
created_at,
payload
FROM activity
WHERE project_id = $1
AND ($2::bigint IS NULL OR seq > $2)
ORDER BY seq ASC
LIMIT $3
"#,
)
.bind(project_id)
.bind(after_seq)
.bind(limit)
.fetch_all(self.pool)
.await?;
Ok(rows.into_iter().map(ActivityRow::into_event).collect())
}
pub async fn fetch_by_seq(
&self,
project_id: Uuid,
seq: i64,
) -> Result<Option<ActivityEvent>, sqlx::Error> {
let row = sqlx::query_as::<_, ActivityRow>(
r#"
SELECT seq,
event_id,
project_id,
event_type,
created_at,
payload
FROM activity
WHERE project_id = $1
AND seq = $2
LIMIT 1
"#,
)
.bind(project_id)
.bind(seq)
.fetch_optional(self.pool)
.await?;
Ok(row.map(ActivityRow::into_event))
}
}
#[derive(sqlx::FromRow)]
struct ActivityRow {
seq: i64,
event_id: Uuid,
project_id: Uuid,
event_type: String,
created_at: DateTime<Utc>,
payload: serde_json::Value,
}
impl ActivityRow {
fn into_event(self) -> ActivityEvent {
ActivityEvent::new(
self.seq,
self.event_id,
self.project_id,
self.event_type,
self.created_at,
Some(self.payload),
)
}
}

View File

@@ -1,108 +0,0 @@
use std::time::Duration;
use anyhow::Context;
use serde::Deserialize;
use sqlx::{PgPool, postgres::PgListener};
use tokio::time::sleep;
use tracing::instrument;
use uuid::Uuid;
use crate::{activity::ActivityBroker, db::activity::ActivityRepository};
pub struct ActivityListener {
pool: PgPool,
broker: ActivityBroker,
channel: String,
}
impl ActivityListener {
pub fn new(pool: PgPool, broker: ActivityBroker, channel: String) -> Self {
Self {
pool,
broker,
channel,
}
}
#[instrument(
name = "activity.listener",
skip(self),
fields(channel = %self.channel)
)]
pub async fn run(self) {
let mut backoff = Duration::from_secs(1);
let max_backoff = Duration::from_secs(30);
let pool = self.pool;
let broker = self.broker;
let channel = self.channel;
loop {
match listen_loop(&pool, &broker, &channel).await {
Ok(_) => {
backoff = Duration::from_secs(1);
}
Err(error) => {
tracing::error!(?error, ?backoff, "activity listener error; retrying");
sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
}
}
}
}
}
#[instrument(
name = "activity.listen_loop",
skip(pool, broker),
fields(channel = %channel)
)]
async fn listen_loop(pool: &PgPool, broker: &ActivityBroker, channel: &str) -> anyhow::Result<()> {
let mut listener = PgListener::connect_with(pool)
.await
.context("failed to create LISTEN connection")?;
listener
.listen(channel)
.await
.with_context(|| format!("failed to LISTEN on channel {channel}"))?;
loop {
let notification = listener
.recv()
.await
.context("failed to receive LISTEN notification")?;
let payload: NotificationEnvelope = serde_json::from_str(notification.payload())
.with_context(|| format!("invalid notification payload: {}", notification.payload()))?;
tracing::trace!(%payload.seq, project_id = %payload.project_id, "received activity notification");
let project_uuid = payload
.project_id
.parse::<Uuid>()
.with_context(|| format!("invalid project_id UUID: {}", payload.project_id))?;
let event = match ActivityRepository::new(pool)
.fetch_by_seq(project_uuid, payload.seq)
.await
{
Ok(Some(event)) => event,
Ok(None) => {
tracing::warn!(seq = payload.seq, project_id = %payload.project_id, "activity row missing for notification");
continue;
}
Err(error) => {
tracing::error!(?error, seq = payload.seq, project_id = %payload.project_id, "failed to fetch activity payload");
continue;
}
};
broker.publish(event);
}
}
#[derive(Debug, Deserialize)]
struct NotificationEnvelope {
seq: i64,
project_id: String,
}

View File

@@ -1,159 +0,0 @@
use std::{sync::OnceLock, time::Duration};
use chrono::{Duration as ChronoDuration, NaiveTime, TimeZone, Utc};
use sqlx::{PgPool, error::DatabaseError};
use tokio::time::sleep;
use tracing::{error, info, warn};
const PRUNE_LOCK_KEY: &str = "vibe_kanban_activity_retention_v1";
static PROVISION_TIME: OnceLock<NaiveTime> = OnceLock::new();
static PRUNE_TIME: OnceLock<NaiveTime> = OnceLock::new();
fn provision_time() -> NaiveTime {
*PROVISION_TIME.get_or_init(|| NaiveTime::from_hms_opt(0, 10, 0).expect("valid time"))
}
fn prune_time() -> NaiveTime {
*PRUNE_TIME.get_or_init(|| NaiveTime::from_hms_opt(1, 30, 0).expect("valid time"))
}
pub fn spawn_activity_partition_maintenance(pool: PgPool) {
let creation_pool = pool.clone();
tokio::spawn(async move {
if let Err(err) = ensure_future_partitions_with_pool(&creation_pool).await {
error!(error = ?err, "initial activity partition provisioning failed");
}
loop {
sleep(duration_until(provision_time())).await;
if let Err(err) = ensure_future_partitions_with_pool(&creation_pool).await {
error!(error = ?err, "scheduled partition provisioning failed");
}
}
});
tokio::spawn(async move {
if let Err(err) = prune_old_partitions(&pool).await {
error!(error = ?err, "initial activity partition pruning failed");
}
loop {
sleep(duration_until(prune_time())).await;
if let Err(err) = prune_old_partitions(&pool).await {
error!(error = ?err, "scheduled partition pruning failed");
}
}
});
}
fn duration_until(target_time: NaiveTime) -> Duration {
let now = Utc::now();
let today = now.date_naive();
let mut next = today.and_time(target_time);
if now.time() >= target_time {
next = (today + ChronoDuration::days(1)).and_time(target_time);
}
let next_dt = Utc.from_utc_datetime(&next);
(next_dt - now)
.to_std()
.unwrap_or_else(|_| Duration::from_secs(0))
}
async fn prune_old_partitions(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut conn = pool.acquire().await?;
let lock_acquired = sqlx::query_scalar!(
r#"
SELECT pg_try_advisory_lock(hashtextextended($1, 0))
"#,
PRUNE_LOCK_KEY
)
.fetch_one(&mut *conn)
.await?
.unwrap_or(false);
if !lock_acquired {
warn!("skipping partition pruning because another worker holds the lock");
return Ok(());
}
let result = async {
let partitions = sqlx::query!(
r#"
SELECT format('%I.%I', n.nspname, c.relname) AS qualified_name,
split_part(
split_part(pg_get_expr(c.relpartbound, c.oid), ' TO (''', 2),
''')', 1
)::timestamptz AS upper_bound
FROM pg_partition_tree('activity') pt
JOIN pg_class c ON c.oid = pt.relid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE pt.isleaf
AND c.relname ~ '^activity_p_\d{8}$'
AND split_part(
split_part(pg_get_expr(c.relpartbound, c.oid), ' TO (''', 2),
''')', 1
)::timestamptz <= NOW() - INTERVAL '2 days'
ORDER BY upper_bound
"#
)
.fetch_all(&mut *conn)
.await?;
for partition in partitions {
if let Some(name) = partition.qualified_name {
let detach = format!("ALTER TABLE activity DETACH PARTITION {name} CONCURRENTLY");
sqlx::query(&detach).execute(&mut *conn).await?;
let drop = format!("DROP TABLE {name}");
sqlx::query(&drop).execute(&mut *conn).await?;
info!(partition = %name, "dropped activity partition");
}
}
Ok(())
}
.await;
let _ = sqlx::query_scalar!(
r#"
SELECT pg_advisory_unlock(hashtextextended($1, 0))
"#,
PRUNE_LOCK_KEY
)
.fetch_one(&mut *conn)
.await;
result
}
pub async fn ensure_future_partitions_with_pool(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut conn = pool.acquire().await?;
ensure_future_partitions(&mut conn).await
}
pub async fn ensure_future_partitions(
executor: &mut sqlx::PgConnection,
) -> Result<(), sqlx::Error> {
sqlx::query("SELECT ensure_activity_partition(NOW())")
.execute(&mut *executor)
.await?;
sqlx::query("SELECT ensure_activity_partition(NOW() + INTERVAL '24 hours')")
.execute(&mut *executor)
.await?;
sqlx::query("SELECT ensure_activity_partition(NOW() + INTERVAL '48 hours')")
.execute(&mut *executor)
.await?;
Ok(())
}
pub fn is_partition_missing_error(err: &(dyn DatabaseError + Send + Sync + 'static)) -> bool {
err.code()
.as_deref()
.is_some_and(|code| code.starts_with("23"))
&& err.message().contains("no partition of relation")
}

View File

@@ -1,9 +1,6 @@
pub mod activity;
pub mod auth;
pub mod identity_errors;
pub mod invitations;
pub mod listener;
pub mod maintenance;
pub mod oauth;
pub mod oauth_accounts;
pub mod organization_members;
@@ -12,7 +9,6 @@ pub mod projects;
pub mod tasks;
pub mod users;
pub use listener::ActivityListener;
use sqlx::{PgPool, Postgres, Transaction, migrate::MigrateError, postgres::PgPoolOptions};
pub(crate) type Tx<'a> = Transaction<'a, Postgres>;
@@ -27,3 +23,21 @@ pub(crate) async fn create_pool(database_url: &str) -> Result<PgPool, sqlx::Erro
.connect(database_url)
.await
}
pub(crate) async fn ensure_electric_role_password(
pool: &PgPool,
password: &str,
) -> Result<(), sqlx::Error> {
if password.is_empty() {
return Ok(());
}
// PostgreSQL doesn't support parameter binding for ALTER ROLE PASSWORD
// We need to escape the password properly and embed it directly in the SQL
let escaped_password = password.replace("'", "''");
let sql = format!("ALTER ROLE electric_sync WITH PASSWORD '{escaped_password}'");
sqlx::query(&sql).execute(pool).await?;
Ok(())
}

View File

@@ -2,27 +2,21 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;
use ts_rs::TS;
use uuid::Uuid;
use super::{
Tx,
identity_errors::IdentityError,
projects::{ProjectError, ProjectRepository},
users::{UserData, fetch_user},
};
use crate::db::maintenance;
pub struct BulkFetchResult {
pub tasks: Vec<SharedTaskActivityPayload>,
pub deleted_task_ids: Vec<Uuid>,
pub latest_seq: Option<i64>,
}
pub const MAX_SHARED_TASK_TEXT_BYTES: usize = 50 * 1024;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[serde(rename_all = "kebab-case")]
#[sqlx(type_name = "task_status", rename_all = "kebab-case")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, TS)]
#[serde(rename_all = "lowercase")]
#[sqlx(type_name = "task_status", rename_all = "lowercase")]
#[ts(export)]
pub enum TaskStatus {
Todo,
InProgress,
@@ -43,7 +37,8 @@ impl SharedTaskWithUser {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow, TS)]
#[ts(export)]
pub struct SharedTask {
pub id: Uuid,
pub organization_id: Uuid,
@@ -54,19 +49,12 @@ pub struct SharedTask {
pub title: String,
pub description: Option<String>,
pub status: TaskStatus,
pub version: i64,
pub deleted_at: Option<DateTime<Utc>>,
pub shared_at: Option<DateTime<Utc>>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SharedTaskActivityPayload {
pub task: SharedTask,
pub user: Option<UserData>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct CreateSharedTaskData {
pub project_id: Uuid,
@@ -81,7 +69,6 @@ pub struct UpdateSharedTaskData {
pub title: Option<String>,
pub description: Option<String>,
pub status: Option<TaskStatus>,
pub version: Option<i64>,
pub acting_user_id: Uuid,
}
@@ -89,13 +76,11 @@ pub struct UpdateSharedTaskData {
pub struct AssignTaskData {
pub new_assignee_user_id: Option<Uuid>,
pub previous_assignee_user_id: Option<Uuid>,
pub version: Option<i64>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct DeleteTaskData {
pub acting_user_id: Uuid,
pub version: Option<i64>,
}
#[derive(Debug, Error)]
@@ -141,7 +126,6 @@ impl<'a> SharedTaskRepository<'a> {
title AS "title!",
description AS "description?",
status AS "status!: TaskStatus",
version AS "version!",
deleted_at AS "deleted_at?",
shared_at AS "shared_at?",
created_at AS "created_at!",
@@ -205,7 +189,6 @@ impl<'a> SharedTaskRepository<'a> {
title AS "title!",
description AS "description?",
status AS "status!: TaskStatus",
version AS "version!",
deleted_at AS "deleted_at?",
shared_at AS "shared_at?",
created_at AS "created_at!",
@@ -226,114 +209,10 @@ impl<'a> SharedTaskRepository<'a> {
None => None,
};
insert_activity(&mut tx, &task, user.as_ref(), "task.created").await?;
tx.commit().await.map_err(SharedTaskError::from)?;
Ok(SharedTaskWithUser::new(task, user))
}
pub async fn bulk_fetch(&self, project_id: Uuid) -> Result<BulkFetchResult, SharedTaskError> {
let mut tx = self.pool.begin().await?;
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(&mut *tx)
.await?;
let rows = sqlx::query!(
r#"
SELECT
st.id AS "id!: Uuid",
st.organization_id AS "organization_id!: Uuid",
st.project_id AS "project_id!: Uuid",
st.creator_user_id AS "creator_user_id?: Uuid",
st.assignee_user_id AS "assignee_user_id?: Uuid",
st.deleted_by_user_id AS "deleted_by_user_id?: Uuid",
st.title AS "title!",
st.description AS "description?",
st.status AS "status!: TaskStatus",
st.version AS "version!",
st.deleted_at AS "deleted_at?",
st.shared_at AS "shared_at?",
st.created_at AS "created_at!",
st.updated_at AS "updated_at!",
u.id AS "user_id?: Uuid",
u.first_name AS "user_first_name?",
u.last_name AS "user_last_name?",
u.username AS "user_username?"
FROM shared_tasks st
LEFT JOIN users u ON st.assignee_user_id = u.id
WHERE st.project_id = $1
AND st.deleted_at IS NULL
ORDER BY st.updated_at DESC
"#,
project_id
)
.fetch_all(&mut *tx)
.await?;
let tasks = rows
.into_iter()
.map(|row| {
let task = SharedTask {
id: row.id,
organization_id: row.organization_id,
project_id: row.project_id,
creator_user_id: row.creator_user_id,
assignee_user_id: row.assignee_user_id,
deleted_by_user_id: row.deleted_by_user_id,
title: row.title,
description: row.description,
status: row.status,
version: row.version,
deleted_at: row.deleted_at,
shared_at: row.shared_at,
created_at: row.created_at,
updated_at: row.updated_at,
};
let user = row.user_id.map(|id| UserData {
id,
first_name: row.user_first_name,
last_name: row.user_last_name,
username: row.user_username,
});
SharedTaskActivityPayload { task, user }
})
.collect();
let deleted_rows = sqlx::query!(
r#"
SELECT st.id AS "id!: Uuid"
FROM shared_tasks st
WHERE st.project_id = $1
AND st.deleted_at IS NOT NULL
"#,
project_id
)
.fetch_all(&mut *tx)
.await?;
let deleted_task_ids = deleted_rows.into_iter().map(|row| row.id).collect();
let latest_seq = sqlx::query_scalar!(
r#"
SELECT MAX(seq)
FROM activity
WHERE project_id = $1
"#,
project_id
)
.fetch_one(&mut *tx)
.await?;
tx.commit().await?;
Ok(BulkFetchResult {
tasks,
deleted_task_ids,
latest_seq,
})
}
pub async fn update(
&self,
task_id: Uuid,
@@ -348,11 +227,9 @@ impl<'a> SharedTaskRepository<'a> {
SET title = COALESCE($2, t.title),
description = COALESCE($3, t.description),
status = COALESCE($4, t.status),
version = t.version + 1,
updated_at = NOW()
WHERE t.id = $1
AND t.version = COALESCE($5, t.version)
AND t.assignee_user_id = $6
AND t.assignee_user_id = $5
AND t.deleted_at IS NULL
RETURNING
t.id AS "id!",
@@ -364,7 +241,6 @@ impl<'a> SharedTaskRepository<'a> {
t.title AS "title!",
t.description AS "description?",
t.status AS "status!: TaskStatus",
t.version AS "version!",
t.deleted_at AS "deleted_at?",
t.shared_at AS "shared_at?",
t.created_at AS "created_at!",
@@ -374,12 +250,11 @@ impl<'a> SharedTaskRepository<'a> {
data.title,
data.description,
data.status as Option<TaskStatus>,
data.version,
data.acting_user_id
)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| SharedTaskError::Conflict("task version mismatch".to_string()))?;
.ok_or_else(|| SharedTaskError::NotFound)?;
ensure_text_size(&task.title, task.description.as_deref())?;
@@ -388,7 +263,6 @@ impl<'a> SharedTaskRepository<'a> {
None => None,
};
insert_activity(&mut tx, &task, user.as_ref(), "task.updated").await?;
tx.commit().await.map_err(SharedTaskError::from)?;
Ok(SharedTaskWithUser::new(task, user))
}
@@ -404,10 +278,8 @@ impl<'a> SharedTaskRepository<'a> {
SharedTask,
r#"
UPDATE shared_tasks AS t
SET assignee_user_id = $2,
version = t.version + 1
SET assignee_user_id = $2
WHERE t.id = $1
AND t.version = COALESCE($4, t.version)
AND ($3::uuid IS NULL OR t.assignee_user_id = $3::uuid)
AND t.deleted_at IS NULL
RETURNING
@@ -420,7 +292,6 @@ impl<'a> SharedTaskRepository<'a> {
t.title AS "title!",
t.description AS "description?",
t.status AS "status!: TaskStatus",
t.version AS "version!",
t.deleted_at AS "deleted_at?",
t.shared_at AS "shared_at?",
t.created_at AS "created_at!",
@@ -428,21 +299,17 @@ impl<'a> SharedTaskRepository<'a> {
"#,
task_id,
data.new_assignee_user_id,
data.previous_assignee_user_id,
data.version
data.previous_assignee_user_id
)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| {
SharedTaskError::Conflict("task version or previous assignee mismatch".to_string())
})?;
.ok_or_else(|| SharedTaskError::Conflict("previous assignee mismatch".to_string()))?;
let user = match data.new_assignee_user_id {
Some(user_id) => fetch_user(&mut tx, user_id).await?,
None => None,
};
insert_activity(&mut tx, &task, user.as_ref(), "task.reassigned").await?;
tx.commit().await.map_err(SharedTaskError::from)?;
Ok(SharedTaskWithUser::new(task, user))
}
@@ -459,11 +326,9 @@ impl<'a> SharedTaskRepository<'a> {
r#"
UPDATE shared_tasks AS t
SET deleted_at = NOW(),
deleted_by_user_id = $3,
version = t.version + 1
deleted_by_user_id = $2
WHERE t.id = $1
AND t.version = COALESCE($2, t.version)
AND t.assignee_user_id = $3
AND t.assignee_user_id = $2
AND t.deleted_at IS NULL
RETURNING
t.id AS "id!",
@@ -475,26 +340,44 @@ impl<'a> SharedTaskRepository<'a> {
t.title AS "title!",
t.description AS "description?",
t.status AS "status!: TaskStatus",
t.version AS "version!",
t.deleted_at AS "deleted_at?",
t.shared_at AS "shared_at?",
t.created_at AS "created_at!",
t.updated_at AS "updated_at!"
"#,
task_id,
data.version,
data.acting_user_id
)
.fetch_optional(&mut *tx)
.await?
.ok_or_else(|| {
SharedTaskError::Conflict("task version mismatch or user not authorized".to_string())
})?;
.ok_or_else(|| SharedTaskError::Conflict("user not authorized".to_string()))?;
insert_activity(&mut tx, &task, None, "task.deleted").await?;
tx.commit().await.map_err(SharedTaskError::from)?;
Ok(SharedTaskWithUser::new(task, None))
}
pub async fn check_existence(
&self,
task_ids: &[Uuid],
user_id: Uuid,
) -> Result<Vec<Uuid>, SharedTaskError> {
let tasks = sqlx::query!(
r#"
SELECT t.id
FROM shared_tasks t
INNER JOIN organization_member_metadata om ON t.organization_id = om.organization_id
WHERE t.id = ANY($1)
AND t.deleted_at IS NULL
AND om.user_id = $2
"#,
task_ids,
user_id
)
.fetch_all(self.pool)
.await?;
Ok(tasks.into_iter().map(|r| r.id).collect())
}
}
pub(crate) fn ensure_text_size(
@@ -510,81 +393,6 @@ pub(crate) fn ensure_text_size(
Ok(())
}
async fn insert_activity(
tx: &mut Tx<'_>,
task: &SharedTask,
user: Option<&UserData>,
event_type: &str,
) -> Result<(), SharedTaskError> {
let payload = SharedTaskActivityPayload {
task: task.clone(),
user: user.cloned(),
};
let payload = serde_json::to_value(payload).map_err(SharedTaskError::Serialization)?;
// First attempt at inserting - if partitions are missing we retry after provisioning.
match do_insert_activity(tx, task, event_type, payload.clone()).await {
Ok(_) => Ok(()),
Err(err) => {
if let sqlx::Error::Database(db_err) = &err
&& maintenance::is_partition_missing_error(db_err.as_ref())
{
let code_owned = db_err.code().map(|c| c.to_string());
let code = code_owned.as_deref().unwrap_or_default();
tracing::warn!(
"Activity partition missing ({}), creating current and next partitions",
code
);
maintenance::ensure_future_partitions(tx.as_mut())
.await
.map_err(SharedTaskError::from)?;
return do_insert_activity(tx, task, event_type, payload)
.await
.map_err(SharedTaskError::from);
}
Err(SharedTaskError::from(err))
}
}
}
async fn do_insert_activity(
tx: &mut Tx<'_>,
task: &SharedTask,
event_type: &str,
payload: serde_json::Value,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
WITH next AS (
INSERT INTO project_activity_counters AS counters (project_id, last_seq)
VALUES ($1, 1)
ON CONFLICT (project_id)
DO UPDATE SET last_seq = counters.last_seq + 1
RETURNING last_seq
)
INSERT INTO activity (
project_id,
seq,
assignee_user_id,
event_type,
payload
)
SELECT $1, next.last_seq, $2, $3, $4
FROM next
"#,
task.project_id,
task.assignee_user_id,
event_type,
payload
)
.execute(&mut **tx)
.await
.map(|_| ())
}
impl SharedTaskRepository<'_> {
pub async fn organization_id(
pool: &PgPool,

View File

@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, query_as};
use ts_rs::TS;
use uuid::Uuid;
use super::{Tx, identity_errors::IdentityError};
@@ -16,9 +17,10 @@ pub struct User {
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow, TS)]
#[ts(export)]
pub struct UserData {
pub id: Uuid,
pub user_id: Uuid,
pub first_name: Option<String>,
pub last_name: Option<String>,
pub username: Option<String>,
@@ -91,6 +93,34 @@ impl<'a> UserRepository<'a> {
.await
.map_err(IdentityError::from)
}
/// Fetch all assignees for a given project id.
/// Returns Vec<UserData> containing all unique users assigned to tasks in the project.
pub async fn fetch_assignees_by_project(
&self,
project_id: Uuid,
) -> Result<Vec<UserData>, IdentityError> {
let rows = sqlx::query_as!(
UserData,
r#"
SELECT DISTINCT
u.id as "user_id",
u.first_name as "first_name",
u.last_name as "last_name",
u.username as "username"
FROM shared_tasks st
INNER JOIN users u ON u.id = st.assignee_user_id
WHERE st.project_id = $1
AND st.assignee_user_id IS NOT NULL
"#,
project_id
)
.fetch_all(self.pool)
.await
.map_err(IdentityError::from)?;
Ok(rows)
}
}
async fn upsert_user(pool: &PgPool, user: &UpsertUser<'_>) -> Result<User, sqlx::Error> {
@@ -141,7 +171,7 @@ pub async fn fetch_user(tx: &mut Tx<'_>, user_id: Uuid) -> Result<Option<UserDat
.map_err(IdentityError::from)
.map(|row_opt| {
row_opt.map(|row| UserData {
id: row.id,
user_id: row.id,
first_name: row.first_name,
last_name: row.last_name,
username: row.username,

View File

@@ -1,4 +1,3 @@
pub mod activity;
mod app;
mod auth;
pub mod config;
@@ -6,7 +5,7 @@ pub mod db;
pub mod mail;
pub mod routes;
mod state;
pub mod ws;
pub mod validated_where;
use std::{env, sync::OnceLock};
@@ -20,7 +19,6 @@ use tracing_subscriber::{
layer::{Layer as _, SubscriberExt},
util::SubscriberInitExt,
};
pub use ws::message::{ClientMessage, ServerMessage};
static INIT_GUARD: OnceLock<sentry::ClientInitGuard> = OnceLock::new();

View File

@@ -1,67 +0,0 @@
use axum::{
Json, Router,
extract::{Extension, Query, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use serde::Deserialize;
use tracing::instrument;
use uuid::Uuid;
use super::{error::ErrorResponse, organization_members::ensure_project_access};
use crate::{
AppState, activity::ActivityResponse, auth::RequestContext, db::activity::ActivityRepository,
};
pub fn router() -> Router<AppState> {
Router::new().route("/activity", get(get_activity_stream))
}
#[derive(Debug, Deserialize)]
pub struct ActivityQuery {
/// Remote project to stream activity for
pub project_id: Uuid,
/// Fetch events after this ID (exclusive)
pub after: Option<i64>,
/// Maximum number of events to return
pub limit: Option<i64>,
}
#[instrument(
name = "activity.get_activity_stream",
skip(state, ctx, params),
fields(user_id = %ctx.user.id, project_id = %params.project_id)
)]
async fn get_activity_stream(
State(state): State<AppState>,
Extension(ctx): Extension<RequestContext>,
Query(params): Query<ActivityQuery>,
) -> Response {
let config = state.config();
let limit = params
.limit
.unwrap_or(config.activity_default_limit)
.clamp(1, config.activity_max_limit);
let after = params.after;
let project_id = params.project_id;
let _organization_id = match ensure_project_access(state.pool(), ctx.user.id, project_id).await
{
Ok(org_id) => org_id,
Err(error) => return error.into_response(),
};
let repo = ActivityRepository::new(state.pool());
match repo.fetch_since(project_id, after, limit).await {
Ok(events) => (StatusCode::OK, Json(ActivityResponse { data: events })).into_response(),
Err(error) => {
tracing::error!(?error, "failed to load activity stream");
ErrorResponse::new(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to load activity stream",
)
.into_response()
}
}
}

View File

@@ -0,0 +1,182 @@
use std::collections::HashMap;
use axum::{
Router,
body::Body,
extract::{Query, State},
http::{HeaderMap, HeaderValue, StatusCode, header},
response::{IntoResponse, Response},
routing::get,
};
use futures::TryStreamExt;
use secrecy::ExposeSecret;
use tracing::error;
use uuid::Uuid;
use crate::{
AppState, auth::RequestContext, db::organizations::OrganizationRepository, validated_where,
validated_where::ValidatedWhere,
};
pub fn router() -> Router<AppState> {
Router::new().route("/shape/shared_tasks", get(proxy_shared_tasks))
}
/// Electric protocol query parameters that are safe to forward.
/// Based on https://electric-sql.com/docs/guides/auth#proxy-auth
/// Note: "where" is NOT included because it's controlled server-side for security.
const ELECTRIC_PARAMS: &[&str] = &["offset", "handle", "live", "cursor", "columns"];
/// Returns an empty shape response for users with no organization memberships.
fn empty_shape_response() -> Response {
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);
(StatusCode::OK, headers, "[]").into_response()
}
/// Proxy Shape requests for the `shared_tasks` table.
///
/// Route: GET /v1/shape/shared_tasks?offset=-1
///
/// The `require_session` middleware has already validated the Bearer token
/// before this handler is called.
pub async fn proxy_shared_tasks(
State(state): State<AppState>,
axum::extract::Extension(ctx): axum::extract::Extension<RequestContext>,
Query(params): Query<HashMap<String, String>>,
) -> Result<Response, ProxyError> {
// Get user's organization memberships
let org_repo = OrganizationRepository::new(state.pool());
let orgs = org_repo
.list_user_organizations(ctx.user.id)
.await
.map_err(|e| ProxyError::Authorization(format!("failed to fetch organizations: {e}")))?;
if orgs.is_empty() {
// User has no org memberships - return empty result
return Ok(empty_shape_response());
}
// Build org_id filter using compile-time validated WHERE clause
let org_uuids: Vec<Uuid> = orgs.iter().map(|o| o.id).collect();
let query = validated_where!("shared_tasks", r#""organization_id" = ANY($1)"#, &org_uuids);
let query_params = &[format!(
"{{{}}}",
org_uuids
.iter()
.map(|u| u.to_string())
.collect::<Vec<_>>()
.join(",")
)];
tracing::debug!("Proxying Electric Shape request for shared_tasks table{query:?}");
proxy_table(&state, &query, &params, query_params).await
}
/// Proxy a Shape request to Electric for a specific table.
///
/// The table and where clause are set server-side (not from client params)
/// to prevent unauthorized access to other tables or data.
async fn proxy_table(
state: &AppState,
query: &ValidatedWhere,
client_params: &HashMap<String, String>,
electric_params: &[String],
) -> Result<Response, ProxyError> {
// Build the Electric URL
let mut origin_url = url::Url::parse(&state.config.electric_url)
.map_err(|e| ProxyError::InvalidConfig(format!("invalid electric_url: {e}")))?;
origin_url.set_path("/v1/shape");
// Set table server-side (security: client can't override)
origin_url
.query_pairs_mut()
.append_pair("table", query.table);
// Set WHERE clause with parameterized values
origin_url
.query_pairs_mut()
.append_pair("where", query.where_clause);
// Pass params for $1, $2, etc. placeholders
for (i, param) in electric_params.iter().enumerate() {
origin_url
.query_pairs_mut()
.append_pair(&format!("params[{}]", i + 1), param);
}
// Forward safe client params
for (key, value) in client_params {
if ELECTRIC_PARAMS.contains(&key.as_str()) {
origin_url.query_pairs_mut().append_pair(key, value);
}
}
if let Some(secret) = &state.config.electric_secret {
origin_url
.query_pairs_mut()
.append_pair("secret", secret.expose_secret());
}
let response = state
.http_client
.get(origin_url.as_str())
.send()
.await
.map_err(ProxyError::Connection)?;
let status = response.status();
let mut headers = HeaderMap::new();
// Copy headers from Electric response, but remove problematic ones
for (key, value) in response.headers() {
// Skip headers that interfere with browser handling
if key == header::CONTENT_ENCODING || key == header::CONTENT_LENGTH {
continue;
}
headers.insert(key.clone(), value.clone());
}
// Add Vary header for proper caching with auth
headers.insert(header::VARY, HeaderValue::from_static("Authorization"));
// Stream the response body directly without buffering
let body_stream = response.bytes_stream().map_err(std::io::Error::other);
let body = Body::from_stream(body_stream);
Ok((status, headers, body).into_response())
}
#[derive(Debug)]
pub enum ProxyError {
Connection(reqwest::Error),
InvalidConfig(String),
Authorization(String),
}
impl IntoResponse for ProxyError {
fn into_response(self) -> Response {
match self {
ProxyError::Connection(err) => {
error!(?err, "failed to connect to Electric service");
(
StatusCode::BAD_GATEWAY,
"failed to connect to Electric service",
)
.into_response()
}
ProxyError::InvalidConfig(msg) => {
error!(%msg, "invalid Electric proxy configuration");
(StatusCode::INTERNAL_SERVER_ERROR, "internal server error").into_response()
}
ProxyError::Authorization(msg) => {
error!(%msg, "authorization failed for Electric proxy");
(StatusCode::FORBIDDEN, "forbidden").into_response()
}
}
}
}

View File

@@ -5,7 +5,7 @@ use axum::{
routing::get,
};
use tower_http::{
cors::CorsLayer,
cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer},
request_id::{MakeRequestUuid, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
services::{ServeDir, ServeFile},
trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
@@ -14,7 +14,7 @@ use tracing::{Level, field};
use crate::{AppState, auth::require_session};
pub mod activity;
mod electric_proxy;
mod error;
mod identity;
mod oauth;
@@ -53,13 +53,12 @@ pub fn router(state: AppState) -> Router {
let v1_protected = Router::<AppState>::new()
.merge(identity::router())
.merge(activity::router())
.merge(projects::router())
.merge(tasks::router())
.merge(organizations::router())
.merge(organization_members::protected_router())
.merge(oauth::protected_router())
.merge(crate::ws::router())
.merge(electric_proxy::router())
.layer(middleware::from_fn_with_state(
state.clone(),
require_session,
@@ -73,7 +72,13 @@ pub fn router(state: AppState) -> Router {
.nest("/v1", v1_public)
.nest("/v1", v1_protected)
.fallback_service(spa)
.layer(CorsLayer::permissive())
.layer(
CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_methods(AllowMethods::mirror_request())
.allow_headers(AllowHeaders::mirror_request())
.allow_credentials(true),
)
.layer(trace_layer)
.layer(PropagateRequestIdLayer::new(HeaderName::from_static(
"x-request-id",

View File

@@ -8,6 +8,7 @@ use axum::{
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{Span, instrument};
use ts_rs::TS;
use uuid::Uuid;
use super::{
@@ -30,60 +31,54 @@ use crate::{
pub fn router() -> Router<AppState> {
Router::new()
.route("/tasks/bulk", get(bulk_shared_tasks))
.route("/tasks", post(create_shared_task))
.route("/tasks/check", post(check_tasks_existence))
.route("/tasks/{task_id}", patch(update_shared_task))
.route("/tasks/{task_id}", delete(delete_shared_task))
.route("/tasks/{task_id}/assign", post(assign_task))
.route("/tasks/assignees", get(get_task_assignees_by_project))
}
#[derive(Debug, Deserialize)]
pub struct BulkTasksQuery {
#[derive(Debug, Deserialize, TS)]
#[ts(export)]
pub struct AssigneesQuery {
pub project_id: Uuid,
}
#[instrument(
name = "tasks.bulk_shared_tasks",
name = "tasks.get_task_assignees_by_project",
skip(state, ctx, query),
fields(user_id = %ctx.user.id, project_id = %query.project_id, org_id = tracing::field::Empty)
)]
pub async fn bulk_shared_tasks(
pub async fn get_task_assignees_by_project(
State(state): State<AppState>,
Extension(ctx): Extension<RequestContext>,
Query(query): Query<BulkTasksQuery>,
Query(query): Query<AssigneesQuery>,
) -> Response {
let pool = state.pool();
let _organization_id = match ensure_project_access(pool, ctx.user.id, query.project_id).await {
Ok(org_id) => {
Span::current().record("org_id", format_args!("{org_id}"));
org_id
let _org_id = match ensure_project_access(pool, ctx.user.id, query.project_id).await {
Ok(org) => {
Span::current().record("org_id", format_args!("{org}"));
org
}
Err(error) => return error.into_response(),
};
let repo = SharedTaskRepository::new(pool);
match repo.bulk_fetch(query.project_id).await {
Ok(snapshot) => (
StatusCode::OK,
Json(BulkSharedTasksResponse {
tasks: snapshot.tasks,
deleted_task_ids: snapshot.deleted_task_ids,
latest_seq: snapshot.latest_seq,
}),
)
.into_response(),
Err(error) => match error {
SharedTaskError::Database(err) => {
tracing::error!(?err, "failed to load shared task snapshot");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "failed to load shared tasks" })),
)
.into_response()
}
other => task_error_response(other, "failed to load shared tasks"),
},
}
let user_repo = UserRepository::new(pool);
let assignees = match user_repo.fetch_assignees_by_project(query.project_id).await {
Ok(names) => names,
Err(e) => {
tracing::error!(?e, "failed to load assignees");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "failed to load assignees"})),
)
.into_response();
}
};
(StatusCode::OK, Json(assignees)).into_response()
}
#[instrument(
@@ -185,7 +180,6 @@ pub async fn update_shared_task(
title,
description,
status,
version,
} = payload;
let next_title = title.as_deref().unwrap_or(existing.title.as_str());
@@ -199,7 +193,6 @@ pub async fn update_shared_task(
title,
description,
status,
version,
acting_user_id: ctx.user.id,
};
@@ -263,7 +256,6 @@ pub async fn assign_task(
let data = AssignTaskData {
new_assignee_user_id: payload.new_assignee_user_id,
previous_assignee_user_id: Some(ctx.user.id),
version: payload.version,
};
match repo.assign_task(task_id, data).await {
@@ -274,14 +266,13 @@ pub async fn assign_task(
#[instrument(
name = "tasks.delete_shared_task",
skip(state, ctx, payload),
skip(state, ctx),
fields(user_id = %ctx.user.id, task_id = %task_id, org_id = tracing::field::Empty)
)]
pub async fn delete_shared_task(
State(state): State<AppState>,
Extension(ctx): Extension<RequestContext>,
Path(task_id): Path<Uuid>,
payload: Option<Json<DeleteSharedTaskRequest>>,
) -> Response {
let pool = state.pool();
let _organization_id = match ensure_task_access(pool, ctx.user.id, task_id).await {
@@ -311,11 +302,8 @@ pub async fn delete_shared_task(
);
}
let version = payload.as_ref().and_then(|body| body.0.version);
let data = DeleteTaskData {
acting_user_id: ctx.user.id,
version,
};
match repo.delete_task(task_id, data).await {
@@ -324,11 +312,28 @@ pub async fn delete_shared_task(
}
}
#[instrument(
name = "tasks.check_existence",
skip(state, ctx, payload),
fields(user_id = %ctx.user.id)
)]
pub async fn check_tasks_existence(
State(state): State<AppState>,
Extension(ctx): Extension<RequestContext>,
Json(payload): Json<CheckTasksRequest>,
) -> Response {
let pool = state.pool();
let repo = SharedTaskRepository::new(pool);
match repo.check_existence(&payload.task_ids, ctx.user.id).await {
Ok(existing_ids) => (StatusCode::OK, Json(existing_ids)).into_response(),
Err(error) => task_error_response(error, "failed to check tasks existence"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkSharedTasksResponse {
pub tasks: Vec<crate::db::tasks::SharedTaskActivityPayload>,
pub deleted_task_ids: Vec<Uuid>,
pub latest_seq: Option<i64>,
pub struct CheckTasksRequest {
pub task_ids: Vec<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -344,21 +349,15 @@ pub struct UpdateSharedTaskRequest {
pub title: Option<String>,
pub description: Option<String>,
pub status: Option<TaskStatus>,
pub version: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssignSharedTaskRequest {
pub new_assignee_user_id: Option<Uuid>,
pub version: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteSharedTaskRequest {
pub version: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct SharedTaskResponse {
pub task: SharedTask,
pub user: Option<UserData>,

View File

@@ -3,7 +3,6 @@ use std::sync::Arc;
use sqlx::PgPool;
use crate::{
activity::ActivityBroker,
auth::{JwtService, OAuthHandoffService, OAuthTokenValidator, ProviderRegistry},
config::RemoteServerConfig,
mail::Mailer,
@@ -12,34 +11,34 @@ use crate::{
#[derive(Clone)]
pub struct AppState {
pub pool: PgPool,
pub broker: ActivityBroker,
pub config: RemoteServerConfig,
pub jwt: Arc<JwtService>,
pub mailer: Arc<dyn Mailer>,
pub server_public_base_url: String,
pub handoff: Arc<OAuthHandoffService>,
pub oauth_token_validator: Arc<OAuthTokenValidator>,
pub http_client: reqwest::Client,
handoff: Arc<OAuthHandoffService>,
oauth_token_validator: Arc<OAuthTokenValidator>,
}
impl AppState {
#[allow(clippy::too_many_arguments)]
pub fn new(
pool: PgPool,
broker: ActivityBroker,
config: RemoteServerConfig,
jwt: Arc<JwtService>,
handoff: Arc<OAuthHandoffService>,
oauth_token_validator: Arc<OAuthTokenValidator>,
mailer: Arc<dyn Mailer>,
server_public_base_url: String,
http_client: reqwest::Client,
) -> Self {
Self {
pool,
broker,
config,
jwt,
mailer,
server_public_base_url,
http_client,
handoff,
oauth_token_validator,
}
@@ -49,10 +48,6 @@ impl AppState {
&self.pool
}
pub fn broker(&self) -> &ActivityBroker {
&self.broker
}
pub fn config(&self) -> &RemoteServerConfig {
&self.config
}

View File

@@ -0,0 +1,20 @@
#[derive(Debug)]
pub struct ValidatedWhere {
pub table: &'static str,
pub where_clause: &'static str,
}
#[macro_export]
macro_rules! validated_where {
($table:literal, $where:literal $(, $arg:expr)* $(,)?) => {{
// Compile-time validation via SQLx using + concatenation
// This checks: table exists, columns exist, arg types are correct
let _ = sqlx::query!(
"SELECT 1 AS v FROM " + $table + " WHERE " + $where
$(, $arg)*
);
$crate::validated_where::ValidatedWhere {
table: $table,
where_clause: $where,
}
}};
}

View File

@@ -1,21 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::activity::ActivityEvent;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ClientMessage {
#[serde(rename = "ack")]
Ack { cursor: i64 },
#[serde(rename = "auth-token")]
AuthToken { token: String },
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ServerMessage {
#[serde(rename = "activity")]
Activity(ActivityEvent),
#[serde(rename = "error")]
Error { message: String },
}

View File

@@ -1,41 +0,0 @@
use axum::{
Router,
extract::{Extension, Query, State, ws::WebSocketUpgrade},
response::IntoResponse,
routing::get,
};
use serde::Deserialize;
use uuid::Uuid;
use crate::{AppState, auth::RequestContext};
pub mod message;
mod session;
#[derive(Debug, Deserialize, Clone)]
pub struct WsQueryParams {
pub project_id: Uuid,
pub cursor: Option<i64>,
}
pub fn router() -> Router<AppState> {
Router::new().route("/ws", get(upgrade))
}
async fn upgrade(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Extension(ctx): Extension<RequestContext>,
Query(params): Query<WsQueryParams>,
) -> impl IntoResponse {
match crate::routes::organization_members::ensure_project_access(
state.pool(),
ctx.user.id,
params.project_id,
)
.await
{
Ok(_) => ws.on_upgrade(move |socket| session::handle(socket, state, ctx, params)),
Err(error) => error.into_response(),
}
}

View File

@@ -1,512 +0,0 @@
use std::sync::Arc;
use axum::extract::ws::{Message, WebSocket};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use futures::{SinkExt, StreamExt};
use sqlx::PgPool;
use thiserror::Error;
use tokio::time::{self, MissedTickBehavior};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{Span, instrument};
use utils::ws::{WS_AUTH_REFRESH_INTERVAL, WS_BULK_SYNC_THRESHOLD, WS_TOKEN_EXPIRY_GRACE};
use uuid::Uuid;
use super::{
WsQueryParams,
message::{ClientMessage, ServerMessage},
};
use crate::{
AppState,
activity::{ActivityBroker, ActivityEvent, ActivityStream},
auth::{JwtError, JwtService, RequestContext},
db::{
activity::ActivityRepository,
auth::{AuthSessionError, AuthSessionRepository},
},
};
#[instrument(
name = "ws.session",
skip(socket, state, ctx, params),
fields(
user_id = %ctx.user.id,
project_id = %params.project_id,
org_id = tracing::field::Empty,
session_id = %ctx.session_id
)
)]
pub async fn handle(
socket: WebSocket,
state: AppState,
ctx: RequestContext,
params: WsQueryParams,
) {
let config = state.config();
let pool_ref = state.pool();
let project_id = params.project_id;
let organization_id = match crate::routes::organization_members::ensure_project_access(
pool_ref,
ctx.user.id,
project_id,
)
.await
{
Ok(org_id) => org_id,
Err(error) => {
tracing::info!(
?error,
user_id = %ctx.user.id,
%project_id,
"websocket project access denied"
);
return;
}
};
Span::current().record("org_id", format_args!("{organization_id}"));
let pool = pool_ref.clone();
let mut last_sent_seq = params.cursor;
let mut auth_state = WsAuthState::new(
state.jwt(),
pool.clone(),
ctx.session_id,
ctx.user.id,
project_id,
ctx.access_token_expires_at,
);
let mut auth_check_interval = time::interval(WS_AUTH_REFRESH_INTERVAL);
auth_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let (mut sender, mut inbound) = socket.split();
let mut activity_stream = state.broker().subscribe(project_id);
if let Ok(history) = ActivityRepository::new(&pool)
.fetch_since(project_id, params.cursor, config.activity_default_limit)
.await
{
for event in history {
if send_activity(&mut sender, &event).await.is_err() {
return;
}
last_sent_seq = Some(event.seq);
}
}
tracing::debug!(org_id = %organization_id, project_id = %project_id, "starting websocket session");
loop {
tokio::select! {
maybe_activity = activity_stream.next() => {
match maybe_activity {
Some(Ok(event)) => {
tracing::trace!(?event, "received activity event");
assert_eq!(event.project_id, project_id, "activity stream emitted cross-project event");
if let Some(prev_seq) = last_sent_seq {
if prev_seq >= event.seq {
continue;
}
if event.seq > prev_seq + 1 {
tracing::warn!(
expected_next = prev_seq + 1,
actual = event.seq,
org_id = %organization_id,
project_id = %project_id,
"activity stream skipped sequence; running catch-up"
);
match activity_stream_catch_up(
&mut sender,
&pool,
project_id,
organization_id,
prev_seq,
state.broker(),
config.activity_catchup_batch_size,
WS_BULK_SYNC_THRESHOLD as i64,
"gap",
).await {
Ok((seq, stream)) => {
last_sent_seq = Some(seq);
activity_stream = stream;
}
Err(()) => break,
}
continue;
}
}
if send_activity(&mut sender, &event).await.is_err() {
break;
}
last_sent_seq = Some(event.seq);
}
Some(Err(BroadcastStreamRecvError::Lagged(skipped))) => {
tracing::warn!(skipped, org_id = %organization_id, project_id = %project_id, "activity stream lagged");
let Some(prev_seq) = last_sent_seq else {
tracing::info!(
org_id = %organization_id,
project_id = %project_id,
"activity stream lagged without baseline; forcing bulk sync"
);
let _ = send_error(&mut sender, "activity backlog dropped").await;
break;
};
match activity_stream_catch_up(
&mut sender,
&pool,
project_id,
organization_id,
prev_seq,
state.broker(),
config.activity_catchup_batch_size,
WS_BULK_SYNC_THRESHOLD as i64,
"lag",
).await {
Ok((seq, stream)) => {
last_sent_seq = Some(seq);
activity_stream = stream;
}
Err(()) => break,
}
}
None => break,
}
}
maybe_message = inbound.next() => {
match maybe_message {
Some(Ok(msg)) => {
if matches!(msg, Message::Close(_)) {
break;
}
if let Message::Text(text) = msg {
match serde_json::from_str::<ClientMessage>(&text) {
Ok(ClientMessage::Ack { .. }) => {}
Ok(ClientMessage::AuthToken { token }) => {
auth_state.store_token(token);
}
Err(error) => {
tracing::debug!(?error, "invalid inbound message");
}
}
}
}
Some(Err(error)) => {
tracing::debug!(?error, "websocket receive error");
break;
}
None => break,
}
}
_ = auth_check_interval.tick() => {
match auth_state.verify().await {
Ok(()) => {}
Err(error) => {
tracing::info!(?error, "closing websocket due to auth verification error");
let message = match error {
AuthVerifyError::Revoked => "authorization revoked",
AuthVerifyError::MembershipRevoked => "project access revoked",
AuthVerifyError::Expired => "authorization expired",
AuthVerifyError::UserMismatch { .. }
| AuthVerifyError::Decode(_)
| AuthVerifyError::Session(_) => "authorization error",
};
let _ = send_error(&mut sender, message).await;
let _ = sender.send(Message::Close(None)).await;
break;
}
}
}
}
}
}
async fn send_activity(
sender: &mut futures::stream::SplitSink<WebSocket, Message>,
event: &ActivityEvent,
) -> Result<(), ()> {
tracing::trace!(
event_type = %event.event_type.as_str(),
project_id = %event.project_id,
"sending activity event"
);
match serde_json::to_string(&ServerMessage::Activity(event.clone())) {
Ok(json) => sender
.send(Message::Text(json.into()))
.await
.map_err(|error| {
tracing::debug!(?error, "failed to send activity message");
}),
Err(error) => {
tracing::error!(?error, "failed to serialise activity event");
Err(())
}
}
}
async fn send_error(
sender: &mut futures::stream::SplitSink<WebSocket, Message>,
message: &str,
) -> Result<(), ()> {
match serde_json::to_string(&ServerMessage::Error {
message: message.to_string(),
}) {
Ok(json) => sender
.send(Message::Text(json.into()))
.await
.map_err(|error| {
tracing::debug!(?error, "failed to send websocket error message");
}),
Err(error) => {
tracing::error!(?error, "failed to serialise websocket error message");
Err(())
}
}
}
struct WsAuthState {
jwt: Arc<JwtService>,
pool: PgPool,
session_id: Uuid,
expected_user_id: Uuid,
project_id: Uuid,
token_expires_at: DateTime<Utc>,
new_access_token: Option<String>,
}
impl WsAuthState {
fn new(
jwt: Arc<JwtService>,
pool: PgPool,
session_id: Uuid,
expected_user_id: Uuid,
project_id: Uuid,
token_expires_at: DateTime<Utc>,
) -> Self {
Self {
jwt,
pool,
session_id,
expected_user_id,
project_id,
new_access_token: None,
token_expires_at,
}
}
fn store_token(&mut self, token: String) {
self.new_access_token = Some(token);
}
async fn verify(&mut self) -> Result<(), AuthVerifyError> {
if let Some(token) = self.new_access_token.take() {
let token_details = self
.jwt
.decode_access_token_with_leeway(&token, WS_TOKEN_EXPIRY_GRACE.as_secs())
.map_err(AuthVerifyError::Decode)?;
self.apply_identity(token_details.user_id, token_details.session_id)
.await?;
self.token_expires_at = token_details.expires_at;
}
self.validate_token_expiry()?;
self.validate_session().await?;
self.validate_membership().await
}
async fn apply_identity(
&mut self,
user_id: Uuid,
session_id: Uuid,
) -> Result<(), AuthVerifyError> {
if user_id != self.expected_user_id {
return Err(AuthVerifyError::UserMismatch {
expected: self.expected_user_id,
received: user_id,
});
}
self.session_id = session_id;
self.validate_session().await
}
fn validate_token_expiry(&self) -> Result<(), AuthVerifyError> {
if self.token_expires_at + ws_leeway_duration() > Utc::now() {
return Ok(());
}
Err(AuthVerifyError::Expired)
}
async fn validate_session(&self) -> Result<(), AuthVerifyError> {
let repo = AuthSessionRepository::new(&self.pool);
let session = repo
.get(self.session_id)
.await
.map_err(AuthVerifyError::Session)?;
if session.revoked_at.is_some() {
return Err(AuthVerifyError::Revoked);
}
Ok(())
}
async fn validate_membership(&self) -> Result<(), AuthVerifyError> {
crate::routes::organization_members::ensure_project_access(
&self.pool,
self.expected_user_id,
self.project_id,
)
.await
.map(|_| ())
.map_err(|error| {
tracing::warn!(
?error,
user_id = %self.expected_user_id,
project_id = %self.project_id,
"websocket membership validation failed"
);
AuthVerifyError::MembershipRevoked
})
}
}
fn ws_leeway_duration() -> ChronoDuration {
ChronoDuration::from_std(WS_TOKEN_EXPIRY_GRACE).unwrap()
}
#[derive(Debug, Error)]
enum AuthVerifyError {
#[error(transparent)]
Decode(#[from] JwtError),
#[error("received token for unexpected user: expected {expected}, received {received}")]
UserMismatch { expected: Uuid, received: Uuid },
#[error(transparent)]
Session(#[from] AuthSessionError),
#[error("session revoked")]
Revoked,
#[error("organization membership revoked")]
MembershipRevoked,
#[error("access token expired")]
Expired,
}
#[allow(clippy::too_many_arguments)]
async fn activity_stream_catch_up(
sender: &mut futures::stream::SplitSink<WebSocket, Message>,
pool: &PgPool,
project_id: Uuid,
organization_id: Uuid,
last_seq: i64,
broker: &ActivityBroker,
batch_size: i64,
bulk_limit: i64,
reason: &'static str,
) -> Result<(i64, ActivityStream), ()> {
let mut activity_stream = broker.subscribe(project_id);
let event = match activity_stream.next().await {
Some(Ok(event)) => event,
Some(Err(_)) | None => {
let _ = send_error(sender, "activity backlog dropped").await;
return Err(());
}
};
let target_seq = event.seq;
if target_seq <= last_seq {
return Ok((last_seq, activity_stream));
}
let bulk_limit = bulk_limit.max(1);
let diff = target_seq - last_seq;
if diff > bulk_limit {
tracing::info!(
org_id = %organization_id,
project_id = %project_id,
threshold = bulk_limit,
reason,
"activity catch up exceeded threshold; forcing bulk sync"
);
let _ = send_error(sender, "activity backlog dropped").await;
return Err(());
}
let catch_up_result = catch_up_from_db(
sender,
pool,
project_id,
organization_id,
last_seq,
target_seq,
batch_size.max(1),
)
.await;
match catch_up_result {
Ok(seq) => Ok((seq, activity_stream)),
Err(CatchUpError::Stale) => {
let _ = send_error(sender, "activity backlog dropped").await;
Err(())
}
Err(CatchUpError::Send) => Err(()),
}
}
#[derive(Debug, Error)]
enum CatchUpError {
#[error("activity stream went stale during catch up")]
Stale,
#[error("failed to send activity event")]
Send,
}
async fn catch_up_from_db(
sender: &mut futures::stream::SplitSink<WebSocket, Message>,
pool: &PgPool,
project_id: Uuid,
organization_id: Uuid,
last_seq: i64,
target_seq: i64,
batch_size: i64,
) -> Result<i64, CatchUpError> {
let repository = ActivityRepository::new(pool);
let mut current_seq = last_seq;
let mut cursor = last_seq;
loop {
let events = repository
.fetch_since(project_id, Some(cursor), batch_size)
.await
.map_err(|error| {
tracing::error!(?error, org_id = %organization_id, project_id = %project_id, "failed to fetch activity catch up");
CatchUpError::Stale
})?;
if events.is_empty() {
tracing::warn!(org_id = %organization_id, project_id = %project_id, "activity catch up returned no events");
return Err(CatchUpError::Stale);
}
for event in events {
if event.seq <= current_seq {
continue;
}
if event.seq > target_seq {
return Ok(current_seq);
}
if send_activity(sender, &event).await.is_err() {
return Err(CatchUpError::Send);
}
current_seq = event.seq;
cursor = event.seq;
}
if current_seq >= target_seq {
break;
}
}
Ok(current_seq)
}