// websocket.rs (4.5 KB)
  1. use async_trait::async_trait;
  2. use drama::runtime::Runtime;
  3. use drama::ws::{WebsocketRuntime, WsActor};
  4. use drama::{Actor, ActorHandle, Error, Handler};
  5. use futures::stream::{SplitSink, SplitStream};
  6. use futures::StreamExt;
  7. use std::collections::HashMap;
  8. use std::sync::atomic::AtomicUsize;
  9. use std::sync::{Arc, RwLock};
  10. use tokio::sync::Mutex;
  11. use warp::ws::{Message, WebSocket};
  12. use warp::Filter;
  13. type Arbiter = Arc<RwLock<HashMap<usize, ActorHandle<WebsocketActor>>>>;
  14. static ID: AtomicUsize = AtomicUsize::new(0);
  15. struct WebsocketActor {
  16. websocket: Option<WebSocket>,
  17. hello: ActorHandle<Hello>,
  18. }
  19. impl WebsocketActor {
  20. fn new(ws: WebSocket, handle: ActorHandle<Hello>) -> Self {
  21. Self {
  22. websocket: Some(ws),
  23. hello: handle,
  24. }
  25. }
  26. }
  27. impl Actor for WebsocketActor {
  28. fn start(self) -> ActorHandle<Self> {
  29. WebsocketRuntime::run(self)
  30. }
  31. }
  32. impl WsActor<Message, SplitStream<WebSocket>, SplitSink<WebSocket, Message>> for WebsocketActor {
  33. type Error = warp::Error;
  34. fn websocket(&mut self) -> (SplitSink<WebSocket, Message>, SplitStream<WebSocket>) {
  35. self.websocket
  36. .take()
  37. .expect("Websocket already split")
  38. .split()
  39. }
  40. }
  41. #[async_trait]
  42. impl Handler<Message> for WebsocketActor {
  43. type Response = Option<Message>;
  44. async fn handle(
  45. this: Arc<Mutex<Self>>,
  46. message: Box<Message>,
  47. ) -> Result<Self::Response, Error> {
  48. this.lock()
  49. .await
  50. .hello
  51. .send(crate::Msg {
  52. _content: message.to_str().unwrap().to_owned(),
  53. })
  54. .unwrap_or_else(|e| println!("{e}"));
  55. Ok(Some(*message.clone()))
  56. }
  57. }
  58. struct Hello {}
  59. impl Actor for Hello {}
  60. struct Msg {
  61. pub _content: String,
  62. }
  63. #[async_trait]
  64. impl Handler<Msg> for Hello {
  65. type Response = usize;
  66. async fn handle(_: Arc<Mutex<Self>>, _: Box<Msg>) -> Result<usize, Error> {
  67. println!("Handling message Hello");
  68. Ok(10)
  69. }
  70. }
  71. #[tokio::main]
  72. async fn main() {
  73. let pool = Arc::new(RwLock::new(HashMap::new()));
  74. let pool = warp::any().map(move || pool.clone());
  75. let hello = Hello {}.start();
  76. let hello = warp::any().map(move || hello.clone());
  77. // GET /chat -> websocket upgrade
  78. let chat = warp::path("chat")
  79. // The `ws()` filter will prepare Websocket handshake...
  80. .and(warp::ws())
  81. .and(pool)
  82. .and(hello)
  83. .map(
  84. |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
  85. // This will call our function if the handshake succeeds.
  86. ws.on_upgrade(|socket| async move {
  87. let actor = WebsocketActor::new(socket, hello);
  88. let handle = actor.start();
  89. let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  90. println!("Adding actor {id}");
  91. pool.write().unwrap().insert(id, handle);
  92. })
  93. },
  94. );
  95. // GET / -> index html
  96. let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
  97. let routes = index.or(chat);
  98. warp::serve(routes).run(([127, 0, 0, 1], 3030)).await
  99. }
  100. static INDEX_HTML: &str = r#"<!DOCTYPE html>
  101. <html lang="en">
  102. <head>
  103. <title>Warp Chat</title>
  104. </head>
  105. <body>
  106. <h1>Warp chat</h1>
  107. <div id="chat">
  108. <p><em>Connecting...</em></p>
  109. </div>
  110. <input type="text" id="text" />
  111. <button type="button" id="send">Send</button>
  112. <script type="text/javascript">
  113. const chat = document.getElementById('chat');
  114. const text = document.getElementById('text');
  115. const uri = 'ws://' + location.host + '/chat';
  116. const ws = new WebSocket(uri);
  117. let num = 0;
  118. function message(data) {
  119. if (num % 10000 === 0) chat.innerHTML = `${num}`
  120. }
  121. ws.onopen = function() {
  122. chat.innerHTML = '<p><em>Connected!</em></p>';
  123. };
  124. ws.onmessage = function(msg) {
  125. console.log(msg)
  126. num += 1;
  127. message(msg.data);
  128. };
  129. ws.onclose = function() {
  130. chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
  131. };
  132. send.onclick = function() {
  133. const msg = text.value;
  134. let i = 0;
  135. while (i < 100000) {
  136. ws.send(msg);
  137. i += 1;
  138. }
  139. text.value = '';
  140. message('<You>: ' + msg);
  141. };
  142. </script>
  143. </body>
  144. </html>
  145. "#;