Skip to main content

rsiot/executor/
msgbus_input.rs

1use tokio::sync::broadcast;
2use tracing::warn;
3use uuid::Uuid;
4
5use crate::message::{Message, MsgDataBound};
6
7use super::ComponentError;
8
9/// Шина MsgBus - получение входящих сообщений
10pub struct MsgBusInput<TMsg>
11where
12    TMsg: MsgDataBound,
13{
14    input: broadcast::Receiver<Message<TMsg>>,
15    name: String,
16    id: Uuid,
17}
18
19impl<TMsg> MsgBusInput<TMsg>
20where
21    TMsg: MsgDataBound,
22{
23    pub(crate) fn new(
24        input: broadcast::Receiver<Message<TMsg>>,
25        name: impl Into<String>,
26        id: Uuid,
27    ) -> Self {
28        Self {
29            input,
30            name: name.into(),
31            id,
32        }
33    }
34
35    /// Получение входящих сообщений
36    pub async fn recv(&mut self) -> Result<Message<TMsg>, ComponentError> {
37        loop {
38            let msg = self.input.recv().await;
39
40            let msg = match msg {
41                Ok(v) => v,
42                Err(err) => {
43                    warn!(
44                        "MsgBusInput.recv_input() of component {} input error: {}",
45                        self.name, err
46                    );
47                    continue;
48                }
49            };
50
51            // Если данное сообщение было сгенерировано данным сервисом, пропускаем
52            // if msg.check_source(&self.id) {
53            //     continue;
54            // }
55
56            return Ok(msg);
57        }
58    }
59}
60
61impl<TMsg> Clone for MsgBusInput<TMsg>
62where
63    TMsg: MsgDataBound,
64{
65    fn clone(&self) -> Self {
66        Self {
67            input: self.input.resubscribe(),
68            name: self.name.clone(),
69            id: self.id,
70        }
71    }
72}