rsiot/components/cmp_surrealdb/
fn_process.rs1use std::{sync::Arc, time::Duration};
2
3use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal};
4use tokio::{sync::Mutex, task::JoinSet, time::sleep};
5use tracing::{debug, error, info};
6
7use crate::{
8 executor::{CmpInOut, ComponentError},
9 message::MsgDataBound,
10};
11
12use super::{tasks, Config, DbClient};
13
14pub async fn fn_process<TMsg>(
15 input: CmpInOut<TMsg>,
16 config: Config<TMsg>,
17) -> Result<(), ComponentError>
18where
19 TMsg: MsgDataBound + 'static,
20{
21 info!("Starting Surrealdb");
22 loop {
23 let result = task_main(input.clone(), &config).await;
24 match result {
25 Ok(_) => error!("SurrealDB stop execution"),
26 Err(err) => error!("SurrealDB error: {err}"),
27 }
28 info!("Restarting...");
29 sleep(Duration::from_secs(2)).await;
30 }
31}
32
33async fn task_main<TMsg>(input: CmpInOut<TMsg>, config: &Config<TMsg>) -> super::Result<()>
34where
35 TMsg: MsgDataBound + 'static,
36{
37 let db = connect(config).await?;
38 init_script(config, db.clone()).await?;
39
40 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
41
42 for request_start_config in &config.request_start {
43 let task = tasks::RequestStart {
44 in_out: input.clone(),
45 start_config: request_start_config.clone(),
46 db_client: db.clone(),
47 };
48 task_set.spawn(task.spawn());
49 }
50
51 for request_input_config in &config.request_input {
53 let task = tasks::RequestInput {
54 in_out: input.clone(),
55 input_config: request_input_config.clone(),
56 db_client: db.clone(),
57 };
58 task_set.spawn(task.spawn());
59 }
60
61 while let Some(res) = task_set.join_next().await {
62 res??
63 }
64 Ok(())
65}
66
67async fn connect<TMsg>(config: &Config<TMsg>) -> super::Result<DbClient> {
69 let url = format!("{}:{}", config.host, config.port);
70 let db = Surreal::new::<Ws>(url).await?;
71
72 let credentials = Root {
73 username: &config.user,
74 password: &config.password,
75 };
76 db.signin(credentials).await?;
77
78 db.use_ns(config.namespace.clone())
79 .use_db(config.database.clone())
80 .await?;
81
82 Ok(Arc::new(Mutex::new(db)))
83}
84
85async fn init_script<TMsg>(config: &Config<TMsg>, db: DbClient) -> super::Result<()> {
87 debug!("Execute init script");
88 let db = db.lock().await;
89 db.query(config.init_script.clone()).await?;
90 debug!("Init script completed");
91 Ok(())
92}