rsiot/components/cmp_derive/
fn_process.rs

1use tokio::task::JoinSet;
2
3use crate::executor::{join_set_spawn, CmpInOut};
4use crate::message::*;
5
6use super::{Config, DeriveItemProcess, Error};
7
8pub async fn fn_process<TMsg>(in_out: CmpInOut<TMsg>, config: Config<TMsg>) -> super::Result<()>
9where
10    TMsg: MsgDataBound + 'static,
11{
12    let mut task_set = JoinSet::new();
13
14    for item in config.derive_items {
15        join_set_spawn(
16            &mut task_set,
17            task_process_derive_item(in_out.clone(), item),
18        );
19    }
20
21    while let Some(res) = task_set.join_next().await {
22        res??
23    }
24    Ok(())
25}
26
27async fn task_process_derive_item<TMsg>(
28    mut in_out: CmpInOut<TMsg>,
29    mut derive_item: Box<dyn DeriveItemProcess<TMsg>>,
30) -> super::Result<()>
31where
32    TMsg: MsgDataBound,
33{
34    while let Ok(msg) = in_out.recv_input().await {
35        let msgs = derive_item.process(&msg);
36        let Some(msgs) = msgs else { continue };
37        for msg in msgs {
38            in_out
39                .send_output(msg)
40                .await
41                .map_err(|e| Error::TokioSynBroadcast(e.to_string()))?
42        }
43    }
44    Ok(())
45}