rsiot/components/cmp_mqtt_client/
fn_process.rs1use std::time::Duration;
2
3use rumqttc::{AsyncClient, MqttOptions};
4use tokio::task::JoinSet;
5use tracing::info;
6
7use crate::{
8 components::{cmp_mqtt_client::ConfigPublish, shared_tasks::cmp_mqtt_genral::MqttGeneralTasks},
9 executor::{MsgBusLinker, join_set_spawn},
10 message::MsgDataBound,
11 serde_utils::SerdeAlg,
12};
13
14use super::{Config, Error, config::MqttMsgGen, tasks};
15
16pub async fn fn_process<TMsg>(
17 config: Config<TMsg>,
18 msg_bus: MsgBusLinker<TMsg>,
19) -> super::Result<()>
20where
21 TMsg: MsgDataBound + 'static,
22{
23 info!("Starting");
24
25 let buffer_size = msg_bus.max_capacity();
26
27 let mut mqttoptions = MqttOptions::new(&config.client_id, &config.host, config.port);
28 mqttoptions.set_keep_alive(Duration::from_secs(5));
29
30 let (client, eventloop) = AsyncClient::new(mqttoptions, config.client_capacity);
31
32 let base_topic = if let ConfigPublish::Publish {
33 base_topic: topic_part,
34 ..
35 } = &config.publish
36 {
37 topic_part.clone()
38 } else {
39 "".to_string()
40 };
41 let mqtt_msg_gen = MqttMsgGen {
42 serde_alg: SerdeAlg::new(config.serde_alg),
43 base_topic,
44 };
45
46 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
47
48 let (ch_rx_send, ch_tx_recv) = MqttGeneralTasks {
49 msg_bus,
50 buffer_size,
51 task_set: &mut task_set,
52 publish: config.publish,
53 subscribe: config.subscribe,
54 mqtt_msg_gen,
55 error_fn_publish: Error::FnPublish,
56 error_fn_subscribe: Error::FnSubscribe,
57 error_task_end_input: || Error::TaskEndInput,
58 error_task_end_output: || Error::TaskEndOutput,
59 error_tokio_mpsc_send: || Error::TokioSyncMpscSend,
60 }
61 .spawn();
62
63 let task = tasks::MqttSend {
65 input: ch_rx_send,
66 client: client.clone(),
67 };
68 join_set_spawn(&mut task_set, "cmp_mqtt_client | mqtt_send", task.spawn());
69
70 let task = tasks::MqttRecv {
72 output: ch_tx_recv,
73 eventloop,
74 };
75 join_set_spawn(&mut task_set, "cmp_mqtt_client", task.spawn());
76
77 while let Some(res) = task_set.join_next().await {
78 res??
79 }
80
81 Err(Error::TaskEndMain)
82}