rsiot/components/cmp_inject_periodic/
mod.rs1use async_trait::async_trait;
4use instant::Instant;
5use tokio::time::Duration;
6
7use crate::{
8 executor::{sleep, CmpInOut, Component, ComponentError, IComponentProcess},
9 message::{AuthPermissions, Message, MsgDataBound},
10};
11
12#[derive(Clone, Debug)]
14pub struct Config<TMsg, TFnPeriodic>
15where
16 TMsg: Clone,
17 TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
18{
19 pub period: Duration,
21
22 pub fn_periodic: TFnPeriodic,
26}
27
28#[cfg_attr(not(feature = "single-thread"), async_trait)]
29#[cfg_attr(feature = "single-thread", async_trait(?Send))]
30impl<TMsg, TFnPeriodic> IComponentProcess<Config<TMsg, TFnPeriodic>, TMsg>
31 for Component<Config<TMsg, TFnPeriodic>, TMsg>
32where
33 TMsg: MsgDataBound,
34 TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
35{
36 async fn process(
37 &self,
38 config: Config<TMsg, TFnPeriodic>,
39 in_out: CmpInOut<TMsg>,
40 ) -> Result<(), ComponentError> {
41 fn_process(
42 config,
43 in_out.clone_with_new_id("cmp_inject_periodic", AuthPermissions::FullAccess),
44 )
45 .await
46 }
47}
48
49async fn fn_process<TMsg, TFnPeriodic>(
50 mut config: Config<TMsg, TFnPeriodic>,
51 in_out: CmpInOut<TMsg>,
52) -> Result<(), ComponentError>
53where
54 TMsg: MsgDataBound,
55 TFnPeriodic: FnMut() -> Vec<Message<TMsg>> + Send + Sync,
56{
57 loop {
58 let begin = Instant::now();
59 let msgs = (config.fn_periodic)();
60 for msg in msgs {
61 in_out
62 .send_output(msg)
63 .await
64 .map_err(|err| ComponentError::Execution(err.to_string()))?;
65 }
66 let elapsed = begin.elapsed();
67 let sleep_time = if config.period <= elapsed {
68 Duration::from_millis(10)
69 } else {
70 config.period - elapsed
71 };
72 sleep(sleep_time).await;
73 }
74}
75
76pub type Cmp<TMessage, TFnPeriodic> = Component<Config<TMessage, TFnPeriodic>, TMessage>;