Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/operator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@repo/logger": "workspace:*",
"drizzle-orm": "0.45.2",
"hono": "4.12.9",
"node-html-markdown": "2.0.0",
"openai": "6.33.0",
"zod": "4.3.6"
},
Expand Down
7 changes: 5 additions & 2 deletions apps/operator/src/modules/telegram/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { TelegramService } from "../../services/telegram";
import type { AppEnv } from "../../types/env";
import type { TelegramUpdate } from "../../types/telegram";
import { splitMessage } from "../../utils/message";
import { validateSourceUrl } from "../../utils/url-validator";

const DAYS = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"];

Expand Down Expand Up @@ -200,9 +201,11 @@ const handleWebhook = async (c: Context<AppEnv, string, WebhookInput>) => {
return { error: validation.error.message };
}

// Monitor support (source_url): validate the URL before accepting the schedule
if (input.sourceUrl) {
return { error: "URL monitors are not yet supported" };
const urlCheck = validateSourceUrl(input.sourceUrl);
if (!urlCheck.valid) {
return { error: urlCheck.reason };
}
}

// Store as pending in D1 — require user confirmation
Expand Down
70 changes: 67 additions & 3 deletions apps/operator/src/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { Logger } from "@repo/logger";

import { OpenAiService } from "./services/openai";
import { ScheduleService } from "./services/schedule";
import { scrapeUrl } from "./services/scrape";
import { TelegramService } from "./services/telegram";
import type { AppEnv } from "./types/env";
import { splitMessage } from "./utils/message";
import { validateSourceUrl } from "./utils/url-validator";

type Env = AppEnv["Bindings"];

Expand Down Expand Up @@ -41,11 +43,73 @@ const handleScheduled = async (

const results = await Promise.allSettled(
claimed.map(async (schedule) => {
// Monitor path (Stage 2 — stub)
// Monitor path
if (schedule.sourceUrl) {
logger.info("monitor execution not yet implemented, skipping", {
scheduleId: schedule.id,
const urlCheck = validateSourceUrl(schedule.sourceUrl);
if (!urlCheck.valid) {
logger.error("monitor URL validation failed at execution", {
scheduleId: schedule.id,
reason: urlCheck.reason,
});
return;
}

const scrapeResult = await scrapeUrl(schedule.sourceUrl);
if (!scrapeResult.ok) {
throw new Error(`Scrape failed: ${scrapeResult.error}`);
}

let previousState: string | null = null;
if (schedule.stateJson) {
try {
const parsed = JSON.parse(schedule.stateJson) as Record<
string,
unknown
>;
previousState =
typeof parsed.lastContent === "string"
? parsed.lastContent
: null;
} catch {
logger.warn("malformed stateJson, treating as first run", {
scheduleId: schedule.id,
});
}
}

let scrapedContent = scrapeResult.text;
if (scrapeResult.truncated) {
scrapedContent +=
"\n\n[Note: Page content was truncated. Analysis is based on partial content.]";
}

const openai = new OpenAiService(env.OPENAI_API_KEY, logger);
const analysis = await openai.analyzeMonitor({
task: schedule.messagePrompt ?? schedule.description,
scrapedContent,
previousState,
Comment on lines +87 to +90
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle truncated scrape content before monitor analysis

When scrapeUrl truncates large pages, this code still feeds the partial text into analyzeMonitor and then persists the returned state as if it were complete. For pages over the 80k-character cap, matches or changes after the cutoff are silently ignored, which can produce false negatives and poison future comparisons with incomplete state. The run should either fail/defer on truncation or explicitly propagate truncation context so state updates are not based on partial content.

Useful? React with 👍 / 👎.

});

if (analysis.notify) {
for (const chunk of splitMessage(analysis.message)) {
await telegram.sendMessage({ chat_id: chatId, text: chunk });
}
logger.info("monitor notification sent", {
scheduleId: schedule.id,
});
} else {
logger.info("monitor check — no notification needed", {
scheduleId: schedule.id,
});
}

await scheduleService.updateState(
schedule.id,
JSON.stringify({
lastContent: analysis.newState,
lastScrapedAt: now.toISOString(),
})
);
return;
}

Expand Down
140 changes: 140 additions & 0 deletions apps/operator/src/services/openai.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,144 @@ describe("OpenAiService", () => {
expect(executor).not.toHaveBeenCalled();
});
});

describe("analyzeMonitor", () => {
it("returns parsed monitor analysis", async () => {
const analysis = {
notify: true,
message: "Seinfeld is on TV at 20:00 on MTV3",
newState: "Current listings include Seinfeld at 20:00",
};
createMock.mockResolvedValueOnce({
choices: [{ message: { content: JSON.stringify(analysis) } }],
});

const result = await service.analyzeMonitor({
task: "Check if Seinfeld is on today",
scrapedContent: "# TV Listings\n- 20:00 Seinfeld (MTV3)",
previousState: null,
});

expect(result).toEqual(analysis);
});

it("sends structured request with json_object format", async () => {
createMock.mockResolvedValueOnce({
choices: [
{
message: {
content: JSON.stringify({
notify: false,
message: "",
newState: "no changes",
}),
},
},
],
});

await service.analyzeMonitor({
task: "Check for changes",
scrapedContent: "content",
previousState: "old state",
});

expect(createMock).toHaveBeenCalledWith(
expect.objectContaining({
response_format: { type: "json_object" },
max_completion_tokens: 4096,
})
);
});

it("includes previous state in user message", async () => {
createMock.mockResolvedValueOnce({
choices: [
{
message: {
content: JSON.stringify({
notify: true,
message: "Changed",
newState: "new",
}),
},
},
],
});

await service.analyzeMonitor({
task: "Check diff",
scrapedContent: "new content",
previousState: "old content summary",
});

const callArgs = createMock.mock.calls[0] as [
{ messages: { content: string }[] },
];
const userMsg = callArgs[0].messages[1].content;
expect(userMsg).toContain("old content summary");
});

it("uses placeholder when no previous state", async () => {
createMock.mockResolvedValueOnce({
choices: [
{
message: {
content: JSON.stringify({
notify: true,
message: "First check",
newState: "initial",
}),
},
},
],
});

await service.analyzeMonitor({
task: "Monitor page",
scrapedContent: "content",
previousState: null,
});

const callArgs = createMock.mock.calls[0] as [
{ messages: { content: string }[] },
];
const userMsg = callArgs[0].messages[1].content;
expect(userMsg).toContain("First check");
});

it("throws on empty response", async () => {
createMock.mockResolvedValueOnce({
choices: [{ message: { content: null } }],
});

await expect(
service.analyzeMonitor({
task: "test",
scrapedContent: "content",
previousState: null,
})
).rejects.toThrow("OpenAI returned empty response for monitor analysis");
});

it("throws on invalid JSON structure", async () => {
createMock.mockResolvedValueOnce({
choices: [
{
message: {
content: JSON.stringify({ wrong: "structure" }),
},
},
],
});

await expect(
service.analyzeMonitor({
task: "test",
scrapedContent: "content",
previousState: null,
})
).rejects.toThrow();
});
});
});
82 changes: 81 additions & 1 deletion apps/operator/src/services/openai.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Logger } from "@repo/logger";
import OpenAI from "openai";
import type { ChatCompletionTool } from "openai/resources/chat/completions";
import { z } from "zod";

const SYSTEM_PROMPT = `You are a helpful personal assistant called Switch Operator. Be concise and helpful.

Expand All @@ -16,6 +17,14 @@ Schedule types:

Use fixed_message for exact text or message_prompt for AI-generated content.

You can also create web monitors that scrape a URL on a schedule and notify based on conditions.
When the user wants to monitor a website for changes or check for specific content, use create_schedule with source_url + message_prompt.
The message_prompt should describe what to look for or how to analyze the page content.

Monitor examples:
- "Notify me when a specific show is on TV" → source_url with the TV listings page, message_prompt: "Check if [show name] appears in today's listings. Notify with channel and time if found."
- "Weekly report changes" → source_url with the report page, message_prompt: "Compare this week's content to last week. Summarize key changes."

When listing schedules, format them as a numbered list (1, 2, 3...) with key details like description, type, time, and next run.
When the user asks to delete a schedule by number, first call list_schedules to get the current list, then use the ID from the matching position to call delete_schedule.`;

Expand Down Expand Up @@ -64,6 +73,11 @@ const SCHEDULE_TOOLS: ChatCompletionTool[] = [
description:
"Prompt for AI-generated message. Mutually exclusive with fixed_message.",
},
source_url: {
type: "string",
description:
"URL to monitor/scrape. When set, the schedule becomes a monitor: it will fetch this URL on each run, analyze the content using message_prompt, and notify only if the condition is met. Requires message_prompt. Cannot be used with fixed_message.",
},
description: {
type: "string",
description: "Short description of this schedule (max 200 chars).",
Expand Down Expand Up @@ -216,7 +230,73 @@ class OpenAiService {

throw new Error("Tool calling exceeded maximum iterations");
}

async analyzeMonitor(params: {
task: string;
scrapedContent: string;
previousState: string | null;
}): Promise<MonitorAnalysis> {
this.logger.debug("analyzing monitor", {
taskLength: params.task.length,
contentLength: params.scrapedContent.length,
hasPreviousState: params.previousState != null,
});

const previousStateText =
params.previousState ?? "First check — no previous state.";

const response = await this.client.chat.completions.create({
model: "gpt-5.4-mini",
max_completion_tokens: 4096,
response_format: { type: "json_object" },
messages: [
{ role: "system", content: MONITOR_ANALYSIS_PROMPT },
{
role: "user",
content: `## Task\n${params.task}\n\n## Current page content\n${params.scrapedContent}\n\n## Previous state\n${previousStateText}`,
},
],
});

const content = response.choices[0]?.message.content;
if (!content) {
throw new Error("OpenAI returned empty response for monitor analysis");
}

const parsed: unknown = JSON.parse(content);
return monitorAnalysisSchema.parse(parsed);
}
}

const MONITOR_ANALYSIS_PROMPT = `You are analyzing a web page for a monitoring task.

The user will provide:
1. A task describing what to check or monitor
2. The current page content (scraped and converted to markdown)
3. Previous state from the last check (or "First check" if this is the first run)

Respond in JSON with exactly these fields:
{
"notify": true or false,
"message": "notification message to send to the user (max 4000 chars, use markdown formatting)",
"newState": "concise summary of current state for comparison next time (max 5000 chars)"
}

Rules:
- Only set "notify" to true if the condition described in the task is met
- For diff/change detection tasks: compare current content to previous state and summarize what changed. Notify if there are meaningful changes.
- For condition check tasks: evaluate whether the specific condition is satisfied. Notify only if it is.
- The "message" should be informative and actionable — include relevant details from the page
- The "newState" should contain enough information to compare against next time. Keep it concise.
- If this is the first check, always set notify to true with a summary of current state`;

const monitorAnalysisSchema = z.object({
notify: z.boolean(),
message: z.string().max(4000),
newState: z.string().max(5000),
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MONITOR_ANALYSIS_PROMPT instructs newState to be max 2000 chars, but the runtime schema allows up to 5000. This mismatch makes it unclear what limit the system actually relies on and can lead to larger-than-expected state being stored. Align the schema with the prompt (or update the prompt and downstream storage expectations) to keep monitor state bounded and consistent.

Suggested change
newState: z.string().max(5000),
newState: z.string().max(2000),

Copilot uses AI. Check for mistakes.
});

type MonitorAnalysis = z.infer<typeof monitorAnalysisSchema>;

export { MAX_TOOL_ITERATIONS, OpenAiService, SCHEDULE_TOOLS };
export type { ToolExecutor, ToolResult };
export type { MonitorAnalysis, ToolExecutor, ToolResult };
Loading
Loading