rsiot/components_config/master_device/device/tasks/
response.rs

1use tokio::sync::mpsc;
2use tracing::{trace, warn};
3
4use crate::message::{Message, MsgDataBound};
5
6use super::{Buffer, RequestResponseBound};
7
8pub struct Response<TMsg, TResponse, TBuffer> {
9    pub buffer: Buffer<TBuffer>,
10    pub ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
11    pub ch_tx_output_to_filter: mpsc::Sender<Message<TMsg>>,
12    pub fn_response_to_buffer: fn(TResponse, &mut TBuffer) -> anyhow::Result<()>,
13    pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<Message<TMsg>>,
14}
15
16impl<TMsg, TResponse, TBuffer> Response<TMsg, TResponse, TBuffer>
17where
18    TResponse: RequestResponseBound,
19    TMsg: MsgDataBound,
20{
21    pub async fn spawn(mut self) -> super::Result<()> {
22        while let Some(response) = self.ch_rx_fieldbus_to_device.recv().await {
23            trace!("Response: {:?}", response);
24
25            let mut buffer = self.buffer.lock().await;
26            let result = (self.fn_response_to_buffer)(response, &mut buffer);
27
28            if let Err(err) = result {
29                warn!("Error in fn_response_to_buffer: {:?}", err);
30            }
31
32            let msgs = (self.fn_buffer_to_msgs)(&mut buffer);
33            drop(buffer);
34
35            for msg in msgs {
36                self.ch_tx_output_to_filter.send(msg).await.unwrap();
37            }
38        }
39
40        Ok(())
41    }
42}