rsiot/components/cmp_derive/
fn_process.rs1use tokio::task::JoinSet;
2
3use crate::executor::{join_set_spawn, CmpInOut};
4use crate::message::*;
5
6use super::{Config, DeriveItemProcess, Error};
7
8pub async fn fn_process<TMsg>(in_out: CmpInOut<TMsg>, config: Config<TMsg>) -> super::Result<()>
9where
10 TMsg: MsgDataBound + 'static,
11{
12 let mut task_set = JoinSet::new();
13
14 for item in config.derive_items {
15 join_set_spawn(
16 &mut task_set,
17 task_process_derive_item(in_out.clone(), item),
18 );
19 }
20
21 while let Some(res) = task_set.join_next().await {
22 res??
23 }
24 Ok(())
25}
26
27async fn task_process_derive_item<TMsg>(
28 mut in_out: CmpInOut<TMsg>,
29 mut derive_item: Box<dyn DeriveItemProcess<TMsg>>,
30) -> super::Result<()>
31where
32 TMsg: MsgDataBound,
33{
34 while let Ok(msg) = in_out.recv_input().await {
35 let msgs = derive_item.process(&msg);
36 let Some(msgs) = msgs else { continue };
37 for msg in msgs {
38 in_out
39 .send_output(msg)
40 .await
41 .map_err(|e| Error::TokioSynBroadcast(e.to_string()))?
42 }
43 }
44 Ok(())
45}