rsiot/executor/
cmp_in_out.rs

1use std::{cmp::max, fmt::Debug};
2
3use tracing::{info, trace};
4use uuid::Uuid;
5
6use crate::message::{system_messages::*, *};
7
8use super::{
9    types::{CmpInput, CmpOutput, FnAuth},
10    Cache, ComponentError,
11};
12
13/// Подключение компонента к внутренней шине сообщений исполнителя
14#[derive(Debug)]
15pub struct CmpInOut<TMsg>
16where
17    TMsg: MsgDataBound,
18{
19    input: CmpInput<TMsg>,
20    output: CmpOutput<TMsg>,
21    /// Ссылка на кэш
22    /// TODO - проверить, скорее всего можно сделать приватным
23    pub cache: Cache<TMsg>,
24    name: String,
25    id: Uuid,
26    auth_perm: AuthPermissions,
27    fn_auth: FnAuth<TMsg>,
28}
29
30impl<TMsg> CmpInOut<TMsg>
31where
32    TMsg: MsgDataBound,
33{
34    /// Создание подключения к внутренней шине сообщений исполнителя
35    pub fn new(
36        input: CmpInput<TMsg>,
37        output: CmpOutput<TMsg>,
38        cache: Cache<TMsg>,
39        name: &str,
40        id: Uuid,
41        auth_perm: AuthPermissions,
42        fn_auth: FnAuth<TMsg>,
43    ) -> Self {
44        info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
45        Self {
46            input,
47            output,
48            cache,
49            id,
50            name: name.into(),
51            auth_perm,
52            fn_auth,
53        }
54    }
55
56    /// Клонировать и присвоить новый идентификатор
57    ///
58    /// Необходимо вызывать в начале исполнения компонента, чтобы у каждого компонента был
59    /// уникальный id
60    pub fn clone_with_new_id(&self, name: &str, auth_perm: AuthPermissions) -> Self {
61        let name = format!("{}::{}", self.name, name);
62        let id = MsgTrace::generate_uuid();
63        info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
64        Self {
65            input: self.input.resubscribe(),
66            output: self.output.clone(),
67            cache: self.cache.clone(),
68            name,
69            id,
70            auth_perm,
71            fn_auth: self.fn_auth,
72        }
73    }
74
75    /// Получение сообщений со входа
76    pub async fn recv_input(&mut self) -> Result<Message<TMsg>, ComponentError> {
77        loop {
78            let msg = self
79                .input
80                .recv()
81                .await
82                .map_err(|e| ComponentError::CmpInput(e.to_string()))?;
83
84            // Обновляем уровень авторизации при получении системного сообщения. Пропускаем
85            // сообщение, если запрос на авторизацию не проходил через данный компонент
86            if let MsgData::System(System::AuthResponseOk(value)) = &msg.data {
87                if !value.trace_ids.contains(&self.id) {
88                    continue;
89                }
90                self.auth_perm = max(self.auth_perm, value.perm);
91            }
92            if let MsgData::System(System::AuthResponseErr(value)) = &msg.data {
93                if !value.trace_ids.contains(&self.id) {
94                    continue;
95                }
96            }
97
98            // Если данное сообщение было сгенерировано данным сервисом, пропускаем
99            if msg.contains_trace_item(&self.id) {
100                continue;
101            }
102
103            // Если нет авторизации, пропускаем
104            let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
105                continue;
106            };
107
108            msg.add_trace_item(&self.id);
109            return Ok(msg);
110        }
111    }
112
113    /// Возвращает копию сообщений из кеша
114    pub async fn recv_cache_all(&self) -> Vec<Message<TMsg>> {
115        let lock = self.cache.read().await;
116        lock.values()
117            .cloned()
118            .filter_map(|m| (self.fn_auth)(m, &self.auth_perm))
119            .collect()
120    }
121
122    /// Возвращает сообщение из кеша по ключу
123    pub async fn recv_cache_msg(&self, key: &str) -> Option<Message<TMsg>> {
124        let cache = self.cache.read().await;
125        cache.get(key).map(|m| m.to_owned())
126    }
127
128    /// Отправка сообщений на выход
129    pub async fn send_output(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
130        trace!("Start send to output: {msg:?}");
131        // Если нет авторизации, пропускаем
132        let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
133            trace!("No authorization. Auth: {:?}", self.auth_perm);
134            return Ok(());
135        };
136
137        msg.add_trace_item(&self.id);
138        self.output
139            .send(msg)
140            .await
141            .map_err(|e| ComponentError::CmpOutput(e.to_string()))
142    }
143
144    /// Отправка исходящих сообщений, в синхронном окружении
145    pub fn send_output_blocking(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
146        trace!("Start send to output: {msg:?}");
147        // Если нет авторизации, пропускаем
148        let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
149            trace!("No authorization. Auth: {:?}", self.auth_perm);
150            return Ok(());
151        };
152
153        msg.add_trace_item(&self.id);
154        self.output
155            .blocking_send(msg)
156            .map_err(|e| ComponentError::CmpOutput(e.to_string()))
157    }
158
159    /// Возвращает максимальный размер очереди сообщений
160    pub fn max_capacity(&self) -> usize {
161        self.output.max_capacity()
162    }
163}
164
165impl<TMsg> Clone for CmpInOut<TMsg>
166where
167    TMsg: MsgDataBound,
168{
169    fn clone(&self) -> Self {
170        Self {
171            input: self.input.resubscribe(),
172            output: self.output.clone(),
173            cache: self.cache.clone(),
174            id: self.id,
175            name: self.name.clone(),
176            auth_perm: self.auth_perm,
177            fn_auth: self.fn_auth,
178        }
179    }
180}