rsiot/executor/
component_executor.rs

1use std::time::Duration;
2use tokio::{
3    sync::{broadcast, mpsc},
4    task::JoinSet,
5};
6use tracing::{debug, error, info, trace, warn};
7use uuid::Uuid;
8
9use crate::message::{system_messages::*, *};
10
11use super::{
12    component::IComponent, error::ComponentError, join_set_spawn, sleep, types::FnAuth, Cache,
13    CmpInOut,
14};
15
16const UPDATE_TTL_PERIOD: Duration = Duration::from_millis(200);
17
18/// Запуск коллекции компонентов в работу
19pub struct ComponentExecutor<TMsg>
20where
21    TMsg: MsgDataBound,
22{
23    task_set: JoinSet<Result<(), ComponentError>>,
24    cmp_in_out: CmpInOut<TMsg>,
25}
26
27/// Настройка исполнителя
28pub struct ComponentExecutorConfig<TMsg> {
29    /// Размер буфера канала сообщения
30    pub buffer_size: usize,
31
32    /// Функция фильтрации сообщений в зависимости от текущей авторизации
33    ///
34    /// **Примеры**
35    ///
36    /// - Все сообщения блокируются
37    ///
38    /// ```rust
39    /// |_, _| None
40    /// ```
41    ///
42    /// - Все сообщения разрешены
43    ///
44    /// ```rust
45    /// |msg, _| Some(msg)
46    /// ```
47    pub fn_auth: FnAuth<TMsg>,
48
49    /// Задержка публикации сообщений
50    ///
51    /// Рассылка сообщений осуществляется по каналу broadcast. При инициализации компоненты
52    /// получают только новые сообщения. Эта задержка нужна для того, чтобы компоненты успели
53    /// запуститься.
54    pub delay_publish: Duration,
55}
56
57impl<TMsg> ComponentExecutor<TMsg>
58where
59    TMsg: MsgDataBound + 'static,
60{
61    /// Создание коллекции компонентов
62    pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
63        info!("ComponentExecutor start creation");
64        let id = MsgTrace::generate_uuid();
65        let (component_input_send, component_input) =
66            broadcast::channel::<Message<TMsg>>(config.buffer_size);
67        let (component_output, component_output_recv) =
68            mpsc::channel::<Message<TMsg>>(config.buffer_size);
69        let cache: Cache<TMsg> = Cache::new();
70        let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
71
72        // Запускаем внутреннюю задачу
73        let task_internal_handle = task_internal(
74            component_output_recv,
75            component_input_send.clone(),
76            cache.clone(),
77            id,
78            config.delay_publish,
79        );
80        join_set_spawn(&mut task_set, task_internal_handle);
81
82        // Запускаем задачу обновления TTL сообщений
83        let task_update_ttl_in_cache_handle = task_update_ttl_in_cache(cache.clone());
84        join_set_spawn(&mut task_set, task_update_ttl_in_cache_handle);
85
86        let cmp_in_out = CmpInOut::new(
87            component_input,
88            component_output,
89            cache.clone(),
90            "Trace name (maybe delete?)",
91            id,
92            AuthPermissions::default(),
93            config.fn_auth,
94        );
95
96        Self {
97            task_set,
98            cmp_in_out,
99        }
100    }
101
102    /// Добавить компонент
103    #[cfg(not(feature = "single-thread"))]
104    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
105        component.set_interface(self.cmp_in_out.clone());
106
107        self.task_set.spawn(async move { component.spawn().await });
108
109        self
110    }
111    /// Добавить компонент (?Send)
112    #[cfg(feature = "single-thread")]
113    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
114        component.set_interface(self.cmp_in_out.clone());
115
116        self.task_set
117            .spawn_local(async move { component.spawn().await });
118        self
119    }
120
121    /// Запустить на выполнение все компоненты.
122    ///
123    /// Компоненты не должны заканчивать выполнение. Если хоть один остановился (неважно по какой
124    /// причине - по ошибке или нет), это ошибка выполнения.
125    pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
126        let msg;
127        if let Some(result) = self.task_set.join_next().await {
128            match result {
129                Ok(result) => match result {
130                    Ok(_) => msg = "Component has finished executing".to_string(),
131                    Err(err) => {
132                        msg = format!("Component has finished executing with error: {:?}", err);
133                    }
134                },
135                Err(err) => {
136                    msg = format!("Component has finished executing with error: {:?}", err);
137                }
138            };
139            error!(msg);
140            return Err(ComponentError::Execution(msg));
141        }
142        Ok(())
143    }
144}
145
146async fn task_internal<TMsg>(
147    mut input: mpsc::Receiver<Message<TMsg>>,
148    output: broadcast::Sender<Message<TMsg>>,
149    cache: Cache<TMsg>,
150    executor_id: Uuid,
151    delay_publish: Duration,
152) -> Result<(), ComponentError>
153where
154    TMsg: MsgDataBound,
155{
156    debug!("Internal task of ComponentExecutor: starting");
157
158    // Задержка, чтобы компоненты успели запуститься и подписаться на получение сообщений
159    sleep(delay_publish).await;
160
161    while let Some(mut msg) = input.recv().await {
162        // TODO
163        trace!("ComponentExecutor: new message: {:?}", msg);
164        // msg.add_trace_item(&executor_id, &format!("{}::internal_bus", service_name));
165        msg.add_trace_item(&executor_id);
166        let msg = save_msg_in_cache(msg, &cache).await;
167        let Some(msg) = msg else { continue };
168        output.send(msg).map_err(|err| {
169            let err = format!(
170                "Internal task of ComponentExecutor: send to channel error, {:?}",
171                err
172            );
173            ComponentError::Initialization(err)
174        })?;
175    }
176    warn!("Internal task: stop");
177    Ok(())
178}
179
180/// Обновить значения времени жизни сообщений. Удаляет сообщения, время которых истекло
181async fn task_update_ttl_in_cache<TMsg>(cache: Cache<TMsg>) -> Result<(), ComponentError>
182where
183    TMsg: MsgDataBound,
184{
185    loop {
186        sleep(UPDATE_TTL_PERIOD).await;
187        let mut cache = cache.write().await;
188        let mut keys_for_delete = vec![];
189        for (key, msg) in cache.iter_mut() {
190            msg.update_time_to_live(UPDATE_TTL_PERIOD);
191            if !msg.is_alive() {
192                keys_for_delete.push(key.clone());
193            }
194        }
195        for key in keys_for_delete {
196            let remove_result = cache.remove(&key);
197            if remove_result.is_none() {
198                let err = format!("Message with key {} not found in cache", key);
199                return Err(ComponentError::Execution(err));
200            }
201        }
202    }
203}
204
205/// Сохраняем сообщение в кеше
206///
207/// Возвращает `Option<Message>`:
208/// - None - сообщение не нужно отправлять дальше
209/// - Some(Message) - сообщение нужно отправить на вход всех компонентов
210async fn save_msg_in_cache<TMsg>(msg: Message<TMsg>, cache: &Cache<TMsg>) -> Option<Message<TMsg>>
211where
212    TMsg: MsgDataBound,
213{
214    // Фильтруем сообщения авторизации
215    if let MsgData::System(data) = &msg.data {
216        match data {
217            System::AuthRequestByLogin(_) => return Some(msg),
218            System::AuthRequestByToken(_) => return Some(msg),
219            System::AuthResponseErr(_) => return Some(msg),
220            System::AuthResponseOk(_) => return Some(msg),
221            System::EspWifiConnected => return Some(msg),
222            System::Ping(_) => return None,
223            System::Pong(_) => return None,
224        }
225    }
226    // Время жизни сообщения истекло
227    if !msg.is_alive() {
228        return Some(msg);
229    }
230    let key = msg.key.clone();
231    let value = msg.clone();
232    {
233        let mut lock = cache.write().await;
234        let value_from_cache = lock.get(&key);
235        if let Some(_value_from_cache) = value_from_cache {
236            // если в кеше более новое сообщение, отбрасываем
237            // if value.ts <= value_from_cache.ts {
238            //     return None;
239            // }
240        }
241        lock.insert(key, value);
242    }
243    Some(msg)
244}