rsiot/components/cmp_esp_i2c_slave/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::{fmt::Debug, sync::Arc};

use esp_idf_svc::hal::{
    i2c::{I2c, I2cSlaveConfig, I2cSlaveDriver},
    peripheral::Peripheral,
};
use futures::TryFutureExt;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
    sync::{mpsc, Mutex},
    task::JoinSet,
};
use tracing::debug;

use crate::{
    components::{cmp_esp_i2c_slave::tasks, shared_tasks},
    executor::{join_set_spawn, CmpInOut},
    message::{MsgDataBound, ServiceBound},
};

use super::{BufferData, Config, Error};

/// Размер буферов приема и отправки
const BUFFER_LEN: usize = 128;

pub async fn fn_process<TMsg, TI2c, TPeripheral, TI2cRequest, TI2cResponse, TBufferData, TService>(
    config: Config<TMsg, TI2c, TPeripheral, TI2cRequest, TI2cResponse, TBufferData>,
    msg_bus: CmpInOut<TMsg, TService>,
) -> super::Result<()>
where
    TMsg: MsgDataBound + 'static,
    TI2c: Peripheral<P = TPeripheral> + 'static,
    TPeripheral: I2c,
    TI2cRequest: Debug + DeserializeOwned + 'static,
    TI2cResponse: Debug + Serialize + 'static,
    TBufferData: BufferData + 'static,
    TService: ServiceBound + 'static,
{
    let i2c_idf_config = I2cSlaveConfig::new()
        .sda_enable_pullup(false)
        .scl_enable_pullup(false)
        .tx_buffer_length(BUFFER_LEN)
        .rx_buffer_length(BUFFER_LEN);
    let i2c_slave = I2cSlaveDriver::new(
        config.i2c,
        config.sda,
        config.scl,
        config.slave_address,
        &i2c_idf_config,
    )
    .unwrap();

    let buffer_data = Arc::new(Mutex::new(config.buffer_data_default.clone()));

    debug!("I2c slave drive initialized");

    let buffer_size = msg_bus.max_capacity();
    let (channel_buffer_to_filter_send, channel_buffer_to_filter_recv) = mpsc::channel(buffer_size);
    let (channel_filter_to_output_send, channel_filter_to_output_recv) = mpsc::channel(buffer_size);

    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();

    // Задача коммуникации I2C
    let task = tasks::I2cComm {
        i2c_slave,
        fn_i2c_comm: config.fn_i2c_comm,
        buffer_data: buffer_data.clone(),
        start_delay: config.start_i2ccomm_delay,
    };
    task_set.spawn_blocking(move || task.spawn());

    // Задача обработки входящих сообщений
    let task = tasks::Input {
        msg_bus: msg_bus.clone(),
        fn_input: config.fn_input,
        buffer_data: buffer_data.clone(),
    };
    join_set_spawn(&mut task_set, task.spawn());

    // Задача создания исходящих сообщений
    let task = tasks::Output {
        output: channel_buffer_to_filter_send,
        fn_output: config.fn_output,
        fn_output_period: config.fn_output_period,
        buffer_data: buffer_data.clone(),
    };
    join_set_spawn(&mut task_set, task.spawn());

    // Фильтрация исходящих сообщений
    let task = shared_tasks::filter_identical_data::FilterIdenticalData {
        input: channel_buffer_to_filter_recv,
        output: channel_filter_to_output_send,
    };
    join_set_spawn(
        &mut task_set,
        task.spawn().map_err(Error::TaskFilterIdenticalData),
    );

    // Пересылка сообщений на выход компонента
    let task = shared_tasks::mpsc_to_msgbus::MpscToMsgBus {
        input: channel_filter_to_output_recv,
        msg_bus: msg_bus.clone(),
    };
    join_set_spawn(&mut task_set, task.spawn().map_err(Error::TaskToMsgBus));

    while let Some(res) = task_set.join_next().await {
        res.unwrap()?;
    }

    Ok(())
}