123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- use actors::ws::WebsocketActor;
- use actors::Actor;
- use warp::Filter;
- #[tokio::main]
- async fn main() {
- // GET /chat -> websocket upgrade
- let chat = warp::path("chat")
- // The `ws()` filter will prepare Websocket handshake...
- .and(warp::ws())
- .map(|ws: warp::ws::Ws| {
- // This will call our function if the handshake succeeds.
- ws.on_upgrade(move |socket| {
- let actor = WebsocketActor::new(socket);
- actor.start();
- futures::future::ready(())
- })
- });
- // GET / -> index html
- let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
- let routes = index.or(chat);
- warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
- }
- /*
- async fn user_connected(ws: WebSocket, users: Users) {
- // Use a counter to assign a new unique ID for this user.
- let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
- eprintln!("new chat user: {}", my_id);
- // Split the socket into a sender and receiver of messages.
- let (mut user_ws_tx, mut user_ws_rx) = ws.split();
- // Use an unbounded channel to handle buffering and flushing of messages
- // to the websocket...
- let (tx, rx) = mpsc::unbounded_channel();
- let mut rx = UnboundedReceiverStream::new(rx);
- tokio::task::spawn(async move {
- while let Some(message) = rx.next().await {
- user_ws_tx
- .send(message)
- .unwrap_or_else(|e| {
- eprintln!("websocket send error: {}", e);
- })
- .await;
- }
- });
- // Save the sender in our list of connected users.
- users.write().await.insert(my_id, tx);
- // Return a `Future` that is basically a state machine managing
- // this specific user's connection.
- // Every time the user sends a message, broadcast it to
- // all other users...
- while let Some(result) = user_ws_rx.next().await {
- let msg = match result {
- Ok(msg) => {
- println!("MESSAGE {msg:?}");
- msg
- }
- Err(e) => {
- eprintln!("websocket error(uid={}): {}", my_id, e);
- break;
- }
- };
- // Skip any non-Text messages...
- let msg = if let Ok(s) = msg.to_str() {
- s
- } else {
- return;
- };
- let new_msg = format!("<User#{}>: {}", my_id, msg);
- // New message from this user, send it to everyone else (except same uid)...
- for (&uid, tx) in users.read().await.iter() {
- if my_id != uid {
- if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
- // The tx is disconnected, our `user_disconnected` code
- // should be happening in another task, nothing more to
- // do here.
- }
- }
- }
- }
- // user_ws_rx stream will keep processing as long as the user stays
- // connected. Once they disconnect, then...
- user_disconnected(my_id, &users).await;
- }
- */
/// HTML page for the chat client.
///
/// The embedded script opens a websocket to `/chat` on the host that served
/// the page, appends every received message to the `#chat` element, and sends
/// the contents of the `#text` input when the `#send` button is clicked.
///
/// The websocket scheme follows the page's own scheme (`wss://` when the page
/// is loaded over HTTPS, `ws://` otherwise) because browsers block insecure
/// websockets from secure pages. The send button is looked up explicitly
/// instead of relying on the implicit global the browser creates for elements
/// with an `id`.
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
    <head>
        <title>Warp Chat</title>
    </head>
    <body>
        <h1>Warp chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="text" />
        <button type="button" id="send">Send</button>
        <script type="text/javascript">
        const chat = document.getElementById('chat');
        const text = document.getElementById('text');
        const send = document.getElementById('send');
        const scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
        const uri = scheme + location.host + '/chat';
        const ws = new WebSocket(uri);

        function message(data) {
            const line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }

        ws.onopen = function() {
            chat.innerHTML = '<p><em>Connected!</em></p>';
        };

        ws.onmessage = function(msg) {
            message(msg.data);
        };

        ws.onclose = function() {
            chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
        };

        send.onclick = function() {
            const msg = text.value;
            ws.send(msg);
            text.value = '';

            message('<You>: ' + msg);
        };
        </script>
    </body>
</html>
"#;
|