rsiot/components/cmp_plc/
fn_process.rs

1use std::sync::Arc;
2
3use futures::TryFutureExt;
4use serde::Serialize;
5use tokio::{sync::mpsc, sync::Mutex, task::JoinSet};
6use tracing::info;
7
8use crate::{
9    components::shared_tasks,
10    executor::{join_set_spawn, Cache, CmpInOut},
11    message::MsgDataBound,
12};
13
14use super::{
15    config::Config,
16    plc::{FunctionBlockBase, IFunctionBlock},
17    tasks, Error,
18};
19
20pub async fn fn_process<TMsg, I, Q, S>(
21    in_out: CmpInOut<TMsg>,
22    config: Config<TMsg, I, Q, S>,
23) -> super::Result<()>
24where
25    TMsg: MsgDataBound + 'static,
26    I: Clone + Default + Send + Serialize + 'static + Sync,
27    Q: Clone + Default + Send + Serialize + 'static + Sync,
28    S: Clone + Default + Send + Serialize + 'static + Sync,
29    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
30{
31    info!("PLC mode: STOPPED");
32
33    let input_msg_cache = Cache::<TMsg>::new();
34
35    let mut task_set = JoinSet::<super::Result<()>>::new();
36
37    let buffer_size = in_out.max_capacity();
38    let (channel_plc_to_filter_send, channel_plc_to_filter_recv) = mpsc::channel(buffer_size);
39    let (channel_filter_to_output_send, channel_filter_to_output_recv) = mpsc::channel(buffer_size);
40
41    // Сохранение входных сообщений в кеше
42    let task = tasks::SaveInputInCache {
43        in_out: in_out.clone(),
44        input_msg_cache: input_msg_cache.clone(),
45    };
46    join_set_spawn(&mut task_set, task.spawn());
47
48    // Ожидаем данные для восстановления памяти
49    let fb_main = tasks::Retention {
50        cmp_in_out: in_out.clone(),
51        config_retention: config.retention.clone(),
52        fb_main: config.fb_main.clone(),
53    }
54    .spawn()
55    .await?;
56    let fb_main = Arc::new(Mutex::new(fb_main));
57
58    // Выполнение цикла ПЛК
59    let task = tasks::PlcLoop {
60        input_msg_cache,
61        output: channel_plc_to_filter_send,
62        config: config.clone(),
63        fb_main: fb_main.clone(),
64    };
65    join_set_spawn(&mut task_set, task.spawn());
66
67    // Фильтрация исходящих сообщений
68    let task = shared_tasks::filter_identical_data::FilterIdenticalData {
69        input: channel_plc_to_filter_recv,
70        output: channel_filter_to_output_send,
71    };
72    join_set_spawn(
73        &mut task_set,
74        task.spawn().map_err(Error::FilterMsgsWithSameData),
75    );
76
77    // Пересылка сообщений на выход компонента
78    let task = shared_tasks::mpsc_to_msgbus::MpscToMsgBus {
79        input: channel_filter_to_output_recv,
80        msg_bus: in_out.clone(),
81    };
82    join_set_spawn(&mut task_set, task.spawn().map_err(Error::ToCmpOutput));
83
84    // Периодический экспорт состояния
85    let task = tasks::ExportCurrentState {
86        in_out: in_out.clone(),
87        config_retention: config.retention,
88        fb_main: fb_main.clone(),
89    };
90    join_set_spawn(&mut task_set, task.spawn());
91
92    while let Some(res) = task_set.join_next().await {
93        res??
94    }
95    Ok(())
96}