rsiot/components/cmp_inject_periodic/
mod.rs

1//! Компонент для периодического генерирования сообщений
2
3use async_trait::async_trait;
4use instant::Instant;
5use tokio::time::Duration;
6
7use crate::{
8    executor::{sleep, CmpInOut, Component, ComponentError, IComponentProcess},
9    message::{AuthPermissions, Message, MsgDataBound},
10};
11
12/// Конфигурация cmp_inject_periodic
13#[derive(Clone, Debug)]
14pub struct Config<TMsg, TFnPeriodic>
15where
16    TMsg: Clone,
17    TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
18{
19    /// Период вызова
20    pub period: Duration,
21
22    /// Функция для генерирования сообщений
23    ///
24    /// Тип данных - `FnMut() -> Vec<Message<TMsg>>`
25    pub fn_periodic: TFnPeriodic,
26}
27
28#[cfg_attr(not(feature = "single-thread"), async_trait)]
29#[cfg_attr(feature = "single-thread", async_trait(?Send))]
30impl<TMsg, TFnPeriodic> IComponentProcess<Config<TMsg, TFnPeriodic>, TMsg>
31    for Component<Config<TMsg, TFnPeriodic>, TMsg>
32where
33    TMsg: MsgDataBound,
34    TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
35{
36    async fn process(
37        &self,
38        config: Config<TMsg, TFnPeriodic>,
39        in_out: CmpInOut<TMsg>,
40    ) -> Result<(), ComponentError> {
41        fn_process(
42            config,
43            in_out.clone_with_new_id("cmp_inject_periodic", AuthPermissions::FullAccess),
44        )
45        .await
46    }
47}
48
49async fn fn_process<TMsg, TFnPeriodic>(
50    mut config: Config<TMsg, TFnPeriodic>,
51    in_out: CmpInOut<TMsg>,
52) -> Result<(), ComponentError>
53where
54    TMsg: MsgDataBound,
55    TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
56{
57    loop {
58        let begin = Instant::now();
59        let msgs = (config.fn_periodic)();
60        for msg in msgs {
61            in_out
62                .send_output(msg)
63                .await
64                .map_err(|err| ComponentError::Execution(err.to_string()))?;
65        }
66        let elapsed = begin.elapsed();
67        let sleep_time = if config.period <= elapsed {
68            Duration::from_millis(10)
69        } else {
70            config.period - elapsed
71        };
72        sleep(sleep_time).await;
73    }
74}
75
76/// Компонент cmp_inject_periodic
77pub type Cmp<TMessage, TFnPeriodic> = Component<Config<TMessage, TFnPeriodic>, TMessage>;