rsiot/components/cmp_add_input_stream/
mod.rs1use async_trait::async_trait;
4use tokio::sync::broadcast;
5
6use crate::{
7 executor::{Component, ComponentError, IComponentProcess, MsgBusLinker},
8 message::*,
9};
10
11pub const CMP_NAME: &str = "cmp_add_input_stream";
13
14#[derive(Debug)]
16pub struct Config<TMessage> {
17 pub channel: broadcast::Receiver<Message<TMessage>>,
19}
20
21#[cfg_attr(not(feature = "single-thread"), async_trait)]
23#[cfg_attr(feature = "single-thread", async_trait(?Send))]
24impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
25where
26 TMsg: MsgDataBound + 'static,
27{
28 async fn process(
29 &self,
30 mut config: Config<TMsg>,
31 msg_bus: MsgBusLinker<TMsg>,
32 ) -> Result<(), ComponentError> {
33 let output = msg_bus.init(CMP_NAME).output();
34
35 while let Ok(msg) = config.channel.recv().await {
36 output
37 .send(msg)
38 .await
39 .map_err(|err| ComponentError::Execution(err.to_string()))?;
40 }
41 Ok(())
42 }
43}
44
45pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;