rsiot/components/cmp_webstorage/tasks/
output.rs

1use std::collections::HashMap;
2
3use gloo::storage::{LocalStorage, SessionStorage, Storage};
4use tracing::warn;
5
6use crate::message::{Message, MsgDataBound};
7
8use super::{
9    super::{config::FnOutput, ConfigStorageKind},
10    TaskInput, TaskOutput,
11};
12
13pub struct Output<TMsg> {
14    pub input: TaskInput<TMsg>,
15    pub output: TaskOutput<TMsg>,
16    pub storage_kind: ConfigStorageKind,
17    pub default_messages: Vec<Message<TMsg>>,
18    pub fn_output: FnOutput<TMsg>,
19}
20
21impl<TMsg> Output<TMsg>
22where
23    TMsg: MsgDataBound,
24{
25    pub async fn spawn(mut self) -> super::Result<()> {
26        // Загружаем из хранилища все значения
27        let msgs: Result<HashMap<String, Message<TMsg>>, _> = match self.storage_kind {
28            ConfigStorageKind::LocalStorage => LocalStorage::get_all(),
29            ConfigStorageKind::SessionStorage => SessionStorage::get_all(),
30        };
31
32        let mut msgs = match msgs {
33            Ok(val) => val,
34            Err(err) => {
35                warn!("Error loading messages: {}", err);
36                match self.storage_kind {
37                    ConfigStorageKind::LocalStorage => LocalStorage::clear(),
38                    ConfigStorageKind::SessionStorage => SessionStorage::clear(),
39                }
40                warn!("Storage {:?} cleared", self.storage_kind);
41                HashMap::new()
42            }
43        };
44
45        // Добавляем значения по-умолчанию
46        for msg in self.default_messages {
47            if !msgs.contains_key(&msg.key) {
48                msgs.insert(msg.key.clone(), msg);
49            }
50        }
51
52        // Фильтруем сообщения на основе fn_output и отправляем исходящие сообщения
53        for (_key, msg) in msgs.into_iter() {
54            let msg = (self.fn_output)(msg);
55            let Some(msg) = msg else { continue };
56            self.output
57                .send(msg)
58                .await
59                .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
60        }
61
62        while let Some(msg) = self.input.recv().await {
63            let msg = (self.fn_output)(msg);
64            let Some(msg) = msg else { continue };
65
66            self.output
67                .send(msg)
68                .await
69                .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
70        }
71
72        Err(super::Error::TaskEndOutput)
73    }
74}