rsiot/components/cmp_mqtt_client/tasks/
output.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use rumqttc::{Event, EventLoop, Packet};
use tracing::warn;

use crate::{
    components_config::mqtt_client::ConfigFnOutput,
    executor::CmpInOut,
    message::{MsgDataBound, ServiceBound},
};

/// State of the task that forwards incoming MQTT publications to the
/// component output.
///
/// The task is consumed and driven by [`Output::spawn`].
pub struct Output<TMsg: MsgDataBound, TService: ServiceBound> {
    /// Component channel handle; converted messages are sent through its
    /// `send_output` method.
    pub in_out: CmpInOut<TMsg, TService>,
    /// User-supplied callback invoked with the payload bytes of every
    /// received publication; its result decides what, if anything, is sent.
    pub config_fn_output: ConfigFnOutput<TMsg>,
    /// `rumqttc` event loop polled for MQTT notifications.
    pub eventloop: EventLoop,
}

impl<TMsg, TService> Output<TMsg, TService>
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
{
    pub async fn spawn(mut self) -> super::Result<()> {
        while let Ok(notification) = self.eventloop.poll().await {
            let msg = match notification {
                Event::Incoming(msg) => match msg {
                    Packet::Connect(_) => continue,
                    Packet::ConnAck(_) => continue,
                    Packet::Publish(msg) => msg,
                    Packet::PubAck(_) => continue,
                    Packet::PubRec(_) => continue,
                    Packet::PubRel(_) => continue,
                    Packet::PubComp(_) => continue,
                    Packet::Subscribe(_) => continue,
                    Packet::SubAck(_) => continue,
                    Packet::Unsubscribe(_) => continue,
                    Packet::UnsubAck(_) => continue,
                    Packet::PingReq => continue,
                    Packet::PingResp => continue,
                    Packet::Disconnect => continue,
                },
                Event::Outgoing(_) => continue,
            };
            let payload = msg.payload.to_vec();

            let msg = (self.config_fn_output)(&payload);

            // Ошибка выполнения fn_output
            let msg = match msg {
                Ok(msg) => msg,
                Err(err) => {
                    warn!("FnOutput: {err}");
                    continue;
                }
            };

            // Фильтруем сообщения
            let Some(msg) = msg else { continue };

            // Отправляем исходящее сообщение
            self.in_out
                .send_output(msg)
                .await
                .map_err(super::Error::CmpOutput)?;
        }
        Ok(())
    }
}