rsiot/executor/
msgbus_input.rs1use tokio::sync::broadcast;
2use tracing::warn;
3use uuid::Uuid;
4
5use crate::message::{Message, MsgDataBound};
6
7use super::ComponentError;
8
9pub struct MsgBusInput<TMsg>
11where
12 TMsg: MsgDataBound,
13{
14 input: broadcast::Receiver<Message<TMsg>>,
15 name: String,
16 id: Uuid,
17}
18
19impl<TMsg> MsgBusInput<TMsg>
20where
21 TMsg: MsgDataBound,
22{
23 pub(crate) fn new(
24 input: broadcast::Receiver<Message<TMsg>>,
25 name: impl Into<String>,
26 id: Uuid,
27 ) -> Self {
28 Self {
29 input,
30 name: name.into(),
31 id,
32 }
33 }
34
35 pub async fn recv(&mut self) -> Result<Message<TMsg>, ComponentError> {
37 loop {
38 let msg = self.input.recv().await;
39
40 let msg = match msg {
41 Ok(v) => v,
42 Err(err) => {
43 warn!(
44 "MsgBusInput.recv_input() of component {} input error: {}",
45 self.name, err
46 );
47 continue;
48 }
49 };
50
51 return Ok(msg);
57 }
58 }
59}
60
61impl<TMsg> Clone for MsgBusInput<TMsg>
62where
63 TMsg: MsgDataBound,
64{
65 fn clone(&self) -> Self {
66 Self {
67 input: self.input.resubscribe(),
68 name: self.name.clone(),
69 id: self.id,
70 }
71 }
72}