In the previous example we've added auth to the todo PWA. In this one we'll make all the todos public and provide real-time updates to the clients about every change in them. We'll add a new dependency here - async-broadcast which provides a simple mechanism to share changes with multiple streams:

Cargo.toml:9

...
async-broadcast = "0.6"
...

Besides this manifest remains the same as well as build script and library so their contents are at the bottom.

Until now we've changed the state of the clients only based on their requests and it made sense, but now we'll update the todo list based on server sent events initiated by other users adding or modifying their todos. Render method of our todo won't be based on the Render trait anymore because it will need additional user data to disable controls for non-owners. Also, we won't be returning markup from add, toggle and delete handlers anymore but instead use them to modify the data accordingly and broadcast the changes to all active clients:

src/main.rs

use async_broadcast::{broadcast, Receiver, Sender};
use prest::*;
use todo_pwa_auth_sync::{into_page, shared_routes};

embed_build_output_as!(BuiltAssets);

state!(BROADCAST: (Sender<BroadcastMsg>, Receiver<BroadcastMsg>) = { broadcast(1000) });

#[derive(Clone)]
struct BroadcastMsg {
    pub event: String,
    pub data: Option<Todo>,
}

#[derive(Table, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(default)]
struct Todo {
    #[serde(default = "Uuid::new_v4")]
    pub id: Uuid,
    #[serde(default)]
    pub owner: Uuid,
    pub task: String,
    pub done: bool,
}

fn main() {
    Todo::migrate();
    shared_routes()
        .route("/todos", get(todos).put(add).patch(toggle).delete(delete))
        .wrap_non_htmx(into_page)
        .route("/todos/subscribe", get(todos_subscribe))
        .embed(BuiltAssets)
        .run();
}

impl Todo {
    fn render(&self, user: &Option<User>) -> Markup {
        let owned = match user {
            Some(user) => user.id == self.owner,
            None => false,
        };
        html! {
            ."flex items-center" sse-swap=(self.id) hx-swap="outerHTML" hx-vals=(json!(self)) {
                input."toggle toggle-primary" type="checkbox" hx-patch="/todos" disabled[!owned] checked[self.done] {}
                label."ml-4 text-lg" {(self.task)}
                button."btn btn-ghost ml-auto"  hx-delete="/todos" disabled[!owned] {"Delete"}
            }
        }
    }
}

async fn todos(auth: Auth) -> Markup {
    html!(
        @if auth.user.is_some() {
            form hx-put="/todos" hx-swap="none" hx-on--after-request="this.reset()" {
                input."input input-bordered input-primary" type="text" name="task" {}
                button."btn btn-outline btn-primary ml-4" type="submit" {"Add"}
            }
        } @else {
            @if *WITH_GOOGLE_AUTH {
                a."btn btn-ghost" href="/auth/google" {"Login with Google"}
                ."divider" {"OR"}
            }
            ."flex" {
                ."bg-base-100 border-base-300 rounded-box p-6" {
                    form."flex flex-col gap-4 items-center" method="POST" action="/auth/username_password/signin" {
                        input."input input-bordered input-primary" type="text" name="username" placeholder="username" {}
                        input."input input-bordered input-primary" type="password" name="password" placeholder="password" {}
                        button."btn btn-outline btn-primary ml-4" type="submit" {"Sign in"}
                    }
                }
                ."divider divider-horizontal" {}
                ."bg-base-100 border-base-300 rounded-box p-6" {
                    form."flex flex-col gap-4 items-center" method="POST" action="/auth/username_password/signup" {
                        input."input input-bordered input-primary" type="text" name="username" placeholder="username" {}
                        input."input input-bordered input-primary" type="password" name="password" placeholder="password" {}
                        button."btn btn-outline btn-primary ml-4" type="submit" {"Sign up"}
                    }
                }
            }
        }
        #"todos" ."w-full" hx-ext="sse" sse-connect="/todos/subscribe" sse-swap="add" hx-swap="beforeend" {
            @for item in Todo::find_all() {(item.render(&auth.user))}
        }
    )
}

async fn todos_subscribe(auth: Auth) -> Sse<impl Stream<Item = SseItem>> {
    let stream = BROADCAST.1.new_receiver().map(move |msg| {
        let data = match msg.data {
            Some(todo) => todo.render(&auth.user).0,
            None => "".to_owned(),
        };
        SseEvent::default().event(msg.event.as_str()).data(data)
    });
    Sse::new(stream.map(Ok)).keep_alive(SseKeepAlive::default())
}

async fn add(user: User, Form(mut todo): Form<Todo>) -> StatusCode {
    todo.owner = user.id;
    let Ok(_) = todo.save() else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };
    let Ok(_) = BROADCAST
        .0
        .broadcast_direct(BroadcastMsg {
            event: "add".to_owned(),
            data: Some(todo),
        })
        .await
    else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };
    StatusCode::OK
}

async fn toggle(user: User, Form(mut todo): Form<Todo>) -> StatusCode {
    if todo.owner != user.id {
        return StatusCode::UNAUTHORIZED;
    }
    let Ok(_) = todo.update_done(!todo.done) else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };

    let Ok(_) = BROADCAST
        .0
        .broadcast_direct(BroadcastMsg {
            event: todo.id.to_string(),
            data: Some(todo),
        })
        .await
    else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };
    StatusCode::OK
}
async fn delete(user: User, Form(todo): Form<Todo>) -> StatusCode {
    if todo.owner != user.id {
        return StatusCode::UNAUTHORIZED;
    }
    let Ok(_) = todo.remove() else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };
    let Ok(_) = BROADCAST
        .0
        .broadcast_direct(BroadcastMsg {
            event: todo.id.to_string(),
            data: None,
        })
        .await
    else {
        return StatusCode::INTERNAL_SERVER_ERROR;
    };
    StatusCode::OK
}

We're using the htmx's sse extension which allows us to easily swap events payloads into the right places based on their names. It starts with the hx-ext="sse" and sse-connect="/todos/subscribe" attributes which activate the extension and connect to the specified route to listen for events. Then sse-swap="EVENT_NAME" attributes can be used on it and its children to listen to events with specified names and swap if got any. In this case we're using add to append and todo's id as names to make sure events reach the right places.

Now we have an installable collaborative real-time full-stack app! No react or another frontend framework involved and without even writing js. This is the end(for now) of the tutorials series, but you can also check out other examples from the menu.

Remaining code used in this example:

Cargo.toml

[package]
name = "todo-pwa-auth-sync"
edition = "2021"

[dependencies]
prest = { version = "0.2", features = ["auth"] }
serde = { version = "1", features = ["derive"] }
wasm-bindgen = "0.2"
async-broadcast = "0.6"

[build-dependencies]
prest-build = "0.2"

build.rs

use prest_build::*;
fn main() {
    build_pwa(PWAOptions::default()).unwrap();
}

src/lib.rs

use prest::*;

pub fn shared_routes() -> Router {
    route("/", get(home))
}

async fn home() -> Markup {
    into_page(html!(
        span."loading loading-spinner loading-lg" hx-get="/todos" hx-trigger="load" hx-swap="outerHTML" hx-push-url="true"
            hx-on--after-request="if (!event.detail.successful) { document.getElementById('alert').style.display = 'flex'; this.remove() }" {}
        div #"alert" role="alert" class="alert alert-error justify-center" style="display: none;" {"Couldn't fetch the todos :("}
    ))
    .await
}

pub async fn into_page(content: Markup) -> Markup {
    html! { html data-theme="dark" { 
        (Head::with_title("Todo app"))
        body."max-w-screen-sm mx-auto mt-12 flex flex-col items-center" {
            (content)
            (Scripts::default())
        }
    }}
}

#[cfg(target_arch = "wasm32")]
#[wasm_bindgen(start)]
pub fn main() {
    shared_routes().handle_fetch_events()
}