websocket.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. use async_trait::async_trait;
  2. use drama::{Actor, ActorHandle, Handler, Relay, RelayActor};
  3. use flume::Sender;
  4. use futures::stream::SplitStream;
  5. use futures::StreamExt;
  6. use parking_lot::RwLock;
  7. use std::collections::HashMap;
  8. use std::sync::atomic::AtomicUsize;
  9. use std::sync::Arc;
  10. use warp::ws::{Message, WebSocket};
  11. use warp::Filter;
  12. struct WebsocketActor {
  13. hello: ActorHandle<Hello>,
  14. tx: Sender<Message>,
  15. }
  16. impl WebsocketActor {
  17. fn new(handle: ActorHandle<Hello>, tx: Sender<Message>) -> Self {
  18. Self { hello: handle, tx }
  19. }
  20. }
  21. impl Actor for WebsocketActor {}
  22. impl RelayActor<Message, SplitStream<WebSocket>> for WebsocketActor {
  23. type Error = warp::Error;
  24. }
  25. #[async_trait]
  26. impl Relay<Message> for WebsocketActor {
  27. async fn process(&mut self, message: Message) -> Option<Message> {
  28. self.hello
  29. .send(crate::Msg {
  30. _content: message.to_str().unwrap().to_owned(),
  31. })
  32. .unwrap_or_else(|e| tracing::trace!("FUKEN HELL M8 {e}"));
  33. self.tx.send(message.clone()).unwrap();
  34. Some(message)
  35. }
  36. }
  37. struct Hello {}
  38. impl Actor for Hello {}
  39. #[derive(Clone)]
  40. struct Msg {
  41. pub _content: String,
  42. }
  43. #[async_trait]
  44. impl Handler<Msg> for Hello {
  45. type Response = usize;
  46. async fn handle(&mut self, _: &Msg) -> usize {
  47. 10
  48. }
  49. }
  50. type Arbiter = Arc<RwLock<HashMap<usize, ActorHandle<WebsocketActor>>>>;
  51. static ID: AtomicUsize = AtomicUsize::new(0);
  52. #[tokio::main]
  53. async fn main() {
  54. let pool = Arc::new(RwLock::new(HashMap::new()));
  55. let pool = warp::any().map(move || pool.clone());
  56. let hello = Hello {}.start();
  57. let hello = warp::any().map(move || hello.clone());
  58. // GET /chat -> websocket upgrade
  59. let chat = warp::path("chat")
  60. // The `ws()` filter will prepare Websocket handshake...
  61. .and(warp::ws())
  62. .and(pool)
  63. .and(hello)
  64. .map(
  65. |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
  66. // This will call our function if the handshake succeeds.
  67. ws.on_upgrade(|socket| async move {
  68. let (si, st) = socket.split();
  69. let (tx, rx) = flume::unbounded();
  70. let actor = WebsocketActor::new(hello, tx.clone());
  71. let handle = actor.start_relay(st, tx);
  72. tokio::spawn(rx.into_stream().map(Ok).forward(si));
  73. let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  74. tracing::trace!("Adding actor {id}");
  75. pool.write().insert(id, handle);
  76. })
  77. },
  78. );
  79. // GET / -> index html
  80. let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
  81. let routes = index.or(chat);
  82. warp::serve(routes).run(([127, 0, 0, 1], 3030)).await
  83. }
  84. static INDEX_HTML: &str = r#"<!DOCTYPE html>
  85. <html lang="en">
  86. <head>
  87. <title>Warp Chat</title>
  88. </head>
  89. <body>
  90. <h1>Warp chat</h1>
  91. <div id="chat">
  92. <p><em>Connecting...</em></p>
  93. </div>
  94. <input type="text" id="text" />
  95. <button type="button" id="send">Send</button>
  96. <script type="text/javascript">
  97. const chat = document.getElementById('chat');
  98. const text = document.getElementById('text');
  99. const uri = 'ws://' + location.host + '/chat';
  100. const ws = new WebSocket(uri);
  101. let num = 0;
  102. function message(data) {
  103. if (num % 10000 === 0) chat.innerHTML = `${num}`
  104. }
  105. ws.onopen = function() {
  106. chat.innerHTML = '<p><em>Connected!</em></p>';
  107. };
  108. ws.onmessage = function(msg) {
  109. console.log(msg)
  110. num += 1;
  111. message(msg.data);
  112. };
  113. ws.onclose = function() {
  114. chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
  115. };
  116. send.onclick = function() {
  117. const msg = text.value;
  118. let i = 0;
  119. while (i < 10000) {
  120. ws.send(msg);
  121. i += 1;
  122. }
  123. // text.value = '';
  124. message('<You>: ' + msg);
  125. };
  126. </script>
  127. </body>
  128. </html>
  129. "#;