rsiot/components/cmp_surrealdb/
fn_process.rs

1use std::{sync::Arc, time::Duration};
2
3use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal};
4use tokio::{sync::Mutex, task::JoinSet, time::sleep};
5use tracing::{debug, error, info};
6
7use crate::{
8    executor::{CmpInOut, ComponentError},
9    message::MsgDataBound,
10};
11
12use super::{tasks, Config, DbClient};
13
14pub async fn fn_process<TMsg>(
15    input: CmpInOut<TMsg>,
16    config: Config<TMsg>,
17) -> Result<(), ComponentError>
18where
19    TMsg: MsgDataBound + 'static,
20{
21    info!("Starting Surrealdb");
22    loop {
23        let result = task_main(input.clone(), &config).await;
24        match result {
25            Ok(_) => error!("SurrealDB stop execution"),
26            Err(err) => error!("SurrealDB error: {err}"),
27        }
28        info!("Restarting...");
29        sleep(Duration::from_secs(2)).await;
30    }
31}
32
33async fn task_main<TMsg>(input: CmpInOut<TMsg>, config: &Config<TMsg>) -> super::Result<()>
34where
35    TMsg: MsgDataBound + 'static,
36{
37    let db = connect(config).await?;
38    init_script(config, db.clone()).await?;
39
40    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
41
42    for request_start_config in &config.request_start {
43        let task = tasks::RequestStart {
44            in_out: input.clone(),
45            start_config: request_start_config.clone(),
46            db_client: db.clone(),
47        };
48        task_set.spawn(task.spawn());
49    }
50
51    // Запросы на основе входящих сообщений
52    for request_input_config in &config.request_input {
53        let task = tasks::RequestInput {
54            in_out: input.clone(),
55            input_config: request_input_config.clone(),
56            db_client: db.clone(),
57        };
58        task_set.spawn(task.spawn());
59    }
60
61    while let Some(res) = task_set.join_next().await {
62        res??
63    }
64    Ok(())
65}
66
67/// Подключение к БД
68async fn connect<TMsg>(config: &Config<TMsg>) -> super::Result<DbClient> {
69    let url = format!("{}:{}", config.host, config.port);
70    let db = Surreal::new::<Ws>(url).await?;
71
72    let credentials = Root {
73        username: &config.user,
74        password: &config.password,
75    };
76    db.signin(credentials).await?;
77
78    db.use_ns(config.namespace.clone())
79        .use_db(config.database.clone())
80        .await?;
81
82    Ok(Arc::new(Mutex::new(db)))
83}
84
85/// Выполнение первоначального скрипта
86async fn init_script<TMsg>(config: &Config<TMsg>, db: DbClient) -> super::Result<()> {
87    debug!("Execute init script");
88    let db = db.lock().await;
89    db.query(config.init_script.clone()).await?;
90    debug!("Init script completed");
91    Ok(())
92}