// rsiot/components/cmp_plc/tasks/retention.rs
use std::time::Duration;
use serde::Serialize;
use tokio::task::JoinSet;
use tracing::{info, warn};
use crate::{
executor::{join_set_spawn, sleep, CmpInOut},
message::{MsgDataBound, ServiceBound},
};
use super::super::{
config::{ConfigRetention, ConfigRetentionRestoreResult},
plc::{FunctionBlockBase, IFunctionBlock},
};
/// Inputs required to restore the static data of the main function block.
///
/// The value is consumed by [`Retention::spawn`], which returns the function
/// block to run: either `fb_main` as given, or `fb_main` rebuilt with the
/// restored static data.
pub struct Retention<TMsg, TService, I, Q, S>
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
    I: Clone + Default + Serialize + Send + Sync + 'static,
    Q: Clone + Default + Serialize + Send + Sync + 'static,
    S: Clone + Default + Serialize + Send + Sync + 'static,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    /// Component input/output handle; `spawn` reads incoming messages from it.
    pub cmp_in_out: CmpInOut<TMsg, TService>,
    /// Retention settings. When `None`, `spawn` does not attempt a restore.
    pub config_retention: Option<ConfigRetention<TMsg, I, Q, S>>,
    /// Function block that `spawn` returns, optionally with restored static data.
    pub fb_main: FunctionBlockBase<I, Q, S>,
}
impl<TMsg, TService, I, Q, S> Retention<TMsg, TService, I, Q, S>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    I: Clone + Default + Send + Serialize + 'static + Sync,
    Q: Clone + Default + Send + Serialize + 'static + Sync,
    S: Clone + Default + Send + Serialize + 'static + Sync,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    /// Tries to restore the static data of the main function block and returns
    /// the function block that should be used afterwards.
    ///
    /// When `config_retention` is set, two tasks are raced against each other:
    ///
    /// * a timer that finishes after `restore_timeout` with `NoRestoreData`;
    /// * a listener that feeds every incoming message to `fn_import_static`
    ///   and finishes on the first message that yields data, or on the first
    ///   import error.
    ///
    /// Whichever task finishes first decides the outcome; the other one is
    /// aborted. When `config_retention` is `None`, the outcome is
    /// `NoRestoreData` straight away.
    ///
    /// # Returns
    ///
    /// * `fb_main` unchanged when there is no data or the import failed;
    /// * `fb_main` passed through `new_with_restore_stat` with the restored
    ///   data and the current period otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error when the first finished task could not be joined
    /// (the join error is propagated with `?`).
    pub async fn spawn(mut self) -> super::Result<FunctionBlockBase<I, Q, S>> {
        let retention_restore = if let Some(config_retention) = self.config_retention {
            let mut task_set_retention = JoinSet::<ConfigRetentionRestoreResult<S>>::new();

            // Upper bound on how long we wait for retention data.
            join_set_spawn(
                &mut task_set_retention,
                task_retention_timeout(config_retention.restore_timeout),
            );

            // Listener: spawned through the same helper as the timer so that
            // both tasks follow the executor's spawning convention.
            join_set_spawn(&mut task_set_retention, async move {
                while let Ok(msg) = self.cmp_in_out.recv_input().await {
                    match (config_retention.fn_import_static)(&msg) {
                        Err(_) => {
                            return ConfigRetentionRestoreResult::RestoreDeserializationError;
                        }
                        Ok(Some(data)) => {
                            return ConfigRetentionRestoreResult::RestoreData(data);
                        }
                        // Message carries no retention data; keep listening.
                        Ok(None) => {}
                    }
                }
                // Input closed before any retention data arrived.
                ConfigRetentionRestoreResult::NoRestoreData
            });

            // Only the first finished task matters; `shutdown` then aborts the
            // loser and waits for it, leaving the set empty.
            let first_finished = task_set_retention.join_next().await;
            let outcome = match first_finished {
                Some(task_result) => task_result?,
                None => ConfigRetentionRestoreResult::NoRestoreData,
            };
            task_set_retention.shutdown().await;
            outcome
        } else {
            ConfigRetentionRestoreResult::NoRestoreData
        };

        // `self` is owned, so the function block is moved out rather than cloned.
        let fb_main = match retention_restore {
            ConfigRetentionRestoreResult::NoRestoreData => {
                warn!("Restore retention data: no data");
                self.fb_main
            }
            ConfigRetentionRestoreResult::RestoreDeserializationError => {
                warn!("Restore retention data: deserialization error");
                self.fb_main
            }
            ConfigRetentionRestoreResult::RestoreData(stat) => {
                info!("Restore retention data: success");
                // Read the period first so the block can be handed over afterwards.
                let period = self.fb_main.get_period();
                self.fb_main.new_with_restore_stat(stat, period)
            }
        };
        Ok(fb_main)
    }
}
/// Timer side of the retention restore race.
///
/// Sleeps for `timeout` and then reports `NoRestoreData`, so a caller that
/// awaits this alongside a data-listening task stops waiting once the timeout
/// has elapsed.
async fn task_retention_timeout<S: Clone + Default + Serialize>(
    timeout: Duration,
) -> ConfigRetentionRestoreResult<S> {
    sleep(timeout).await;
    ConfigRetentionRestoreResult::NoRestoreData
}