use async_trait::async_trait; use drama::runtime::Runtime; use drama::ws::{WebsocketRuntime, WsActor}; use drama::{Actor, ActorHandle, Error, Handler}; use futures::stream::{SplitSink, SplitStream}; use futures::StreamExt; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use tokio::sync::Mutex; use warp::ws::{Message, WebSocket}; use warp::Filter; type Arbiter = Arc>>>; static ID: AtomicUsize = AtomicUsize::new(0); struct WebsocketActor { websocket: Option, hello: ActorHandle, } impl WebsocketActor { fn new(ws: WebSocket, handle: ActorHandle) -> Self { Self { websocket: Some(ws), hello: handle, } } } impl Actor for WebsocketActor { fn start(self) -> ActorHandle { WebsocketRuntime::run(self) } } impl WsActor, SplitSink> for WebsocketActor { type Error = warp::Error; fn websocket(&mut self) -> (SplitSink, SplitStream) { self.websocket .take() .expect("Websocket already split") .split() } } #[async_trait] impl Handler for WebsocketActor { type Response = Option; async fn handle( this: Arc>, message: Box, ) -> Result { this.lock() .await .hello .send(crate::Msg { _content: message.to_str().unwrap().to_owned(), }) .unwrap_or_else(|e| println!("{e}")); Ok(Some(*message.clone())) } } struct Hello {} impl Actor for Hello {} struct Msg { pub _content: String, } #[async_trait] impl Handler for Hello { type Response = usize; async fn handle(_: Arc>, _: Box) -> Result { println!("Handling message Hello"); Ok(10) } } #[tokio::main] async fn main() { let pool = Arc::new(RwLock::new(HashMap::new())); let pool = warp::any().map(move || pool.clone()); let hello = Hello {}.start(); let hello = warp::any().map(move || hello.clone()); // GET /chat -> websocket upgrade let chat = warp::path("chat") // The `ws()` filter will prepare Websocket handshake... .and(warp::ws()) .and(pool) .and(hello) .map( |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle| { // This will call our function if the handshake succeeds. ws.on_upgrade(|socket| async move { let actor = WebsocketActor::new(socket, hello); let handle = actor.start(); let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); println!("Adding actor {id}"); pool.write().unwrap().insert(id, handle); }) }, ); // GET / -> index html let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML)); let routes = index.or(chat); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await } static INDEX_HTML: &str = r#" Warp Chat

Warp chat

Connecting...

"#;