rsiot/components/cmp_surrealdb/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::{sync::Arc, time::Duration};

use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal};
use tokio::{sync::Mutex, task::JoinSet, time::sleep};
use tracing::{debug, error, info};

use crate::{
    executor::{CmpInOut, ComponentError},
    message::{MsgDataBound, ServiceBound},
};

use super::{tasks, Config, DbClient};

pub async fn fn_process<TMsg, TService>(
    input: CmpInOut<TMsg, TService>,
    config: Config<TMsg>,
) -> Result<(), ComponentError>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    info!("Starting Surrealdb");
    loop {
        let result = task_main(input.clone(), &config).await;
        match result {
            Ok(_) => error!("SurrealDB stop execution"),
            Err(err) => error!("SurrealDB error: {err}"),
        }
        info!("Restarting...");
        sleep(Duration::from_secs(2)).await;
    }
}

async fn task_main<TMsg, TService>(
    input: CmpInOut<TMsg, TService>,
    config: &Config<TMsg>,
) -> super::Result<()>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    let db = connect(config).await?;
    init_script(config, db.clone()).await?;

    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();

    for request_start_config in &config.request_start {
        let task = tasks::RequestStart {
            in_out: input.clone(),
            start_config: request_start_config.clone(),
            db_client: db.clone(),
        };
        task_set.spawn(task.spawn());
    }

    // Запросы на основе входящих сообщений
    for request_input_config in &config.request_input {
        let task = tasks::RequestInput {
            in_out: input.clone(),
            input_config: request_input_config.clone(),
            db_client: db.clone(),
        };
        task_set.spawn(task.spawn());
    }

    while let Some(res) = task_set.join_next().await {
        res??
    }
    Ok(())
}

/// Подключение к БД
async fn connect<TMsg>(config: &Config<TMsg>) -> super::Result<DbClient> {
    let url = format!("{}:{}", config.host, config.port);
    let db = Surreal::new::<Ws>(url).await?;

    let credentials = Root {
        username: &config.user,
        password: &config.password,
    };
    db.signin(credentials).await?;

    db.use_ns(config.namespace.clone())
        .use_db(config.database.clone())
        .await?;

    Ok(Arc::new(Mutex::new(db)))
}

/// Выполнение первоначального скрипта
async fn init_script<TMsg>(config: &Config<TMsg>, db: DbClient) -> super::Result<()> {
    debug!("Execute init script");
    let db = db.lock().await;
    db.query(config.init_script.clone()).await?;
    debug!("Init script completed");
    Ok(())
}