rsiot/components/cmp_mqtt_client/
fn_process.rs1use std::time::Duration;
2
3use rumqttc::{AsyncClient, MqttOptions};
4use tokio::task::JoinSet;
5use tracing::info;
6
7use crate::components_config::mqtt_client::ConfigPublish;
8use crate::executor::join_set_spawn;
9use crate::serde_utils::SerdeAlg;
10use crate::{executor::CmpInOut, message::MsgDataBound};
11
12use super::config::MqttMsgGen;
13use super::{tasks, Config};
14
15pub async fn fn_process<TMsg>(config: Config<TMsg>, msg_bus: CmpInOut<TMsg>) -> super::Result<()>
16where
17 TMsg: MsgDataBound + 'static,
18{
19 loop {
20 info!("Starting");
21
22 let mut mqttoptions = MqttOptions::new(&config.client_id, &config.host, config.port);
23 mqttoptions.set_keep_alive(Duration::from_secs(5));
24
25 let (client, eventloop) = AsyncClient::new(mqttoptions, config.client_capacity);
26
27 let mqtt_msg_gen = MqttMsgGen {
28 serde_alg: SerdeAlg::new(config.serde_alg),
29 };
30
31 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
32
33 if let ConfigPublish::Publish { fn_publish } = config.publish {
35 let task = tasks::Publish {
36 msg_bus: msg_bus.clone(),
37 fn_publish,
38 mqtt_msg_gen: mqtt_msg_gen.clone(),
39 client: client.clone(),
40 };
41 join_set_spawn(&mut task_set, task.spawn());
42 }
43
44 let task = tasks::Subscribe {
46 msg_bus: msg_bus.clone(),
47 eventloop,
48 client,
49 mqtt_msg_gen: mqtt_msg_gen.clone(),
50 subscribe: config.subscribe.clone(),
51 };
52 join_set_spawn(&mut task_set, task.spawn());
53
54 while let Some(res) = task_set.join_next().await {
55 res??
56 }
57 }
58}