// rsiot/components/shared_tasks/fn_process_master.rs
use futures::TryFutureExt;
use tokio::{
sync::{broadcast, mpsc},
task::JoinSet,
};
use crate::{
components_config::master_device::{self, AddressBound, DeviceTrait, RequestResponseBound},
executor::{join_set_spawn, CmpInOut},
message::{Message, MsgDataBound, ServiceBound},
};
use super::{filter_identical_data, mpsc_to_msgbus, msgbus_to_broadcast};
/// Parameter bundle for wiring a set of master devices between the message
/// bus and a fieldbus transport.
///
/// Fill in every field and call `spawn`, which consumes the struct, adds all
/// helper tasks to `task_set` and returns the channel ends the fieldbus side
/// has to drive.
///
/// Type parameters:
/// - `TMsg` / `TService`: message and service types of the message bus.
/// - `TError`: error type of the tasks stored in `task_set`; every helper
///   task error is converted into it by one of the `error_*` functions.
/// - `TFieldbusRequest` / `TFieldbusResponse`: items exchanged between the
///   devices and the fieldbus side over the channels returned by `spawn`.
/// - `TAddress`: address type used by the devices and request/response types.
pub struct FnProcessMaster<
'a,
TMsg,
TService,
TError,
TFieldbusRequest,
TFieldbusResponse,
TAddress,
> where
TMsg: MsgDataBound + 'static,
TService: ServiceBound + 'static,
TError: Send + Sync + 'static,
TAddress: AddressBound,
{
/// Message-bus handle. `spawn` clones it once for the bus-to-devices task
/// and once for the task that writes filtered messages back to the bus.
pub msg_bus: CmpInOut<TMsg, TService>,
/// Capacity passed to every internal channel created by `spawn`.
pub buffer_size: usize,
/// Task set that receives every task started by `spawn`.
pub task_set: &'a mut JoinSet<Result<(), TError>>,
/// Converts an error of the `msgbus_to_broadcast` task into `TError`.
pub error_msgbus_to_broadcast: fn(msgbus_to_broadcast::Error) -> TError,
/// Converts an error of the `filter_identical_data` task into `TError`.
pub error_filter: fn(filter_identical_data::Error) -> TError,
/// Converts an error of the `mpsc_to_msgbus` task into `TError`.
pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,
/// Converts an error of a device task into `TError`.
pub error_master_device: fn(master_device::Error) -> TError,
/// Devices to run; `spawn` starts one task per element, in order.
pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse, TAddress>>>,
}
impl<TMsg, TService, TError, TFieldbusRequest, TFieldbusResponse, TAddress>
    FnProcessMaster<'_, TMsg, TService, TError, TFieldbusRequest, TFieldbusResponse, TAddress>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    TError: Send + Sync + 'static,
    TFieldbusRequest: RequestResponseBound<TAddress> + 'static,
    TFieldbusResponse: RequestResponseBound<TAddress> + 'static,
    TAddress: 'static + AddressBound,
{
    /// Creates the internal channels and registers every helper task in `task_set`.
    ///
    /// Tasks are added in this order:
    /// 1. a `MsgBusToBroadcast` task fed by a clone of `msg_bus`, writing to a
    ///    broadcast channel;
    /// 2. one task per entry of `devices`. Each device gets its own
    ///    subscription to the bus broadcast and to the fieldbus-response
    ///    broadcast, plus clones of the request sender and of the sender
    ///    leading to the filter;
    /// 3. a `FilterIdenticalData` task sitting between the devices and the
    ///    last stage;
    /// 4. an `MpscToMsgBus` task that takes the filter output and a second
    ///    clone of `msg_bus`.
    ///
    /// Each task's error is converted to `TError` with the matching `error_*`
    /// function. Nothing is awaited here; the tasks are only handed to
    /// `join_set_spawn`.
    ///
    /// # Returns
    ///
    /// `(requests, responses)`: the receiving end of the request channel
    /// shared by all devices, and the sender of the broadcast channel every
    /// device is subscribed to for responses.
    ///
    /// # Panics
    ///
    /// Tokio's `mpsc::channel` and `broadcast::channel` panic when asked for
    /// a capacity of zero, so `buffer_size` must be at least 1.
    pub fn spawn(
        self,
    ) -> (
        mpsc::Receiver<TFieldbusRequest>,
        broadcast::Sender<TFieldbusResponse>,
    ) {
        // Take the struct apart once so the rest of the body works on plain locals.
        let Self {
            msg_bus,
            buffer_size,
            task_set,
            error_msgbus_to_broadcast,
            error_filter,
            error_mpsc_to_msgbus,
            error_master_device,
            devices,
        } = self;

        // message bus -> devices
        let (bus_tx, bus_rx) = broadcast::channel::<Message<TMsg>>(buffer_size);
        // devices -> fieldbus (receiver is returned to the caller)
        let (request_tx, request_rx) = mpsc::channel::<TFieldbusRequest>(buffer_size);
        // fieldbus -> devices (sender is returned to the caller)
        let (response_tx, response_rx) = broadcast::channel::<TFieldbusResponse>(buffer_size);
        // devices -> filter
        let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<Message<TMsg>>(buffer_size);
        // filter -> message bus
        let (filtered_tx, filtered_rx) = mpsc::channel::<Message<TMsg>>(buffer_size);

        let bus_forwarder = msgbus_to_broadcast::MsgBusToBroadcast {
            msg_bus: msg_bus.clone(),
            output: bus_tx,
        };
        join_set_spawn(
            &mut *task_set,
            bus_forwarder.spawn().map_err(error_msgbus_to_broadcast),
        );

        for device in devices {
            // Arguments are evaluated left to right, so every device gets a
            // fresh subscription / sender clone created just before its task.
            let device_task = device
                .spawn(
                    bus_rx.resubscribe(),
                    request_tx.clone(),
                    response_rx.resubscribe(),
                    unfiltered_tx.clone(),
                )
                .map_err(error_master_device);
            join_set_spawn(&mut *task_set, device_task);
        }

        let dedup_filter = filter_identical_data::FilterIdenticalData {
            input: unfiltered_rx,
            output: filtered_tx,
        };
        join_set_spawn(&mut *task_set, dedup_filter.spawn().map_err(error_filter));

        let bus_writer = mpsc_to_msgbus::MpscToMsgBus {
            input: filtered_rx,
            msg_bus: msg_bus.clone(),
        };
        join_set_spawn(
            &mut *task_set,
            bus_writer.spawn().map_err(error_mpsc_to_msgbus),
        );

        // The template endpoints (`bus_rx`, `request_tx`, `response_rx`,
        // `unfiltered_tx`) are dropped here; only the per-device copies stay alive.
        (request_rx, response_tx)
    }
}