rsiot/components/cmp_slint/
fn_process.rs1use std::time::Duration;
2
3use futures::TryFutureExt;
4use slint::ComponentHandle;
5use tokio::{sync::mpsc, task::JoinSet};
6
7use crate::{
8 components::shared_tasks,
9 executor::{join_set_spawn, CmpInOut},
10 message::MsgDataBound,
11};
12
13use super::{tasks, Config, Error, Result};
14
15pub async fn fn_process<TMainWindow, TMsg>(
16 config: Config<TMsg, TMainWindow>,
17 msg_bus: CmpInOut<TMsg>,
18) -> Result<()>
19where
20 TMsg: MsgDataBound + 'static,
21 TMainWindow: ComponentHandle + 'static,
22{
23 let (ch_tx_msgbus_to_input, ch_rx_msgbus_to_input) = mpsc::channel(1000);
24 let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(1000);
25 let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) = mpsc::channel(1000);
26
27 let mut task_set = JoinSet::new();
28
29 let task = shared_tasks::msgbus_to_mpsc::MsgBusToMpsc {
31 msg_bus: msg_bus.clone(),
32 output: ch_tx_msgbus_to_input,
33 };
34 join_set_spawn(&mut task_set, task.spawn().map_err(Error::TaskMsgBusToMpsc));
35
36 let task = tasks::Input {
38 input: ch_rx_msgbus_to_input,
39 slint_window: config.slint_window.clone(),
40 fn_input: config.fn_input,
41 };
42 join_set_spawn(&mut task_set, task.spawn());
43
44 let task = tasks::Output {
46 output: ch_tx_output_to_filter,
47 slint_window: config.slint_window.clone(),
48 fn_output: config.fn_output,
49 };
50 join_set_spawn(&mut task_set, task.spawn());
51
52 let task = shared_tasks::filter_send_periodically::FilterSendPeriodically {
54 input: ch_rx_output_to_filter,
55 output: ch_tx_filter_to_msgbus,
56 period: Duration::from_millis(100),
57 };
58 join_set_spawn(
59 &mut task_set,
60 task.spawn().map_err(Error::TaskFilterSendPeriodically),
61 );
62
63 let task = shared_tasks::mpsc_to_msgbus::MpscToMsgBus {
65 input: ch_rx_filter_to_msgbus,
66 msg_bus: msg_bus.clone(),
67 };
68 join_set_spawn(&mut task_set, task.spawn().map_err(Error::TaskMpscToMsgBus));
69
70 while let Some(res) = task_set.join_next().await {
71 res??;
72 }
73 Ok(())
74}