rsiot/components/cmp_plc/tasks/
plc_loop.rs

1use std::{sync::Arc, time::Duration};
2
3use instant::Instant;
4use serde::Serialize;
5use tokio::sync::{mpsc, Mutex};
6use tracing::{info, trace};
7
8use crate::{
9    executor::{sleep, Cache},
10    message::{Message, MsgDataBound},
11};
12
13use super::super::{
14    plc::{FunctionBlockBase, IFunctionBlock},
15    Config,
16};
17
18/// Задача выполнения цикла ПЛК
19pub struct PlcLoop<TMsg, I, Q, S>
20where
21    TMsg: MsgDataBound + 'static,
22    I: Clone + Default + Send + Serialize + Sync,
23    Q: Clone + Default + Send + Serialize + Sync,
24    S: Clone + Default + Send + Serialize + Sync,
25    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
26{
27    pub input_msg_cache: Cache<TMsg>,
28    pub output: mpsc::Sender<Message<TMsg>>,
29    pub config: Config<TMsg, I, Q, S>,
30    pub fb_main: Arc<Mutex<FunctionBlockBase<I, Q, S>>>,
31}
32
33impl<TMsg, I, Q, S> PlcLoop<TMsg, I, Q, S>
34where
35    TMsg: MsgDataBound + 'static,
36    I: Clone + Default + Send + Serialize + Sync,
37    Q: Clone + Default + Send + Serialize + Sync,
38    S: Clone + Default + Send + Serialize + Sync,
39    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
40{
41    pub async fn spawn(self) -> super::Result<()> {
42        info!("PLC mode: STARTED");
43        let mut fb_main_input = I::default();
44
45        loop {
46            trace!("Start PLC cycle");
47            let begin = Instant::now();
48
49            // Исполняем цикл ПЛК
50            let msgs = plc_cycle::<TMsg, I, Q, S>(
51                &self.config,
52                self.fb_main.clone(),
53                &mut fb_main_input,
54                self.input_msg_cache.clone(),
55            )
56            .await?;
57
58            // Записываем выходы
59            for msg in msgs {
60                self.output
61                    .send(msg)
62                    .await
63                    .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
64            }
65
66            let elapsed = begin.elapsed();
67            trace!("End PLC cycle, elapsed: {:?}", elapsed);
68            let sleep_time = if self.config.period <= elapsed {
69                Duration::from_millis(10)
70            } else {
71                self.config.period - elapsed
72            };
73            sleep(sleep_time).await;
74        }
75    }
76}
77
78/// Исполнение одного цикла ПЛК
79async fn plc_cycle<TMsg, I, Q, S>(
80    config: &Config<TMsg, I, Q, S>,
81    fb_main: Arc<Mutex<FunctionBlockBase<I, Q, S>>>,
82    fb_main_input: &mut I,
83    input_msg_cache: Cache<TMsg>,
84) -> super::Result<Vec<Message<TMsg>>>
85where
86    TMsg: MsgDataBound + 'static,
87    I: Clone + Default + Send + Serialize,
88    Q: Clone + Default + Send + Serialize,
89    S: Clone + Default + Send + Serialize,
90    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
91{
92    // Инициализация структуры входов в начале цикла
93    (config.fn_cycle_init)(fb_main_input);
94
95    // Обновляем входную структуру по данным из входящих сообщений
96    {
97        let mut lock = input_msg_cache.write().await;
98        for msg in lock.values() {
99            (config.fn_input)(fb_main_input, msg);
100        }
101        lock.clear();
102    }
103
104    // Выполняем цикл ПЛК и формируем исходящие сообщения
105    let msgs;
106    {
107        let mut fb_main = fb_main.lock().await;
108        fb_main.call(fb_main_input);
109        msgs = (config.fn_output)(&fb_main.q);
110    }
111    Ok(msgs)
112}