feat(scheduler): scheduled-post storage, worker, and UI

- Add scheduled_posts table and migrations.
- Add ScheduledPost model and storage functions.
- Add Tauri commands: list_scheduled_posts, save_scheduled_post, cancel_scheduled_post.
- Add background scheduler worker (30s tick) that finds due pending posts and executes publish.
- Add Svelte scheduled store and wired Scheduler view with a creation form and status list.
This commit is contained in:
Nick
2026-06-15 13:30:36 +01:00
parent f6a5b58357
commit da8db4cd8e
12 changed files with 567 additions and 23 deletions

5
Cargo.lock generated
View File

@ -606,8 +606,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-link 0.2.1",
]
@ -668,6 +670,7 @@ name = "conductor-desktop"
version = "0.7.2"
dependencies = [
"base64 0.22.1",
"chrono",
"conductor-core",
"conductor-storage",
"conductor-youtube",
@ -696,6 +699,7 @@ dependencies = [
"tokio",
"url",
"urlencoding",
"uuid",
]
[[package]]
@ -722,6 +726,7 @@ dependencies = [
"rusqlite",
"serde",
"serde_json",
"uuid",
]
[[package]]

View File

@ -10,5 +10,6 @@ conductor-core = { path = "../core" }
rusqlite = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = "1.23.3"
[dev-dependencies]

View File

@ -0,0 +1,16 @@
-- Migration 002: Scheduled posts table
CREATE TABLE IF NOT EXISTS scheduled_posts (
id TEXT PRIMARY KEY,
platform TEXT NOT NULL DEFAULT 'youtube',
title TEXT NOT NULL DEFAULT '',
body TEXT NOT NULL DEFAULT '',
video_path TEXT,
scheduled_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
error_message TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_scheduled_posts_status_time
ON scheduled_posts (status, scheduled_at);

View File

@ -1,5 +1,12 @@
pub mod db;
pub mod models;
pub mod scheduled;
pub mod scheduled_db;
pub use db::{list_items, load_item, open, save_item};
pub use models::{ContentItem, MetadataStatus};
pub use scheduled::{ScheduledPost, ScheduledPostStatus};
pub use scheduled_db::{
get_due_pending_posts, list_scheduled_posts, load_scheduled_post, save_scheduled_post,
update_scheduled_post_status,
};

View File

@ -0,0 +1,54 @@
use serde::{Deserialize, Serialize};
/// The lifecycle state of a scheduled post.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScheduledPostStatus {
Pending,
Publishing,
Published,
Failed,
Cancelled,
}
impl std::fmt::Display for ScheduledPostStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pending => write!(f, "pending"),
Self::Publishing => write!(f, "publishing"),
Self::Published => write!(f, "published"),
Self::Failed => write!(f, "failed"),
Self::Cancelled => write!(f, "cancelled"),
}
}
}
impl std::str::FromStr for ScheduledPostStatus {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"pending" => Ok(Self::Pending),
"publishing" => Ok(Self::Publishing),
"published" => Ok(Self::Published),
"failed" => Ok(Self::Failed),
"cancelled" => Ok(Self::Cancelled),
_ => Err(format!("Unknown scheduled post status: {s}")),
}
}
}
/// A post scheduled for future publication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledPost {
pub id: String,
pub platform: String,
pub title: String,
pub body: String,
pub video_path: Option<String>,
pub scheduled_at: String,
pub status: ScheduledPostStatus,
pub error_message: Option<String>,
pub created_at: String,
pub updated_at: String,
}

View File

@ -0,0 +1,129 @@
use rusqlite::{Connection, params};
use crate::scheduled::{ScheduledPost, ScheduledPostStatus};
fn migrate_v2(conn: &Connection) -> Result<(), rusqlite::Error> {
let table_exists: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='scheduled_posts'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if !table_exists {
conn.execute_batch(include_str!("../migrations/002_create_scheduled_posts.sql"))?;
}
Ok(())
}
fn ensure_migrated(conn: &Connection) -> Result<(), rusqlite::Error> {
migrate_v2(conn)
}
pub fn save_scheduled_post(conn: &Connection, post: &ScheduledPost) -> Result<(), rusqlite::Error> {
ensure_migrated(conn)?;
conn.execute(
"INSERT OR REPLACE INTO scheduled_posts
(id, platform, title, body, video_path, scheduled_at, status, error_message,
created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
params![
post.id,
post.platform,
post.title,
post.body,
post.video_path,
post.scheduled_at,
post.status.to_string(),
post.error_message,
post.created_at,
post.updated_at,
],
)?;
Ok(())
}
fn map_row(row: &rusqlite::Row<'_>) -> Result<ScheduledPost, rusqlite::Error> {
let status_str: String = row.get(6)?;
let status = status_str.parse().unwrap_or(ScheduledPostStatus::Pending);
Ok(ScheduledPost {
id: row.get(0)?,
platform: row.get(1)?,
title: row.get(2)?,
body: row.get(3)?,
video_path: row.get(4)?,
scheduled_at: row.get(5)?,
status,
error_message: row.get(7)?,
created_at: row.get(8)?,
updated_at: row.get(9)?,
})
}
pub fn load_scheduled_post(
conn: &Connection,
id: &str,
) -> Result<Option<ScheduledPost>, rusqlite::Error> {
ensure_migrated(conn)?;
let mut stmt = conn.prepare(
"SELECT id, platform, title, body, video_path, scheduled_at, status, error_message,
created_at, updated_at
FROM scheduled_posts WHERE id = ?1",
)?;
let result = stmt.query_row(params![id], map_row);
match result {
Ok(post) => Ok(Some(post)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e),
}
}
pub fn list_scheduled_posts(conn: &Connection) -> Result<Vec<ScheduledPost>, rusqlite::Error> {
ensure_migrated(conn)?;
let mut stmt = conn.prepare(
"SELECT id, platform, title, body, video_path, scheduled_at, status, error_message,
created_at, updated_at
FROM scheduled_posts ORDER BY scheduled_at DESC",
)?;
let posts = stmt
.query_map([], map_row)?
.filter_map(|r| r.ok())
.collect();
Ok(posts)
}
pub fn get_due_pending_posts(
conn: &Connection,
before: &str,
) -> Result<Vec<ScheduledPost>, rusqlite::Error> {
ensure_migrated(conn)?;
let mut stmt = conn.prepare(
"SELECT id, platform, title, body, video_path, scheduled_at, status, error_message,
created_at, updated_at
FROM scheduled_posts
WHERE status = 'pending' AND scheduled_at <= ?1
ORDER BY scheduled_at ASC",
)?;
let posts = stmt
.query_map(params![before], map_row)?
.filter_map(|r| r.ok())
.collect();
Ok(posts)
}
pub fn update_scheduled_post_status(
conn: &Connection,
id: &str,
status: ScheduledPostStatus,
error_message: Option<&str>,
) -> Result<(), rusqlite::Error> {
ensure_migrated(conn)?;
conn.execute(
"UPDATE scheduled_posts
SET status = ?1, error_message = ?2, updated_at = datetime('now')
WHERE id = ?3",
params![status.to_string(), error_message, id],
)?;
Ok(())
}

View File

@ -0,0 +1,99 @@
import { invoke } from '@tauri-apps/api/core';
import { listen, type UnlistenFn } from '@tauri-apps/api/event';
export type ScheduledPostStatus =
| 'pending'
| 'publishing'
| 'published'
| 'failed'
| 'cancelled';
export interface ScheduledPost {
id: string;
platform: string;
title: string;
body: string;
video_path: string | null;
scheduled_at: string;
status: ScheduledPostStatus;
error_message: string | null;
created_at: string;
updated_at: string;
}
export interface NewScheduledPost {
platform: string;
title: string;
body: string;
video_path?: string | null;
scheduled_at: string;
}
class ScheduledStore {
posts: ScheduledPost[] = $state([]);
loading: boolean = $state(false);
error: string | null = $state(null);
constructor() {
this.listenForUpdates();
}
private async listenForUpdates() {
let unlisten: UnlistenFn | undefined;
try {
unlisten = await listen<string>('scheduled-post-updated', () => {
this.fetch();
});
} catch {
// Event listener is optional; the store still works on manual refresh.
}
return () => {
if (unlisten) unlisten();
};
}
async fetch() {
this.loading = true;
this.error = null;
try {
const { posts } = await invoke<{ posts: ScheduledPost[] }>('list_scheduled_posts');
this.posts = posts;
} catch (e) {
this.error = String(e);
} finally {
this.loading = false;
}
}
async schedule(post: NewScheduledPost) {
this.loading = true;
this.error = null;
try {
const created = await invoke<ScheduledPost>('save_scheduled_post', { payload: post });
this.posts = [created, ...this.posts];
return created;
} catch (e) {
this.error = String(e);
return null;
} finally {
this.loading = false;
}
}
async cancel(id: string) {
this.loading = true;
this.error = null;
try {
await invoke('cancel_scheduled_post', { id });
this.posts = this.posts.map((p) =>
p.id === id ? { ...p, status: 'cancelled' as ScheduledPostStatus } : p
);
} catch (e) {
this.error = String(e);
} finally {
this.loading = false;
}
}
}
export const scheduledStore = new ScheduledStore();

View File

@ -1,27 +1,64 @@
<script lang="ts">
type ScheduledPost = {
id: string;
platform: string;
title: string;
scheduledAt: string;
status: 'pending' | 'uploading' | 'published' | 'failed';
};
import { open } from '@tauri-apps/plugin-dialog';
import { scheduledStore, type ScheduledPost, type NewScheduledPost } from '$lib/stores/scheduled.svelte';
import { concreteDate } from '$lib/utils/time';
const posts = $state<ScheduledPost[]>([]);
let showForm = $state(false);
let selectedDate = $state<string | null>(null);
function formatDate(iso: string): string {
let newPost = $state<NewScheduledPost>({
platform: 'youtube',
title: '',
body: '',
video_path: null,
scheduled_at: '',
});
$effect(() => {
scheduledStore.fetch();
});
function toLocalDatetimeInput(iso?: string): string {
if (!iso) return '';
const d = new Date(iso);
return d.toLocaleString('en-GB', {
weekday: 'short',
day: 'numeric',
month: 'short',
hour: '2-digit',
minute: '2-digit',
});
if (Number.isNaN(d.getTime())) return '';
const pad = (n: number) => String(n).padStart(2, '0');
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())}T${pad(d.getHours())}:${pad(d.getMinutes())}`;
}
function fromLocalDatetimeInput(value: string): string {
return new Date(value).toISOString();
}
async function attachVideo() {
const selected = await open({
multiple: false,
filters: [{ name: 'Video', extensions: ['mp4', 'mov', 'avi', 'mkv', 'webm'] }]
});
if (selected && typeof selected === 'string') {
newPost.video_path = selected;
}
}
async function handleSchedule() {
if (!newPost.title.trim() || !newPost.scheduled_at) return;
const created = await scheduledStore.schedule(newPost);
if (created) {
newPost = { platform: 'youtube', title: '', body: '', video_path: null, scheduled_at: '' };
showForm = false;
}
}
async function cancelPost(id: string) {
await scheduledStore.cancel(id);
}
function statusClass(status: ScheduledPost['status']): string {
if (status === 'published') return 'published';
if (status === 'failed') return 'failed';
if (status === 'cancelled') return 'cancelled';
return 'pending';
}
</script>
<div class="scheduler" aria-label="Post scheduler">
@ -34,13 +71,41 @@
<option value="instagram">Instagram</option>
<option value="tiktok">TikTok</option>
</select>
<button type="button" class="action-button" aria-label="Schedule new post">
<button type="button" class="action-button" aria-label="Schedule new post" onclick={() => { showForm = true; }}>
+ New post
</button>
</div>
</header>
{#if posts.length === 0}
{#if showForm}
<div class="schedule-form">
<div class="form-fields">
<select bind:value={newPost.platform} aria-label="Platform">
<option value="youtube">YouTube</option>
<option value="instagram">Instagram</option>
<option value="tiktok">TikTok</option>
</select>
<input type="text" placeholder="Title" bind:value={newPost.title} aria-label="Post title" />
<textarea placeholder="Caption / description" bind:value={newPost.body} rows={3} aria-label="Post body"></textarea>
<input type="datetime-local" bind:value={() => toLocalDatetimeInput(newPost.scheduled_at), (v) => newPost.scheduled_at = fromLocalDatetimeInput(v)} aria-label="Scheduled time" />
<div class="attach-row">
<button type="button" class="attach-button" onclick={attachVideo} aria-label="Attach video">
📎 {newPost.video_path ? newPost.video_path.split('/').pop() : 'Attach video'}
</button>
</div>
</div>
<div class="form-actions">
<button type="button" onclick={() => { showForm = false; }}>Cancel</button>
<button type="button" class="action-button" onclick={handleSchedule} disabled={!newPost.title.trim() || !newPost.scheduled_at}>Schedule</button>
</div>
</div>
{/if}
{#if scheduledStore.loading && scheduledStore.posts.length === 0}
<p class="empty-state">Loading scheduled posts…</p>
{:else if scheduledStore.error}
<p class="empty-state">{scheduledStore.error}</p>
{:else if scheduledStore.posts.length === 0}
<div class="empty-state">
<p>No posts scheduled yet. Create your first scheduled post to see it here.</p>
</div>
@ -75,17 +140,18 @@
<!-- Post queue -->
<div class="post-queue" role="list" aria-label="Scheduled posts">
{#each posts as post (post.id)}
{#each scheduledStore.posts as post (post.id)}
<article class="post-item" role="listitem">
<div class="post-platform-badge">{post.platform}</div>
<div class="post-details">
<h3>{post.title}</h3>
<p class="post-time">{formatDate(post.scheduledAt)}</p>
<p class="post-time">{concreteDate(post.scheduled_at)}</p>
</div>
<span class="post-status" class:status-class={true}>{post.status}</span>
<span class="post-status {statusClass(post.status)}">{post.status}</span>
<div class="post-actions">
<button type="button" aria-label="Edit post"></button>
<button type="button" aria-label="Delete post"></button>
{#if post.status === 'pending'}
<button type="button" aria-label="Cancel post" onclick={() => cancelPost(post.id)}>✕</button>
{/if}
</div>
</article>
{/each}
@ -171,6 +237,32 @@
.post-status.published { background: var(--success-dim, #1a3a1a); color: var(--success, #3fb950); }
.post-status.failed { background: var(--danger-dim, #3a1a1a); color: var(--danger, #f85149); }
.post-status.pending { background: var(--warning-dim, #3a2a1a); color: var(--warning, #d29922); }
.post-status.cancelled { opacity: 0.6; font-style: italic; }
.schedule-form {
background: var(--surface, #161b22); border: 1px solid var(--border, #30363d);
border-radius: 8px; padding: 1rem; margin-bottom: 1.5rem;
}
.form-fields { display: grid; gap: 0.75rem; }
.form-fields input, .form-fields select, .form-fields textarea {
background: var(--bg, #0d1117); border: 1px solid var(--border, #30363d);
color: var(--text, #c9d1d9); padding: 0.5rem 0.75rem; border-radius: 6px;
font: inherit; font-size: 0.9rem;
}
.attach-row { display: flex; align-items: center; }
.attach-button {
background: transparent; border: 1px dashed var(--border, #30363d);
color: var(--text-dim, #8b949e); padding: 0.4rem 0.75rem;
border-radius: 6px; cursor: pointer; font-size: 0.85rem;
}
.form-actions {
display: flex; justify-content: flex-end; gap: 0.5rem; margin-top: 1rem;
}
.form-actions button {
background: var(--surface, #161b22); border: 1px solid var(--border, #30363d);
color: var(--text, #c9d1d9); padding: 0.5rem 1rem; border-radius: 6px;
cursor: pointer; font-size: 0.85rem;
}
.form-actions button:disabled { opacity: 0.5; cursor: not-allowed; }
.post-actions { display: flex; gap: 0.5rem; }
.post-actions button {
background: none; border: none; color: var(--text-dim, #8b949e);

View File

@ -37,4 +37,6 @@ tauri-plugin-upload = "2.4.0"
tauri-plugin-global-shortcut = "2.3.2"
urlencoding = "2.1.3"
dotenvy = "0.15"
uuid = { version = "1.23.3", features = ["v4"] }
chrono = "0.4.45"

View File

@ -5,6 +5,8 @@ mod youtube_connector;
mod youtube_keyring;
mod youtube_oauth;
mod dom_inspector;
mod scheduled_posts;
mod scheduler_worker;
mod youtube_types;
fn main() {
@ -38,7 +40,14 @@ fn main() {
youtube_connector::get_youtube_channel,
youtube_connector::get_youtube_videos,
youtube_connector::upload_youtube_video,
scheduled_posts::list_scheduled_posts,
scheduled_posts::save_scheduled_post,
scheduled_posts::cancel_scheduled_post,
])
.setup(|app| {
scheduler_worker::start(app.handle().clone());
Ok(())
})
.run(tauri::generate_context!());
if let Err(e) = result {

View File

@ -0,0 +1,63 @@
use serde::{Deserialize, Serialize};
use tauri::command;
use uuid::Uuid;
use conductor_storage::{
list_scheduled_posts as list_scheduled, open as open_storage,
save_scheduled_post as save_scheduled_post_db, update_scheduled_post_status as update_status_db,
ScheduledPost, ScheduledPostStatus,
};
#[derive(Debug, Clone, Deserialize)]
pub struct NewScheduledPostPayload {
pub platform: String,
pub title: String,
pub body: String,
#[serde(default)]
pub video_path: Option<String>,
pub scheduled_at: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct ScheduledPostList {
pub posts: Vec<ScheduledPost>,
}
fn now_iso() -> String {
chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
}
#[command]
pub fn list_scheduled_posts() -> Result<ScheduledPostList, String> {
let conn = open_storage().map_err(|e| format!("Database error: {e}"))?;
let posts = list_scheduled(&conn).map_err(|e| format!("Database error: {e}"))?;
Ok(ScheduledPostList { posts })
}
#[command]
pub fn save_scheduled_post(payload: NewScheduledPostPayload) -> Result<ScheduledPost, String> {
let conn = open_storage().map_err(|e| format!("Database error: {e}"))?;
let now = now_iso();
let post = ScheduledPost {
id: Uuid::new_v4().to_string(),
platform: payload.platform,
title: payload.title,
body: payload.body,
video_path: payload.video_path,
scheduled_at: payload.scheduled_at,
status: ScheduledPostStatus::Pending,
error_message: None,
created_at: now.clone(),
updated_at: now,
};
save_scheduled_post_db(&conn, &post).map_err(|e| format!("Database error: {e}"))?;
Ok(post)
}
#[command]
pub fn cancel_scheduled_post(id: String) -> Result<(), String> {
let conn = open_storage().map_err(|e| format!("Database error: {e}"))?;
update_status_db(&conn, &id, ScheduledPostStatus::Cancelled, None)
.map_err(|e| format!("Database error: {e}"))?;
Ok(())
}

View File

@ -0,0 +1,67 @@
use tauri::{AppHandle, Emitter};
use tokio::time::{interval, Duration};
use conductor_storage::{
get_due_pending_posts, open as open_storage, update_scheduled_post_status, ScheduledPostStatus,
};
const TICK_INTERVAL: Duration = Duration::from_secs(30);
/// Start the background worker that checks for due scheduled posts.
pub fn start(app: AppHandle) {
tauri::async_runtime::spawn(worker_loop(app));
}
async fn worker_loop(app: AppHandle) {
let mut ticker = interval(TICK_INTERVAL);
loop {
ticker.tick().await;
if let Err(e) = check_due_posts(&app).await {
eprintln!("Scheduler worker error: {e}");
}
}
}
async fn check_due_posts(app: &AppHandle) -> Result<(), String> {
let conn = open_storage().map_err(|e| format!("DB error: {e}"))?;
let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let due = get_due_pending_posts(&conn, &now).map_err(|e| format!("DB error: {e}"))?;
for post in due {
update_scheduled_post_status(&conn, &post.id, ScheduledPostStatus::Publishing, None)
.map_err(|e| format!("DB error: {e}"))?;
let result = execute_publish(&post).await;
let (status, error_message) = match result {
Ok(_) => (ScheduledPostStatus::Published, None),
Err(msg) => (ScheduledPostStatus::Failed, Some(msg.clone())),
};
let err_ref = error_message.as_deref();
update_scheduled_post_status(&conn, &post.id, status, err_ref)
.map_err(|e| format!("DB error: {e}"))?;
let _ = app.emit("scheduled-post-updated", &post.id);
}
Ok(())
}
async fn execute_publish(post: &conductor_storage::ScheduledPost) -> Result<(), String> {
match post.platform.as_str() {
"youtube" => {
if let Some(video_path) = &post.video_path {
crate::youtube_connector::upload_youtube_video(
video_path.clone(),
post.title.clone(),
post.body.clone(),
)
.await
.map(|_| ())
} else {
Err("YouTube posts require a video path.".to_string())
}
}
_ => Err(format!("Platform {} not yet supported", post.platform)),
}
}