rsiot/components/cmp_esp_nvs/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::fmt::Debug;

use esp_idf_svc::nvs::{EspDefaultNvsPartition, EspNvs, EspNvsPartition, NvsDefault};
use postcard::{from_bytes, to_stdvec};
use serde::{de::DeserializeOwned, Serialize};
use tracing::{debug, info, warn};

use crate::{
    executor::{CmpInOut, ComponentError},
    message::{MsgDataBound, ServiceBound},
};

use super::{config::Config, error::Error};

/// Module-local shorthand for `std::result::Result` with the `Error` type imported from
/// `super::error` as the error variant.
type Result<T> = std::result::Result<T, Error>;

/// Component entry point: logs the start-up message and runs [`task_main`].
///
/// # Errors
///
/// Any error returned by [`task_main`] is rendered to a string and wrapped in
/// [`ComponentError::Execution`].
pub async fn fn_process<TMessage, TService, TStorageData>(
    input: CmpInOut<TMessage, TService>,
    config: Config<TMessage, TStorageData>,
) -> std::result::Result<(), ComponentError>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    TStorageData: Debug + Default + DeserializeOwned + PartialEq + Serialize + 'static,
{
    info!("Starting cmp_storage_esp component");
    match task_main(input, config).await {
        Ok(()) => Ok(()),
        // The component-level error only carries the textual description of the failure.
        Err(failure) => Err(ComponentError::Execution(failure.to_string())),
    }
}

/// Opens the NVS storage, publishes the stored state and then handles incoming messages.
///
/// Steps performed:
/// 1. Take the default NVS partition and open the `test_ns` namespace on it.
/// 2. Load the stored data via [`load_data`].
/// 3. Send every message produced by `config.fn_output` for the loaded data to the bus.
/// 4. Hand over to [`task_input`], which runs until the input stream ends.
///
/// # Errors
///
/// * [`Error::TakePartition`] if the default partition cannot be taken.
/// * [`Error::SendChannel`] if an output message cannot be sent.
/// * Any error returned by [`load_data`] or [`task_input`].
///
/// # Panics
///
/// Panics if the namespace cannot be opened on the default partition.
async fn task_main<TMessage, TService, TStorageData>(
    msg_bus: CmpInOut<TMessage, TService>,
    config: Config<TMessage, TStorageData>,
) -> Result<()>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    TStorageData: Debug + Default + DeserializeOwned + PartialEq + Serialize + 'static,
{
    let nvs_default_partition: EspNvsPartition<NvsDefault> =
        EspDefaultNvsPartition::take().map_err(Error::TakePartition)?;

    let test_namespace = "test_ns";
    // The third argument presumably requests read-write access (see the esp-idf-svc
    // `EspNvs::new` documentation) - TODO confirm.
    let mut nvs = match EspNvs::new(nvs_default_partition, test_namespace, true) {
        Ok(nvs) => {
            info!("Got namespace {:?} from default partition", test_namespace);
            nvs
        }
        // NOTE(review): this still panics, as before; only the message was corrected so that
        // it reports both the namespace and the underlying error. Returning an error instead
        // would need a dedicated variant in `Error`, which is declared outside this file.
        Err(e) => panic!("Couldn't get namespace {:?}: {:?}", test_namespace, e),
    };

    // Publish the state found in storage (or the freshly written default) once at start-up.
    let data = load_data(&mut nvs)?;
    let msgs = (config.fn_output)(&data);
    for msg in msgs {
        msg_bus
            .send_output(msg)
            .await
            .map_err(|e| Error::SendChannel(e.to_string()))?;
    }

    task_input(msg_bus, config, nvs, data).await
}

async fn task_input<TMessage, TService, TStorageData>(
    mut msg_bus: CmpInOut<TMessage, TService>,
    config: Config<TMessage, TStorageData>,
    mut nvs: EspNvs<NvsDefault>,
    data: TStorageData,
) -> Result<()>
where
    TMessage: MsgDataBound,
    TService: ServiceBound,
    TStorageData: Debug + Default + DeserializeOwned + PartialEq + Serialize,
{
    let mut data = data;
    while let Ok(msg) = msg_bus.recv_input().await {
        let new_data = (config.fn_input)(&data, &msg);
        let Some(new_data) = new_data else { continue };
        if new_data == data {
            continue;
        }
        data = new_data;
        save_data(&mut nvs, &data)?;

        let data = load_data(&mut nvs)?;
        let msgs = (config.fn_output)(&data);
        for msg in msgs {
            msg_bus
                .send_output(msg)
                .await
                .map_err(|e| Error::SendChannel(e.to_string()))?;
        }
    }
    Ok(())
}

/// Reads the value stored under the `"data"` key and decodes it with postcard.
///
/// The raw value is read into a 1024-byte buffer. If the key is absent, or the stored bytes
/// cannot be deserialized into `TStorageData`, the default value is written back with
/// [`save_data`] and returned instead.
///
/// # Errors
///
/// * [`Error::ReadFromEsp`] if the read from NVS fails.
/// * Any error returned by [`save_data`] while writing the default value.
fn load_data<TStorageData>(nvs: &mut EspNvs<NvsDefault>) -> Result<TStorageData>
where
    TStorageData: Debug + Default + DeserializeOwned + Serialize,
{
    let mut buffer = [0u8; 1024];
    let stored = nvs
        .get_raw("data", &mut buffer)
        .map_err(Error::ReadFromEsp)?;

    // Nothing stored yet: initialise the storage with the default value.
    let Some(raw) = stored else {
        let fallback = TStorageData::default();
        warn!("Storage empty, generate default: {:?}", fallback);
        save_data(nvs, &fallback)?;
        return Ok(fallback);
    };

    match from_bytes::<TStorageData>(raw) {
        Ok(decoded) => {
            info!("Data from storage loaded: {:?}", decoded);
            Ok(decoded)
        }
        // Stored bytes do not decode into the expected type: overwrite them with the default.
        Err(decode_err) => {
            let fallback = TStorageData::default();
            warn!(
                "Error deserialization data from storage: {:?}; load default: {:?}",
                decode_err, fallback
            );
            save_data(nvs, &fallback)?;
            Ok(fallback)
        }
    }
}

fn save_data<TStorageData>(nvs: &mut EspNvs<NvsDefault>, data: &TStorageData) -> Result<()>
where
    TStorageData: Debug + Default + DeserializeOwned + Serialize,
{
    let data = to_stdvec::<TStorageData>(data)?;
    nvs.set_raw("data", &data).map_err(Error::SaveToEsp)?;
    debug!("Data saved to storage");
    Ok(())
}