rsiot/components/cmp_webstorage/tasks/
output.rs1use std::collections::HashMap;
2
3use gloo::storage::{LocalStorage, SessionStorage, Storage};
4use tracing::warn;
5
6use crate::message::{Message, MsgDataBound};
7
8use super::{
9 super::{config::FnOutput, ConfigStorageKind},
10 TaskInput, TaskOutput,
11};
12
13pub struct Output<TMsg> {
14 pub input: TaskInput<TMsg>,
15 pub output: TaskOutput<TMsg>,
16 pub storage_kind: ConfigStorageKind,
17 pub default_messages: Vec<Message<TMsg>>,
18 pub fn_output: FnOutput<TMsg>,
19}
20
21impl<TMsg> Output<TMsg>
22where
23 TMsg: MsgDataBound,
24{
25 pub async fn spawn(mut self) -> super::Result<()> {
26 let msgs: Result<HashMap<String, Message<TMsg>>, _> = match self.storage_kind {
28 ConfigStorageKind::LocalStorage => LocalStorage::get_all(),
29 ConfigStorageKind::SessionStorage => SessionStorage::get_all(),
30 };
31
32 let mut msgs = match msgs {
33 Ok(val) => val,
34 Err(err) => {
35 warn!("Error loading messages: {}", err);
36 match self.storage_kind {
37 ConfigStorageKind::LocalStorage => LocalStorage::clear(),
38 ConfigStorageKind::SessionStorage => SessionStorage::clear(),
39 }
40 warn!("Storage {:?} cleared", self.storage_kind);
41 HashMap::new()
42 }
43 };
44
45 for msg in self.default_messages {
47 if !msgs.contains_key(&msg.key) {
48 msgs.insert(msg.key.clone(), msg);
49 }
50 }
51
52 for (_key, msg) in msgs.into_iter() {
54 let msg = (self.fn_output)(msg);
55 let Some(msg) = msg else { continue };
56 self.output
57 .send(msg)
58 .await
59 .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
60 }
61
62 while let Some(msg) = self.input.recv().await {
63 let msg = (self.fn_output)(msg);
64 let Some(msg) = msg else { continue };
65
66 self.output
67 .send(msg)
68 .await
69 .map_err(|e| super::Error::TokioSyncMpsc(e.to_string()))?;
70 }
71
72 Err(super::Error::TaskEndOutput)
73 }
74}