// websocket.rs (4.1 KB)

  use async_trait::async_trait;
  use drama::relay::{Relay, RelayActor};
  use drama::{Actor, ActorHandle, Error, Handler};
  use futures::stream::SplitStream;
  use futures::StreamExt;
  use parking_lot::RwLock;
  use std::collections::HashMap;
  use std::sync::atomic::AtomicUsize;
  use std::sync::atomic::Ordering;
  use std::sync::Arc;
  use warp::ws::{Message, WebSocket};
  use warp::Filter;
  12. struct WebsocketActor {
  13. hello: ActorHandle<Hello>,
  14. }
  15. impl WebsocketActor {
  16. fn new(handle: ActorHandle<Hello>) -> Self {
  17. Self { hello: handle }
  18. }
  19. }
  20. impl Actor for WebsocketActor {}
  21. impl RelayActor<Message, SplitStream<WebSocket>> for WebsocketActor {
  22. type Error = warp::Error;
  23. }
  24. #[async_trait]
  25. impl Relay<Message> for WebsocketActor {
  26. async fn handle(&mut self, message: Message) -> Result<Option<Message>, Error> {
  27. self.hello
  28. .send(crate::Msg {
  29. _content: message.to_str().unwrap().to_owned(),
  30. })
  31. .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
  32. Ok(Some(message))
  33. }
  34. }
  35. struct Hello {}
  36. impl Actor for Hello {}
  /// Message type delivered to the `Hello` actor.
  #[derive(Clone)]
  struct Msg {
      /// Text carried by the message; the `Hello` handler in this file never reads it.
      pub _content: String,
  }
  41. #[async_trait]
  42. impl Handler<Msg> for Hello {
  43. type Response = usize;
  44. async fn handle(&mut self, _: Msg) -> Result<usize, Error> {
  45. Ok(10)
  46. }
  47. }
  48. type Arbiter = Arc<RwLock<HashMap<usize, ActorHandle<WebsocketActor>>>>;
  49. static ID: AtomicUsize = AtomicUsize::new(0);
  50. #[tokio::main]
  51. async fn main() {
  52. let pool = Arc::new(RwLock::new(HashMap::new()));
  53. let pool = warp::any().map(move || pool.clone());
  54. let hello = Hello {}.start();
  55. let hello = warp::any().map(move || hello.clone());
  56. // GET /chat -> websocket upgrade
  57. let chat = warp::path("chat")
  58. // The `ws()` filter will prepare Websocket handshake...
  59. .and(warp::ws())
  60. .and(pool)
  61. .and(hello)
  62. .map(
  63. |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
  64. // This will call our function if the handshake succeeds.
  65. ws.on_upgrade(|socket| async move {
  66. let (si, st) = socket.split();
  67. let (tx, rx) = flume::unbounded();
  68. let actor = WebsocketActor::new(hello);
  69. let handle = actor.start_relay(st, tx);
  70. tokio::spawn(rx.into_stream().map(Ok).forward(si));
  71. let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  72. println!("Adding actor {id}");
  73. pool.write().insert(id, handle);
  74. })
  75. },
  76. );
  77. // GET / -> index html
  78. let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
  79. let routes = index.or(chat);
  80. warp::serve(routes).run(([127, 0, 0, 1], 3030)).await
  81. }
  /// Page served at `/`.
  ///
  /// The embedded script opens a websocket to `/chat` on the same host. Clicking
  /// "Send" pushes the text box contents over the socket 100000 times, and the
  /// page shows the running count of received messages whenever that count is
  /// a multiple of 10000.
  ///
  /// The send button is looked up explicitly instead of through the browser's
  /// implicit global-by-id binding. The close handler rewrites the whole
  /// status block, because the `<em>` element it used to patch no longer
  /// exists once the counter has replaced the chat contents.
  static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
    <head>
        <title>Warp Chat</title>
    </head>
    <body>
        <h1>Warp chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="text" />
        <button type="button" id="send">Send</button>
        <script type="text/javascript">
        const chat = document.getElementById('chat');
        const text = document.getElementById('text');
        const send = document.getElementById('send');
        const uri = 'ws://' + location.host + '/chat';
        const ws = new WebSocket(uri);
        let num = 0;
        function message(data) {
            if (num % 10000 === 0) chat.innerHTML = `${num}`
        }
        ws.onopen = function() {
            chat.innerHTML = '<p><em>Connected!</em></p>';
        };
        ws.onmessage = function(msg) {
            console.log(msg)
            num += 1;
            message(msg.data);
        };
        ws.onclose = function() {
            chat.innerHTML = '<p><em>Disconnected!</em></p>';
        };
        send.onclick = function() {
            const msg = text.value;
            let i = 0;
            while (i < 100000) {
                ws.send(msg);
                i += 1;
            }
            // text.value = '';
            message('<You>: ' + msg);
        };
        </script>
    </body>
</html>
"#;