rsiot/components/cmp_plc/tasks/
retention.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
//! Восстановление сохраненного состояния области `static` ПЛК.
//!
//! По сравнению с другими задачами, при выполнении этой задачи выполнение ПЛК приостанавливается,
//! пока не закончится выполнение данной функции

use std::time::Duration;

use serde::Serialize;
use tokio::task::JoinSet;
use tracing::{info, warn};

use crate::{
    executor::{join_set_spawn, sleep, CmpInOut},
    message::{MsgDataBound, ServiceBound},
};

use super::super::{
    config::{ConfigRetention, ConfigRetentionRestoreResult},
    plc::{FunctionBlockBase, IFunctionBlock},
};

pub struct Retention<TMsg, TService, I, Q, S>
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
    I: Clone + Default + Send + Serialize + 'static + Sync,
    Q: Clone + Default + Send + Serialize + 'static + Sync,
    S: Clone + Default + Send + Serialize + 'static + Sync,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    pub cmp_in_out: CmpInOut<TMsg, TService>,
    pub config_retention: Option<ConfigRetention<TMsg, I, Q, S>>,
    pub fb_main: FunctionBlockBase<I, Q, S>,
}

impl<TMsg, TService, I, Q, S> Retention<TMsg, TService, I, Q, S>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    I: Clone + Default + Send + Serialize + 'static + Sync,
    Q: Clone + Default + Send + Serialize + 'static + Sync,
    S: Clone + Default + Send + Serialize + 'static + Sync,
    FunctionBlockBase<I, Q, S>: IFunctionBlock<I, Q, S>,
{
    pub async fn spawn(mut self) -> super::Result<FunctionBlockBase<I, Q, S>> {
        let retention_restore = if let Some(config_retention) = self.config_retention {
            let mut task_set_retention = JoinSet::<ConfigRetentionRestoreResult<S>>::new();

            // Таймаут
            // В tokio есть timeout в модуле time, но использование модуля вызывает панику в WASM.
            let task = task_retention_timeout(config_retention.restore_timeout);
            join_set_spawn(&mut task_set_retention, task);

            task_set_retention.spawn(async move {
                while let Ok(msg) = self.cmp_in_out.recv_input().await {
                    let data = (config_retention.fn_import_static)(&msg);

                    let Ok(data) = data else {
                        return ConfigRetentionRestoreResult::RestoreDeserializationError;
                    };

                    if let Some(data) = data {
                        return ConfigRetentionRestoreResult::RestoreData(data);
                    };
                }
                ConfigRetentionRestoreResult::NoRestoreData
            });

            let mut config_retention = ConfigRetentionRestoreResult::NoRestoreData;
            while let Some(task_result) = task_set_retention.join_next().await {
                config_retention = task_result?;
                task_set_retention.shutdown().await;
            }
            config_retention
        } else {
            ConfigRetentionRestoreResult::NoRestoreData
        };
        match retention_restore {
            ConfigRetentionRestoreResult::NoRestoreData => warn!("Restore retention data: no data"),
            ConfigRetentionRestoreResult::RestoreDeserializationError => {
                warn!("Restore retention data: deserialization error");
            }
            ConfigRetentionRestoreResult::RestoreData(_) => {
                info!("Restore retention data: success")
            }
        }

        let fb_main = match retention_restore {
            ConfigRetentionRestoreResult::NoRestoreData => self.fb_main.clone(),
            ConfigRetentionRestoreResult::RestoreDeserializationError => self.fb_main.clone(),
            ConfigRetentionRestoreResult::RestoreData(stat) => self
                .fb_main
                .clone()
                .new_with_restore_stat(stat, self.fb_main.get_period()),
        };

        Ok(fb_main)
    }
}

async fn task_retention_timeout<S>(timeout: Duration) -> ConfigRetentionRestoreResult<S>
where
    S: Clone + Default + Serialize,
{
    sleep(timeout).await;
    ConfigRetentionRestoreResult::NoRestoreData
}