Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions desktop/src/app/AppShell.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import { PreventSleepProvider } from "@/features/agents/usePreventSleep";
import { requestOpenCreateAgent } from "@/features/agents/openCreateAgentEvent";
import { useAgentsDataRefresh } from "@/features/agents/lib/useAgentsDataRefresh";
import { usePersonaSync } from "@/features/agents/lib/usePersonaSync";
import { useAgentObserverIngestion } from "@/features/agents/useAgentObserverIngestion";
import {
usePresenceSession,
usePresenceSubscription,
Expand Down Expand Up @@ -146,6 +147,10 @@ export function AppShell() {
);
usePersonaSync(identityQuery.data?.pubkey);
useAgentsDataRefresh();
// Owner-global observer ingestion: receives + decrypts agent observer
// frames and keeps derived active-turn liveness in sync app-wide, so no
// individual screen/panel has to mount its own bridge for ingestion.
useAgentObserverIngestion();
const profileQuery = useProfileQuery();
const deferredPubkey = startupReady ? identityQuery.data?.pubkey : undefined;
useRelayAutoHeal();
Expand Down
6 changes: 2 additions & 4 deletions desktop/src/features/agents/ui/useManagedAgentActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import {
} from "@/features/agents/hooks";
import { useChannelsQuery } from "@/features/channels/hooks";
import { usePresenceQuery } from "@/features/presence/hooks";
import { useManagedAgentObserverBridge } from "@/features/agents/observerRelayStore";
import { useActiveAgentTurnsBridge } from "@/features/agents/activeAgentTurnsStore";
import type {
AcpRuntime,
AcpRuntimeCatalogEntry,
Expand Down Expand Up @@ -85,8 +83,8 @@ export function useManagedAgentActions() {
}),
[managedAgentsQuery.data],
);
useManagedAgentObserverBridge(managedAgents);
useActiveAgentTurnsBridge(managedAgents);
// Observer ingestion is owner-global (useAgentObserverIngestion in
// AppShell); this hook only reads derived state.

const managedPubkeys = React.useMemo(
() => new Set(managedAgents.map((agent) => agent.pubkey)),
Expand Down
88 changes: 88 additions & 0 deletions desktop/src/features/agents/useAgentObserverIngestion.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import assert from "node:assert/strict";
import { describe, it } from "node:test";

import { combineObserverIngestionAgents } from "./useAgentObserverIngestion.ts";

const ME = "aaaa1234aaaa1234aaaa1234aaaa1234aaaa1234aaaa1234aaaa1234aaaa1234";
const OTHER =
"bbbb4321bbbb4321bbbb4321bbbb4321bbbb4321bbbb4321bbbb4321bbbb4321";
const AGENT_LOCAL =
"cccc1111cccc1111cccc1111cccc1111cccc1111cccc1111cccc1111cccc1111";
const AGENT_REMOTE =
"dddd2222dddd2222dddd2222dddd2222dddd2222dddd2222dddd2222dddd2222";
const AGENT_FOREIGN =
"eeee3333eeee3333eeee3333eeee3333eeee3333eeee3333eeee3333eeee3333";

describe("combineObserverIngestionAgents", () => {
it("keeps managed agents with their real status", () => {
const result = combineObserverIngestionAgents(
[{ pubkey: AGENT_LOCAL, status: "running" }],
[],
new Map(),
ME,
);
assert.deepEqual(result, [{ pubkey: AGENT_LOCAL, status: "running" }]);
});

it("adds declared-owned relay agents as deployed", () => {
const result = combineObserverIngestionAgents(
[],
[AGENT_REMOTE],
new Map([[AGENT_REMOTE, ME]]),
ME,
);
assert.deepEqual(result, [{ pubkey: AGENT_REMOTE, status: "deployed" }]);
});

it("excludes relay agents owned by someone else", () => {
const result = combineObserverIngestionAgents(
[],
[AGENT_FOREIGN],
new Map([[AGENT_FOREIGN, OTHER]]),
ME,
);
assert.deepEqual(result, []);
});

it("excludes relay agents with no declared owner", () => {
const result = combineObserverIngestionAgents(
[],
[AGENT_REMOTE],
new Map(),
ME,
);
assert.deepEqual(result, []);
});

it("does not duplicate an agent that is both managed and on the relay", () => {
const result = combineObserverIngestionAgents(
[{ pubkey: AGENT_LOCAL, status: "stopped" }],
[AGENT_LOCAL],
new Map([[AGENT_LOCAL, ME]]),
ME,
);
assert.deepEqual(result, [{ pubkey: AGENT_LOCAL, status: "stopped" }]);
});

it("matches ownership case-insensitively", () => {
const result = combineObserverIngestionAgents(
[],
[AGENT_REMOTE.toUpperCase()],
new Map([[AGENT_REMOTE, ME.toUpperCase()]]),
ME,
);
assert.deepEqual(result, [
{ pubkey: AGENT_REMOTE.toUpperCase(), status: "deployed" },
]);
});

it("returns only managed agents when identity is not resolved yet", () => {
const result = combineObserverIngestionAgents(
[{ pubkey: AGENT_LOCAL, status: "running" }],
[AGENT_REMOTE],
new Map([[AGENT_REMOTE, ME]]),
undefined,
);
assert.deepEqual(result, [{ pubkey: AGENT_LOCAL, status: "running" }]);
});
});
105 changes: 105 additions & 0 deletions desktop/src/features/agents/useAgentObserverIngestion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import * as React from "react";

import { useActiveAgentTurnsBridge } from "@/features/agents/activeAgentTurnsStore";
import {
useManagedAgentsQuery,
useRelayAgentsQuery,
} from "@/features/agents/hooks";
import { useManagedAgentObserverBridge } from "@/features/agents/observerRelayStore";
import { useUsersBatchQuery } from "@/features/profile/hooks";
import { useIdentityQuery } from "@/shared/api/hooks";
import type { ManagedAgent } from "@/shared/api/types";
import { normalizePubkey } from "@/shared/lib/pubkey";

type IngestionAgent = Pick<ManagedAgent, "pubkey" | "status">;

/**
* Combine locally managed agents with relay agents the current identity
* declared-owns (NIP-OA `ownerPubkey == me`) into one ingestion list.
*
* Managed agents keep their real status; owned relay agents that are not
* managed locally are treated as `deployed` so the observer subscription
* starts and their frames decrypt. Registering non-owned agents would be
* pointless — observer frames are `#p`-addressed to the owner, so frames for
* agents we do not own never arrive on our subscription in the first place.
*/
export function combineObserverIngestionAgents(
managedAgents: readonly IngestionAgent[],
relayAgentPubkeys: readonly string[],
ownerByPubkey: ReadonlyMap<string, string>,
currentPubkey: string | null | undefined,
): IngestionAgent[] {
const managed = managedAgents.map((agent) => ({
pubkey: agent.pubkey,
status: agent.status,
}));
if (!currentPubkey) {
return managed;
}

const managedSet = new Set(
managed.map((agent) => normalizePubkey(agent.pubkey)),
);
const me = normalizePubkey(currentPubkey);
const owned: IngestionAgent[] = [];
for (const pubkey of relayAgentPubkeys) {
const key = normalizePubkey(pubkey);
if (managedSet.has(key)) {
continue;
}
const owner = ownerByPubkey.get(key);
if (owner && normalizePubkey(owner) === me) {
owned.push({ pubkey, status: "deployed" as const });
}
}
return [...managed, ...owned];
}

/**
* App-level owner-global observer ingestion.
*
* Mounted once in AppShell so observer frames (kind 24200) are received,
* decrypted, and folded into the derived active-turns store regardless of
* which screen or panel happens to be open. Individual surfaces read from the
* stores; none of them need to mount their own bridge for ingestion to work.
*
* This is the product invariant: if the current identity owns an agent (local
* managed agent or declared-owned relay agent), its turn activity is ingested
* app-wide — not only while a panel that happens to mount a bridge is open.
*/
export function useAgentObserverIngestion() {
const identityQuery = useIdentityQuery();
const currentPubkey = identityQuery.data?.pubkey;

const managedAgentsQuery = useManagedAgentsQuery();
const managedAgents = managedAgentsQuery.data;

const relayAgentsQuery = useRelayAgentsQuery();
const relayAgentPubkeys = React.useMemo(
() => (relayAgentsQuery.data ?? []).map((agent) => agent.pubkey),
[relayAgentsQuery.data],
);

const profilesQuery = useUsersBatchQuery(relayAgentPubkeys, {
enabled: Boolean(currentPubkey) && relayAgentPubkeys.length > 0,
});
const profiles = profilesQuery.data?.profiles;

const ingestionAgents = React.useMemo(() => {
const ownerByPubkey = new Map<string, string>();
for (const [pubkey, summary] of Object.entries(profiles ?? {})) {
if (summary.ownerPubkey) {
ownerByPubkey.set(normalizePubkey(pubkey), summary.ownerPubkey);
}
}
return combineObserverIngestionAgents(
managedAgents ?? [],
relayAgentPubkeys,
ownerByPubkey,
currentPubkey,
);
}, [currentPubkey, managedAgents, profiles, relayAgentPubkeys]);

useManagedAgentObserverBridge(ingestionAgents);
useActiveAgentTurnsBridge(ingestionAgents);
}
15 changes: 2 additions & 13 deletions desktop/src/features/agents/usePreventSleep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { useManagedAgentsQuery } from "@/features/agents/hooks";
import {
getAgentObserverSnapshot,
subscribeAgentObserverStore,
useManagedAgentObserverBridge,
} from "@/features/agents/observerRelayStore";
import { createPreventSleepActivityTracker } from "@/features/agents/preventSleepActivity";
import { setPreventSleepActive } from "@/shared/api/tauri";
Expand Down Expand Up @@ -68,18 +67,8 @@ function usePreventSleepInternal() {
);

const runningAgentPubkeyKey = runningAgentPubkeys.join(",");
const runningObserverAgents = React.useMemo(
() =>
enabled && runningAgentPubkeyKey
? runningAgentPubkeyKey.split(",").map((pubkey) => ({
pubkey,
status: "running" as const,
}))
: [],
[enabled, runningAgentPubkeyKey],
);

useManagedAgentObserverBridge(runningObserverAgents);
// Observer ingestion is owner-global (useAgentObserverIngestion in
// AppShell); this hook only reads observer snapshots for activity tracking.

const hasRunningAgents = runningAgentPubkeys.length > 0;

Expand Down
33 changes: 3 additions & 30 deletions desktop/src/features/channels/ui/ChannelScreen.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import {
usePersonasQuery,
useRelayAgentsQuery,
} from "@/features/agents/hooks";
import { useActiveAgentTurnsBridge } from "@/features/agents/activeAgentTurnsStore";
import { useManagedAgentObserverBridge } from "@/features/agents/observerRelayStore";
import {
mergeMessages,
useChannelMessagesQuery,
Expand Down Expand Up @@ -362,34 +360,9 @@ export function ChannelScreen({
relayAgents,
typingEntries,
});
const observerBridgeAgents = React.useMemo(() => {
if (
!profilePanelPubkey ||
!openAgentSessionPubkey ||
normalizePubkey(profilePanelPubkey) !==
normalizePubkey(openAgentSessionPubkey) ||
managedAgents.some(
(agent) =>
normalizePubkey(agent.pubkey) === normalizePubkey(profilePanelPubkey),
)
) {
return managedAgents;
}

return [
...managedAgents,
{
pubkey: profilePanelPubkey,
status: "deployed" as const,
},
];
}, [managedAgents, openAgentSessionPubkey, profilePanelPubkey]);
useManagedAgentObserverBridge(observerBridgeAgents);
// Derive active-turn/liveness state from the same observer events. Without
// this, raw observer frames (e.g. turn_completed) reach the activity panel
// while the derived active-turns store stays stale, leaving the liveness
// indicator spinning after the turn already finished.
useActiveAgentTurnsBridge(observerBridgeAgents);
// Observer ingestion (frame decryption + derived active-turn liveness) is
// owner-global — mounted once in AppShell via useAgentObserverIngestion —
// so this screen no longer mounts its own observer/turns bridges.
const messageProfiles = React.useMemo(() => {
const base =
mergeCurrentProfileIntoLookup(
Expand Down
38 changes: 3 additions & 35 deletions desktop/src/features/profile/ui/UserProfilePanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ import {
useUpdatePersonaMutation,
} from "@/features/agents/hooks";
import { AddAgentToChannelDialog } from "@/features/agents/ui/AddAgentToChannelDialog";
import { useActiveAgentTurnsBridge } from "@/features/agents/activeAgentTurnsStore";
import { resolvePersonaRuntime } from "@/features/agents/lib/resolvePersonaRuntime";
import {
isManagedAgentActive,
startManagedAgentWithRules,
stopManagedAgentWithRules,
} from "@/features/agents/lib/managedAgentControlActions";
import { useManagedAgentObserverBridge } from "@/features/agents/observerRelayStore";
import { describeLogFile } from "@/features/agents/ui/agentUi";
import { EditAgentDialog } from "@/features/agents/ui/EditAgentDialog";
import {
Expand Down Expand Up @@ -90,7 +88,6 @@ import type {
Channel,
CreateManagedAgentInput,
CreatePersonaInput,
ManagedAgent,
UpdatePersonaInput,
} from "@/shared/api/types";
import { UserProfilePanelFrame } from "@/features/profile/ui/UserProfilePanelFrame";
Expand Down Expand Up @@ -278,38 +275,9 @@ export function UserProfilePanel({
// real boundary is server-side, so this only controls what UI we paint.
const viewerIsOwner = isCurrentUserOwner || isOwner === true;

// Populate the active-turns store for this agent so useActiveAgentTurns works
// even if the Agents page hasn't been visited yet.
const bridgeAgents = React.useMemo(
() =>
managedAgent
? [{ pubkey: managedAgent.pubkey, status: managedAgent.status }]
: [],
[managedAgent],
);
// The observer bridge subscribes on the OWNER's own pubkey and decrypts the
// agent's telemetry with the owner's key — no agent seckey needed. It only
// decrypts frames whose agent pubkey is "known", and only subscribes when an
// agent is running/deployed. For a remote agent we own but don't manage
// locally, `managedAgent` is undefined, so we seed the bridge from the relay
// agent (treated as "deployed") when the viewer is the declared owner. This
// mirrors what the composer-area ingress already does in ChannelScreen.
const observerBridgeAgents = React.useMemo(() => {
if (managedAgent) {
return [{ pubkey: managedAgent.pubkey, status: managedAgent.status }];
}
if (viewerIsOwner && relayAgent) {
return [
{
pubkey: relayAgent.pubkey,
status: "deployed" as ManagedAgent["status"],
},
];
}
return [];
}, [managedAgent, relayAgent, viewerIsOwner]);
useActiveAgentTurnsBridge(bridgeAgents);
useManagedAgentObserverBridge(observerBridgeAgents);
// Observer ingestion (frame decryption + derived active-turn liveness) is
// owner-global — mounted once in AppShell via useAgentObserverIngestion —
// covering both locally managed agents and declared-owned relay agents.
const canEditAgent =
isOwner === true &&
(managedAgent !== undefined ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ import * as React from "react";

import {
type ActiveChannelTurnSummary,
useActiveAgentTurnsBridge,
useActiveAgentTurnsByChannel,
} from "@/features/agents/activeAgentTurnsStore";
import { useManagedAgentsQuery } from "@/features/agents/hooks";
import { useManagedAgentObserverBridge } from "@/features/agents/observerRelayStore";
import { normalizePubkey } from "@/shared/lib/pubkey";

export function resolveActiveWorkingChannelNames(
Expand Down Expand Up @@ -36,9 +34,8 @@ export function useActiveWorkingChannelsById(): ReadonlyMap<
[managedAgentsQuery.data],
);

useManagedAgentObserverBridge(managedAgents);
useActiveAgentTurnsBridge(managedAgents);

// Observer ingestion is owner-global (useAgentObserverIngestion in
// AppShell); this hook only reads derived state.
const activeWorkingChannels = useActiveAgentTurnsByChannel();
return React.useMemo(
() =>
Expand Down
Loading