Skip to main content

rsiot/components/cmp_add_input_stream/
mod.rs

1//! Компонент для добавления сообщений из побочного потока
2
3use async_trait::async_trait;
4use tokio::sync::broadcast;
5
6use crate::{
7    executor::{Component, ComponentError, IComponentProcess, MsgBusLinker},
8    message::*,
9};
10
11/// Название компонента
12pub const CMP_NAME: &str = "cmp_add_input_stream";
13
14/// Настройки компонента cmp_add_input_stream
15#[derive(Debug)]
16pub struct Config<TMessage> {
17    /// Внешний канал broadcast, на который происходит подписка
18    pub channel: broadcast::Receiver<Message<TMessage>>,
19}
20
21/// Компонент для добавления сообщений из побочного потока
22#[cfg_attr(not(feature = "single-thread"), async_trait)]
23#[cfg_attr(feature = "single-thread", async_trait(?Send))]
24impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
25where
26    TMsg: MsgDataBound + 'static,
27{
28    async fn process(
29        &self,
30        mut config: Config<TMsg>,
31        msg_bus: MsgBusLinker<TMsg>,
32    ) -> Result<(), ComponentError> {
33        let output = msg_bus.init(CMP_NAME).output();
34
35        while let Ok(msg) = config.channel.recv().await {
36            output
37                .send(msg)
38                .await
39                .map_err(|err| ComponentError::Execution(err.to_string()))?;
40        }
41        Ok(())
42    }
43}
44
45/// Компонент cmp_add_input_stream
46pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;