rsiot/components/cmp_mqtt_client/
fn_process.rs

1use std::time::Duration;
2
3use rumqttc::{AsyncClient, MqttOptions};
4use tokio::task::JoinSet;
5use tracing::info;
6
7use crate::components_config::mqtt_client::ConfigPublish;
8use crate::executor::join_set_spawn;
9use crate::serde_utils::SerdeAlg;
10use crate::{executor::CmpInOut, message::MsgDataBound};
11
12use super::config::MqttMsgGen;
13use super::{tasks, Config};
14
15pub async fn fn_process<TMsg>(config: Config<TMsg>, msg_bus: CmpInOut<TMsg>) -> super::Result<()>
16where
17    TMsg: MsgDataBound + 'static,
18{
19    loop {
20        info!("Starting");
21
22        let mut mqttoptions = MqttOptions::new(&config.client_id, &config.host, config.port);
23        mqttoptions.set_keep_alive(Duration::from_secs(5));
24
25        let (client, eventloop) = AsyncClient::new(mqttoptions, config.client_capacity);
26
27        let mqtt_msg_gen = MqttMsgGen {
28            serde_alg: SerdeAlg::new(config.serde_alg),
29        };
30
31        let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
32
33        // Отправление входящих сообщений на MQTT-брокер
34        if let ConfigPublish::Publish { fn_publish } = config.publish {
35            let task = tasks::Publish {
36                msg_bus: msg_bus.clone(),
37                fn_publish,
38                mqtt_msg_gen: mqtt_msg_gen.clone(),
39                client: client.clone(),
40            };
41            join_set_spawn(&mut task_set, task.spawn());
42        }
43
44        // Получение сообщения от MQTT-брокера
45        let task = tasks::Subscribe {
46            msg_bus: msg_bus.clone(),
47            eventloop,
48            client,
49            mqtt_msg_gen: mqtt_msg_gen.clone(),
50            subscribe: config.subscribe.clone(),
51        };
52        join_set_spawn(&mut task_set, task.spawn());
53
54        while let Some(res) = task_set.join_next().await {
55            res??
56        }
57    }
58}