Skip to main content

rsiot/executor/
msgbus_linker.rs

1use std::fmt::Debug;
2
3use tracing::{error, info};
4use uuid::Uuid;
5
6use crate::message::{AuthPermissions, MsgDataBound};
7
8use super::{
9    MsgBusInput, MsgBusOutput,
10    types::{CmpInput, CmpOutput, FnAuth},
11};
12
13/// Подключение компонента к внутренней шине сообщений исполнителя
14#[derive(Debug)]
15pub struct MsgBusLinker<TMsg>
16where
17    TMsg: MsgDataBound,
18{
19    input: CmpInput<TMsg>,
20    output: CmpOutput<TMsg>,
21    name: String,
22    id: Uuid,
23    auth_perm: AuthPermissions,
24    fn_auth: FnAuth<TMsg>,
25}
26
27impl<TMsg> MsgBusLinker<TMsg>
28where
29    TMsg: MsgDataBound,
30{
31    /// Создание подключения к внутренней шине сообщений исполнителя
32    pub fn new(
33        input: CmpInput<TMsg>,
34        output: CmpOutput<TMsg>,
35        auth_perm: AuthPermissions,
36        fn_auth: FnAuth<TMsg>,
37    ) -> Self {
38        Self {
39            input,
40            output,
41            id: Uuid::default(),
42            name: "".to_string(),
43            auth_perm,
44            fn_auth,
45        }
46    }
47
48    /// Инициализация шины сообщений с новым идентификатором и именем
49    pub fn init(mut self, name: &str) -> Self {
50        let id = Uuid::new_v4();
51        self.id = id;
52        self.name = name.into();
53        info!("Start: {}, id: {}", name, id);
54        self
55    }
56
57    /// Канал входящих сообщений
58    pub fn input(&self) -> MsgBusInput<TMsg> {
59        if self.name.is_empty() {
60            error!("Component name is empty");
61            panic!("Component name is empty");
62        }
63        MsgBusInput::new(self.input.resubscribe(), self.name.clone(), self.id)
64    }
65
66    /// Канал исходящих сообщений
67    pub fn output(&self) -> MsgBusOutput<TMsg> {
68        if self.name.is_empty() {
69            error!("Component name is empty");
70            panic!("Component name is empty");
71        }
72        MsgBusOutput::new(self.output.clone(), self.id)
73    }
74
75    /// Каналы входящих сообщений и исходящих сообщений
76    pub fn input_output(&self) -> (MsgBusInput<TMsg>, MsgBusOutput<TMsg>) {
77        (self.input(), self.output())
78    }
79
80    /// Возвращает максимальный размер очереди сообщений
81    pub fn max_capacity(&self) -> usize {
82        self.output.max_capacity()
83    }
84
85    /// Закрыть подключение
86    pub fn close(self) {}
87}
88
89impl<TMsg> Clone for MsgBusLinker<TMsg>
90where
91    TMsg: MsgDataBound,
92{
93    fn clone(&self) -> Self {
94        Self {
95            input: self.input.resubscribe(),
96            output: self.output.clone(),
97            id: self.id,
98            name: self.name.clone(),
99            auth_perm: self.auth_perm,
100            fn_auth: self.fn_auth,
101        }
102    }
103}