rsiot/components/cmp_plc/tasks/
plc_loop.rs1use std::{sync::Arc, time::Duration};
2
3use instant::Instant;
4use serde::Serialize;
5use tokio::sync::{mpsc, Mutex};
6use tracing::{info, trace};
7
8use crate::{
9 executor::{sleep, Cache},
10 message::{Message, MsgDataBound},
11};
12
13use super::super::{
14 plc::{FunctionBlockBase, IFunctionBlock},
15 Config,
16};
17
18pub struct PlcLoop<TMsg, I, Q, S>
20where
21 TMsg: MsgDataBound + 'static,
22 I: Clone + Default + Send + Serialize + Sync,
23 Q: Clone + Default + Send + Serialize + Sync,
24 S: Clone + Default + Send + Serialize + Sync,
25 FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
26{
27 pub input_msg_cache: Cache<TMsg>,
28 pub output: mpsc::Sender<Message<TMsg>>,
29 pub config: Config<TMsg, I, Q, S>,
30 pub fb_main: Arc<Mutex<FunctionBlockBase<I, Q, S>>>,
31}
32
33impl<TMsg, I, Q, S> PlcLoop<TMsg, I, Q, S>
34where
35 TMsg: MsgDataBound + 'static,
36 I: Clone + Default + Send + Serialize + Sync,
37 Q: Clone + Default + Send + Serialize + Sync,
38 S: Clone + Default + Send + Serialize + Sync,
39 FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
40{
41 pub async fn spawn(self) -> super::Result<()> {
42 info!("PLC mode: STARTED");
43 let mut fb_main_input = I::default();
44
45 loop {
46 trace!("Start PLC cycle");
47 let begin = Instant::now();
48
49 let msgs = plc_cycle::<TMsg, I, Q, S>(
51 &self.config,
52 self.fb_main.clone(),
53 &mut fb_main_input,
54 self.input_msg_cache.clone(),
55 )
56 .await?;
57
58 for msg in msgs {
60 self.output
61 .send(msg)
62 .await
63 .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
64 }
65
66 let elapsed = begin.elapsed();
67 trace!("End PLC cycle, elapsed: {:?}", elapsed);
68 let sleep_time = if self.config.period <= elapsed {
69 Duration::from_millis(10)
70 } else {
71 self.config.period - elapsed
72 };
73 sleep(sleep_time).await;
74 }
75 }
76}
77
78async fn plc_cycle<TMsg, I, Q, S>(
80 config: &Config<TMsg, I, Q, S>,
81 fb_main: Arc<Mutex<FunctionBlockBase<I, Q, S>>>,
82 fb_main_input: &mut I,
83 input_msg_cache: Cache<TMsg>,
84) -> super::Result<Vec<Message<TMsg>>>
85where
86 TMsg: MsgDataBound + 'static,
87 I: Clone + Default + Send + Serialize,
88 Q: Clone + Default + Send + Serialize,
89 S: Clone + Default + Send + Serialize,
90 FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
91{
92 (config.fn_cycle_init)(fb_main_input);
94
95 {
97 let mut lock = input_msg_cache.write().await;
98 for msg in lock.values() {
99 (config.fn_input)(fb_main_input, msg);
100 }
101 lock.clear();
102 }
103
104 let msgs;
106 {
107 let mut fb_main = fb_main.lock().await;
108 fb_main.call(fb_main_input);
109 msgs = (config.fn_output)(&fb_main.q);
110 }
111 Ok(msgs)
112}