Skip to main content

rsiot/executor/
component_executor.rs

1use std::time::Duration;
2use tokio::{
3    sync::{broadcast, mpsc},
4    task::JoinSet,
5};
6use tracing::{error, info};
7
8use crate::{
9    executor::Instant,
10    message::{AuthPermissions, Message, MsgDataBound},
11};
12
13#[cfg(feature = "log_tokio")]
14use super::task_runtime_metrics::TaskRuntimeMetrics;
15use super::{
16    MsgBusLinker, TokioRuntimeMetrics, component::IComponent, error::ComponentError,
17    join_set_spawn, task_internal::TaskInternal, types::FnAuth,
18};
19
20#[cfg(feature = "log_tokio")]
21const RUNTIME_METRICS_PERIOD: Duration = Duration::from_millis(100);
22
23pub type FnTokioMetrics<TMsg> = fn(TokioRuntimeMetrics) -> Option<TMsg>;
24
25/// Запуск коллекции компонентов в работу
26pub struct ComponentExecutor<TMsg>
27where
28    TMsg: MsgDataBound,
29{
30    task_set: JoinSet<Result<(), ComponentError>>,
31    cmp_in_out: MsgBusLinker<TMsg>,
32    start_time: Instant,
33}
34
35/// Настройка исполнителя
36pub struct ComponentExecutorConfig<TMsg> {
37    /// Размер буфера канала сообщения
38    pub buffer_size: usize,
39
40    /// Функция фильтрации сообщений в зависимости от текущей авторизации
41    ///
42    /// **Примеры**
43    ///
44    /// - Все сообщения блокируются
45    ///
46    /// ```rust
47    /// |_, _| None
48    /// ```
49    ///
50    /// - Все сообщения разрешены
51    ///
52    /// ```rust
53    /// |msg, _| Some(msg)
54    /// ```
55    pub fn_auth: FnAuth<TMsg>,
56
57    /// Задержка публикации сообщений
58    ///
59    /// Рассылка сообщений осуществляется по каналу broadcast. При инициализации компоненты
60    /// получают только новые сообщения. Эта задержка нужна для того, чтобы компоненты успели
61    /// запуститься.
62    pub delay_publish: Duration,
63
64    /// Функция создания сообщения с метриками `tokio`
65    ///
66    /// Заглушка: `|_| None`
67    pub fn_tokio_metrics: FnTokioMetrics<TMsg>,
68}
69
70impl<TMsg> ComponentExecutor<TMsg>
71where
72    TMsg: MsgDataBound + 'static,
73{
74    /// Создание коллекции компонентов
75    pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
76        info!("ComponentExecutor start creation");
77        let (component_input_send, component_input) =
78            broadcast::channel::<Message<TMsg>>(config.buffer_size);
79        let (component_output, component_output_recv) =
80            mpsc::channel::<Message<TMsg>>(config.buffer_size);
81        let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
82
83        // Запускаем внутреннюю задачу
84
85        let task = TaskInternal {
86            output: component_output_recv,
87            input: component_input_send.clone(),
88            delay_publish: config.delay_publish,
89            max_capacity: config.buffer_size,
90        };
91        join_set_spawn(&mut task_set, "internal_task", task.spawn());
92
93        // Запускаем задачу сбора метрик tokio
94        #[cfg(feature = "log_tokio")]
95        {
96            let task = TaskRuntimeMetrics::<TMsg> {
97                output: component_input_send,
98                period: RUNTIME_METRICS_PERIOD,
99                fn_tokio_metrics: config.fn_tokio_metrics,
100            };
101            join_set_spawn(&mut task_set, "tokio_metrics", task.spawn());
102        }
103
104        let cmp_in_out = MsgBusLinker::new(
105            component_input,
106            component_output,
107            AuthPermissions::default(),
108            config.fn_auth,
109        );
110
111        Self {
112            task_set,
113            cmp_in_out,
114            start_time: Instant::now(),
115        }
116    }
117
118    /// Добавить компонент
119    #[cfg(not(feature = "single-thread"))]
120    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
121        component.set_interface(self.cmp_in_out.clone());
122
123        self.task_set.spawn(async move { component.spawn().await });
124
125        self
126    }
127    /// Добавить компонент (?Send)
128    #[cfg(feature = "single-thread")]
129    pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
130        component.set_interface(self.cmp_in_out.clone());
131
132        self.task_set
133            .spawn_local(async move { component.spawn().await });
134        self
135    }
136
137    /// Запустить на выполнение все компоненты и ожидать завершения выполнения выполнения какого-то
138    /// компонента.
139    pub async fn wait_result(mut self) -> Result<(), ComponentError> {
140        // Удаляем неиспользуемые каналы шины сообщений
141        drop(self.cmp_in_out);
142
143        let msg;
144        if let Some(result) = self.task_set.join_next().await {
145            info!("Program runtime: {:?}", self.start_time.elapsed());
146            match result {
147                Ok(Ok(_)) => {
148                    msg = "Component has finished executing with Ok result".to_string();
149                    info!(msg);
150                    return Ok(());
151                }
152
153                Ok(Err(err)) => {
154                    msg = format!("Component has finished executing with error: {err:?}");
155                }
156
157                Err(err) => {
158                    msg = format!("Component has finished executing with error: {err:?}");
159                }
160            };
161            error!(msg);
162            return Err(ComponentError::Execution(msg));
163        }
164        Ok(())
165    }
166}