// runtime.rs
  1. use crate::{message::MailboxReceiver, Actor, ActorCommand};
  2. use flume::{r#async::RecvStream, Receiver};
  3. use futures::StreamExt;
  4. pub const QUEUE_CAPACITY: usize = 128;
  5. pub struct ActorRuntime<A>
  6. where
  7. A: Actor + Send + 'static,
  8. {
  9. actor: A,
  10. command_stream: RecvStream<'static, ActorCommand>,
  11. mailbox: MailboxReceiver<A>,
  12. }
  13. impl<A> ActorRuntime<A>
  14. where
  15. A: Actor + 'static + Send,
  16. {
  17. pub fn new(
  18. actor: A,
  19. command_rx: Receiver<ActorCommand>,
  20. message_rx: MailboxReceiver<A>,
  21. ) -> Self {
  22. println!("Building default runtime");
  23. Self {
  24. actor,
  25. command_stream: command_rx.into_stream(),
  26. mailbox: message_rx,
  27. }
  28. }
  29. pub async fn runt(mut self) {
  30. loop {
  31. tokio::select! {
  32. Some(command) = self.command_stream.next() => {
  33. match command {
  34. ActorCommand::Stop => {
  35. println!("actor stopping");
  36. return
  37. },
  38. }
  39. }
  40. message = self.mailbox.recv_async() => {
  41. if let Ok(mut message) = message {
  42. message.handle(&mut self.actor).await
  43. } else {
  44. break;
  45. }
  46. }
  47. }
  48. }
  49. println!("actor stopping");
  50. }
  51. }