Skip to main content

rsiot/components/cmp_mqtt_client/
fn_process.rs

1use std::time::Duration;
2
3use rumqttc::{AsyncClient, MqttOptions};
4use tokio::task::JoinSet;
5use tracing::info;
6
7use crate::{
8    components::{cmp_mqtt_client::ConfigPublish, shared_tasks::cmp_mqtt_genral::MqttGeneralTasks},
9    executor::{MsgBusLinker, join_set_spawn},
10    message::MsgDataBound,
11    serde_utils::SerdeAlg,
12};
13
14use super::{Config, Error, config::MqttMsgGen, tasks};
15
16pub async fn fn_process<TMsg>(
17    config: Config<TMsg>,
18    msg_bus: MsgBusLinker<TMsg>,
19) -> super::Result<()>
20where
21    TMsg: MsgDataBound + 'static,
22{
23    info!("Starting");
24
25    let buffer_size = msg_bus.max_capacity();
26
27    let mut mqttoptions = MqttOptions::new(&config.client_id, &config.host, config.port);
28    mqttoptions.set_keep_alive(Duration::from_secs(5));
29
30    let (client, eventloop) = AsyncClient::new(mqttoptions, config.client_capacity);
31
32    let base_topic = if let ConfigPublish::Publish {
33        base_topic: topic_part,
34        ..
35    } = &config.publish
36    {
37        topic_part.clone()
38    } else {
39        "".to_string()
40    };
41    let mqtt_msg_gen = MqttMsgGen {
42        serde_alg: SerdeAlg::new(config.serde_alg),
43        base_topic,
44    };
45
46    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
47
48    let (ch_rx_send, ch_tx_recv) = MqttGeneralTasks {
49        msg_bus,
50        buffer_size,
51        task_set: &mut task_set,
52        publish: config.publish,
53        subscribe: config.subscribe,
54        mqtt_msg_gen,
55        error_fn_publish: Error::FnPublish,
56        error_fn_subscribe: Error::FnSubscribe,
57        error_task_end_input: || Error::TaskEndInput,
58        error_task_end_output: || Error::TaskEndOutput,
59        error_tokio_mpsc_send: || Error::TokioSyncMpscSend,
60    }
61    .spawn();
62
63    // Отправление входящих сообщений на MQTT-брокер
64    let task = tasks::MqttSend {
65        input: ch_rx_send,
66        client: client.clone(),
67    };
68    join_set_spawn(&mut task_set, "cmp_mqtt_client | mqtt_send", task.spawn());
69
70    // Получение сообщения от MQTT-брокера
71    let task = tasks::MqttRecv {
72        output: ch_tx_recv,
73        eventloop,
74    };
75    join_set_spawn(&mut task_set, "cmp_mqtt_client", task.spawn());
76
77    while let Some(res) = task_set.join_next().await {
78        res??
79    }
80
81    Err(Error::TaskEndMain)
82}