rsiot/components/cmp_plc/
fn_process.rs1use std::sync::Arc;
2
3use futures::TryFutureExt;
4use serde::Serialize;
5use tokio::{sync::mpsc, sync::Mutex, task::JoinSet};
6use tracing::info;
7
8use crate::{
9 components::shared_tasks,
10 executor::{join_set_spawn, Cache, CmpInOut},
11 message::MsgDataBound,
12};
13
14use super::{
15 config::Config,
16 plc::{FunctionBlockBase, IFunctionBlock},
17 tasks, Error,
18};
19
20pub async fn fn_process<TMsg, I, Q, S>(
21 in_out: CmpInOut<TMsg>,
22 config: Config<TMsg, I, Q, S>,
23) -> super::Result<()>
24where
25 TMsg: MsgDataBound + 'static,
26 I: Clone + Default + Send + Serialize + 'static + Sync,
27 Q: Clone + Default + Send + Serialize + 'static + Sync,
28 S: Clone + Default + Send + Serialize + 'static + Sync,
29 FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
30{
31 info!("PLC mode: STOPPED");
32
33 let input_msg_cache = Cache::<TMsg>::new();
34
35 let mut task_set = JoinSet::<super::Result<()>>::new();
36
37 let buffer_size = in_out.max_capacity();
38 let (channel_plc_to_filter_send, channel_plc_to_filter_recv) = mpsc::channel(buffer_size);
39 let (channel_filter_to_output_send, channel_filter_to_output_recv) = mpsc::channel(buffer_size);
40
41 let task = tasks::SaveInputInCache {
43 in_out: in_out.clone(),
44 input_msg_cache: input_msg_cache.clone(),
45 };
46 join_set_spawn(&mut task_set, task.spawn());
47
48 let fb_main = tasks::Retention {
50 cmp_in_out: in_out.clone(),
51 config_retention: config.retention.clone(),
52 fb_main: config.fb_main.clone(),
53 }
54 .spawn()
55 .await?;
56 let fb_main = Arc::new(Mutex::new(fb_main));
57
58 let task = tasks::PlcLoop {
60 input_msg_cache,
61 output: channel_plc_to_filter_send,
62 config: config.clone(),
63 fb_main: fb_main.clone(),
64 };
65 join_set_spawn(&mut task_set, task.spawn());
66
67 let task = shared_tasks::filter_identical_data::FilterIdenticalData {
69 input: channel_plc_to_filter_recv,
70 output: channel_filter_to_output_send,
71 };
72 join_set_spawn(
73 &mut task_set,
74 task.spawn().map_err(Error::FilterMsgsWithSameData),
75 );
76
77 let task = shared_tasks::mpsc_to_msgbus::MpscToMsgBus {
79 input: channel_filter_to_output_recv,
80 msg_bus: in_out.clone(),
81 };
82 join_set_spawn(&mut task_set, task.spawn().map_err(Error::ToCmpOutput));
83
84 let task = tasks::ExportCurrentState {
86 in_out: in_out.clone(),
87 config_retention: config.retention,
88 fb_main: fb_main.clone(),
89 };
90 join_set_spawn(&mut task_set, task.spawn());
91
92 while let Some(res) = task_set.join_next().await {
93 res??
94 }
95 Ok(())
96}