rsiot/components/cmp_plc/
fn_process.rs
use std::sync::Arc;
use futures::TryFutureExt;
use serde::Serialize;
use tokio::{sync::mpsc, sync::Mutex, task::JoinSet};
use tracing::info;
use crate::{
components::shared_tasks,
executor::{join_set_spawn, Cache, CmpInOut},
message::{MsgDataBound, ServiceBound},
};
use super::{
config::Config,
plc::{FunctionBlockBase, IFunctionBlock},
tasks, Error,
};
pub async fn fn_process<TMsg, TService, I, Q, S>(
in_out: CmpInOut<TMsg, TService>,
config: Config<TMsg, I, Q, S>,
) -> super::Result<()>
where
TMsg: MsgDataBound + 'static,
TService: ServiceBound + 'static,
I: Clone + Default + Send + Serialize + 'static + Sync,
Q: Clone + Default + Send + Serialize + 'static + Sync,
S: Clone + Default + Send + Serialize + 'static + Sync,
FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
info!("PLC mode: STOPPED");
let input_msg_cache = Cache::<TMsg>::new();
let mut task_set = JoinSet::<super::Result<()>>::new();
let buffer_size = in_out.max_capacity();
let (channel_plc_to_filter_send, channel_plc_to_filter_recv) = mpsc::channel(buffer_size);
let (channel_filter_to_output_send, channel_filter_to_output_recv) = mpsc::channel(buffer_size);
let fb_main = tasks::Retention {
cmp_in_out: in_out.clone(),
config_retention: config.retention.clone(),
fb_main: config.fb_main.clone(),
}
.spawn()
.await?;
let fb_main = Arc::new(Mutex::new(fb_main));
let task = tasks::SaveInputInCache {
in_out: in_out.clone(),
input_msg_cache: input_msg_cache.clone(),
};
join_set_spawn(&mut task_set, task.spawn());
let task = tasks::PlcLoop {
input_msg_cache,
output: channel_plc_to_filter_send,
config: config.clone(),
fb_main: fb_main.clone(),
};
join_set_spawn(&mut task_set, task.spawn());
let task = shared_tasks::filter_identical_data::FilterIdenticalData {
input: channel_plc_to_filter_recv,
output: channel_filter_to_output_send,
};
join_set_spawn(
&mut task_set,
task.spawn().map_err(Error::FilterMsgsWithSameData),
);
let task = shared_tasks::mpsc_to_msgbus::MpscToMsgBus {
input: channel_filter_to_output_recv,
msg_bus: in_out.clone(),
};
join_set_spawn(&mut task_set, task.spawn().map_err(Error::ToCmpOutput));
let task = tasks::ExportCurrentState {
in_out: in_out.clone(),
config_retention: config.retention,
fb_main: fb_main.clone(),
};
join_set_spawn(&mut task_set, task.spawn());
while let Some(res) = task_set.join_next().await {
res??
}
Ok(())
}