rsiot/components/cmp_esp_mqtt_client/tasks/
input.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
use std::time::Duration;

use esp_idf_svc::mqtt::client::{EspAsyncMqttClient, QoS};
use tokio::time::sleep;
use tracing::{info, warn};

use crate::{
    components::shared_mqtt_client::{create_payload_for_message, create_topic_for_message},
    components_config::mqtt_client::ConfigFnInput,
    executor::CmpInOut,
    message::{MsgDataBound, ServiceBound},
};

/// State for the MQTT input task, consumed by [`Input::spawn`].
///
/// The task subscribes to the broker and then publishes every message
/// received from `in_out` using `client`.
pub struct Input<TMsg, TService>
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
{
    /// Component channel; `spawn` reads incoming messages from it via `recv_input`.
    pub in_out: CmpInOut<TMsg, TService>,
    /// Passed to `create_payload_for_message` together with each message to build
    /// the publish payload (presumably a user-supplied converter — confirm against
    /// `ConfigFnInput`).
    pub config_fn_input: ConfigFnInput<TMsg>,
    /// Async MQTT client used both for subscribing and for publishing.
    pub client: EspAsyncMqttClient,
}

impl<TMsg, TService> Input<TMsg, TService>
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
{
    /// Runs the input task until the incoming message channel stops yielding messages.
    ///
    /// Steps:
    /// 1. Subscribes to the `rsiot/#` topic, retrying every 5 seconds until the
    ///    subscription succeeds.
    /// 2. For every message received from `in_out`, builds a topic and a payload
    ///    and publishes it with `QoS::ExactlyOnce` and the retain flag set.
    ///    Messages for which no payload is produced are skipped.
    ///
    /// A failed publish is logged and the loop continues with the next message;
    /// a transient broker or network error must not bring the whole task down.
    ///
    /// # Errors
    ///
    /// The body itself never produces an error; `Ok(())` is returned once
    /// `recv_input` returns `Err`.
    pub async fn spawn(mut self) -> super::Result<()> {
        // Named distinctly from the per-message topic below to avoid shadowing.
        let subscribe_topic = "rsiot/#";
        loop {
            info!("MQTT client: trying to subscribe to topic");
            let subscribed = self
                .client
                .subscribe(subscribe_topic, QoS::ExactlyOnce)
                .await;
            match subscribed {
                Ok(_) => break,
                Err(err) => {
                    warn!("{}", err);
                }
            }
            // Back off before the next subscription attempt.
            sleep(Duration::from_secs(5)).await;
        }
        info!("MQTT client subscribed to topic");

        while let Ok(msg) = self.in_out.recv_input().await {
            let topic = create_topic_for_message(&msg);

            // No payload means there is nothing to publish for this message.
            let Some(payload) = create_payload_for_message(&msg, self.config_fn_input) else {
                continue;
            };

            // Publishing is network I/O and may fail; log instead of panicking,
            // mirroring how subscription errors are handled above.
            let published = self
                .client
                .publish(&topic, QoS::ExactlyOnce, true, &payload)
                .await;
            if let Err(err) = published {
                warn!("MQTT client: publish failed: {}", err);
            }
        }
        Ok(())
    }
}