1 重构rtsa_mqtt库,添加路由请求响应功能

2 修改库命名rtss->rtsa
This commit is contained in:
soul-walker 2024-10-31 15:12:26 +08:00
parent 2e1eecfdad
commit 505230b26c
91 changed files with 1148 additions and 1987 deletions

View File

@ -50,7 +50,7 @@ jobs:
file: ./Dockerfile
push: true
tags: |
gitea.joylink.club/joylink/rtss-simulation:local-test
gitea.joylink.club/joylink/rtsam:lt
- name: 发布到本地测试环境
uses: https://gitea.joylink.club/appleboy/ssh-action@v1.0.3
with:
@ -59,6 +59,6 @@ jobs:
username: ${{ secrets.LOCAL_233_SSH_USER }}
password: ${{ secrets.LOCAL_233_SSH_PASSWORD }}
script: |
docker rm -f rtss-simulation || echo "rtss-simulation not exist"
docker pull gitea.joylink.club/joylink/rtss-simulation:local-test
docker run --name rtss-simulation --restart=always -e RUN_MODE=local_test --network jlnet --ip 10.11.11.11 -d -p 8765:8765 -v /usr/local/joylink/logs/simulation:/logs/simulation gitea.joylink.club/joylink/rtss-simulation:local-test
docker rm -f rtsa-simulation || echo "rtsa-simulation not exist"
docker pull gitea.joylink.club/joylink/rtsa-simulation:local-test
docker run --name rtsa-simulation --restart=always -e RUN_MODE=local_test --network jlnet --ip 10.11.11.11 -d -p 8765:8765 -v /usr/local/joylink/logs/simulation:/logs/simulation gitea.joylink.club/joylink/rtsa-simulation:local-test

4
.gitmodules vendored
View File

@ -1,3 +1,3 @@
[submodule "rtss-proto-msg"]
path = rtss-proto-msg
[submodule "rtsa-proto-msg"]
path = rtsa-proto-msg
url = https://gitea.joylink.club/joylink/rtss-proto-msg.git

View File

@ -27,7 +27,7 @@
"repr",
"reqwest",
"rtsa",
"rtss",
"rtsa",
"rumqtt",
"rumqttc",
"sqlx",

65
Cargo.lock generated
View File

@ -331,7 +331,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite",
"tower",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@ -375,7 +375,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@ -1448,7 +1448,7 @@ dependencies = [
"pin-project-lite",
"socket2",
"tokio",
"tower",
"tower 0.4.13",
"tower-service",
"tracing",
]
@ -1636,9 +1636,9 @@ dependencies = [
"clap",
"config",
"enum_dispatch",
"rtss_api",
"rtss_db",
"rtss_log",
"rtsa_api",
"rtsa_db",
"rtsa_log",
"serde",
"tokio",
]
@ -2400,7 +2400,7 @@ dependencies = [
]
[[package]]
name = "rtss_api"
name = "rtsa_api"
version = "0.1.0"
dependencies = [
"anyhow",
@ -2412,9 +2412,9 @@ dependencies = [
"chrono",
"jsonwebtoken",
"reqwest",
"rtss_db",
"rtss_dto",
"rtss_log",
"rtsa_db",
"rtsa_dto",
"rtsa_log",
"serde",
"serde_json",
"sysinfo",
@ -2423,13 +2423,13 @@ dependencies = [
]
[[package]]
name = "rtss_db"
name = "rtsa_db"
version = "0.1.0"
dependencies = [
"anyhow",
"lazy_static",
"rtss_dto",
"rtss_log",
"rtsa_dto",
"rtsa_log",
"serde",
"serde_json",
"sqlx",
@ -2437,7 +2437,7 @@ dependencies = [
]
[[package]]
name = "rtss_dto"
name = "rtsa_dto"
version = "0.1.0"
dependencies = [
"async-graphql",
@ -2449,7 +2449,7 @@ dependencies = [
]
[[package]]
name = "rtss_log"
name = "rtsa_log"
version = "0.1.0"
dependencies = [
"tracing",
@ -2458,16 +2458,17 @@ dependencies = [
]
[[package]]
name = "rtss_mqtt"
name = "rtsa_mqtt"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"lazy_static",
"rtss_log",
"rtsa_log",
"rumqttc",
"thiserror",
"tokio",
"tower 0.5.1",
]
[[package]]
@ -2644,18 +2645,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.213"
version = "1.0.214"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea7893ff5e2466df8d720bb615088341b295f849602c6956047f8f80f0e9bc1"
checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.213"
version = "1.0.214"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5"
checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766"
dependencies = [
"proc-macro2",
"quote",
@ -2778,10 +2779,10 @@ dependencies = [
"enum_dispatch",
"lazy_static",
"rayon",
"rtss_db",
"rtss_dto",
"rtss_log",
"rtss_mqtt",
"rtsa_db",
"rtsa_dto",
"rtsa_log",
"rtsa_mqtt",
"serde",
"thiserror",
"tokio",
@ -3391,6 +3392,20 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper 0.1.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.6.0"

View File

@ -17,7 +17,7 @@ sqlx = { version = "0.8", features = [
"json",
"chrono",
] }
serde = { version = "1.0.213", features = ["derive"] }
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
anyhow = "1.0.91"
async-trait = "0.1.83"
@ -26,3 +26,4 @@ lazy_static = "1.5.0"
config = "0.14.1"
clap = { version = "4.5.20", features = ["derive"] }
enum_dispatch = "0.3"
tower = { version = "0.5.1", features = ["util"] }

View File

@ -1,4 +1,4 @@
[default.extend-words]
[files]
extend-exclude = ["README.md", "rtss-proto-msg/*"]
extend-exclude = ["README.md", "rtsa-proto-msg/*"]

View File

@ -1,5 +1,5 @@
[package]
name = "rtss_db"
name = "rtsa_db"
version = "0.1.0"
edition = "2021"
@ -19,5 +19,5 @@ sqlx = { workspace = true, features = [
thiserror = { workspace = true }
lazy_static = { workspace = true }
rtss_dto = { path = "../rtss_dto" }
rtss_log = { path = "../rtss_log" }
rtsa_dto = { path = "../rtsa_dto" }
rtsa_log = { path = "../rtsa_log" }

View File

@ -1,7 +1,7 @@
use std::vec;
use rtss_dto::common::DataType;
use rtss_log::tracing::debug;
use rtsa_dto::common::DataType;
use rtsa_log::tracing::debug;
use serde_json::Value;
use crate::{
@ -10,7 +10,7 @@ use crate::{
DbAccessError,
};
use super::RtssDbAccessor;
use super::RtsaDbAccessor;
/// 草稿数据管理
#[allow(async_fn_in_trait)]
@ -205,7 +205,7 @@ impl CreateDraftData {
}
}
impl DraftDataAccessor for RtssDbAccessor {
impl DraftDataAccessor for RtsaDbAccessor {
async fn paging_query_draft_data(
&self,
query: DraftDataQuery,
@ -436,8 +436,8 @@ mod tests {
use crate::{SyncUserInfo, UserAccessor};
use super::*;
use rtss_dto::common::{IscsStyle, Role};
use rtss_log::tracing::Level;
use rtsa_dto::common::{IscsStyle, Role};
use rtsa_log::tracing::Level;
use serde::{Deserialize, Serialize};
use sqlx::{types::chrono::Local, PgPool};
@ -449,8 +449,8 @@ mod tests {
// You could also do `use foo_crate::MIGRATOR` and just refer to it as `MIGRATOR` here.
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn basic_use_test(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtssDbAccessor::new(pool);
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtsaDbAccessor::new(pool);
// 同步10个用户
let mut users = vec![];
for i in 0..10 {

View File

@ -1,4 +1,4 @@
use rtss_dto::common::FeatureType;
use rtsa_dto::common::FeatureType;
use serde_json::Value;
use crate::{
@ -7,7 +7,7 @@ use crate::{
DbAccessError,
};
use super::RtssDbAccessor;
use super::RtsaDbAccessor;
/// 功能特性管理
#[allow(async_fn_in_trait)]
@ -34,7 +34,7 @@ pub trait FeatureAccessor {
async fn get_features(&self, ids: &[i32]) -> Result<Vec<FeatureModel>, DbAccessError>;
}
impl FeatureAccessor for RtssDbAccessor {
impl FeatureAccessor for RtsaDbAccessor {
async fn create_feature(&self, feature: &CreateFeature) -> Result<FeatureModel, DbAccessError> {
let table = FeatureColumn::Table.name();
let feature_type_column = FeatureColumn::FeatureType.name();
@ -217,8 +217,8 @@ impl FeaturePagingFilter {
#[cfg(test)]
mod tests {
use rtss_dto::common::Role;
use rtss_log::tracing::Level;
use rtsa_dto::common::Role;
use rtsa_log::tracing::Level;
use sqlx::{types::chrono::Local, PgPool};
use crate::{SyncUserInfo, UserAccessor};
@ -227,8 +227,8 @@ mod tests {
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_basic_use(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtssDbAccessor::new(pool);
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtsaDbAccessor::new(pool);
// 同步10个用户
let mut users = vec![];
for i in 0..10 {

View File

@ -5,7 +5,7 @@ pub use draft_data::*;
mod release_data;
pub use release_data::*;
mod user;
use rtss_log::tracing::error;
use rtsa_log::tracing::error;
pub use user::*;
mod feature;
pub use feature::*;
@ -14,7 +14,7 @@ use lazy_static::lazy_static;
use crate::{model::MqttClientIdSeq, DbAccessError};
lazy_static! {
static ref RDA: Mutex<Option<RtssDbAccessor>> = Mutex::new(None);
static ref RDA: Mutex<Option<RtsaDbAccessor>> = Mutex::new(None);
}
/// 初始化全局默认数据库访问器
@ -29,7 +29,7 @@ pub async fn init_default_db_accessor(url: &str) {
}
/// 获取默认数据库访问器
pub fn get_default_db_accessor() -> RtssDbAccessor {
pub fn get_default_db_accessor() -> RtsaDbAccessor {
let rda = RDA.lock().unwrap();
if rda.is_none() {
panic!("数据库访问器未初始化");
@ -38,13 +38,13 @@ pub fn get_default_db_accessor() -> RtssDbAccessor {
}
#[derive(Clone)]
pub struct RtssDbAccessor {
pub struct RtsaDbAccessor {
pool: sqlx::PgPool,
}
impl RtssDbAccessor {
impl RtsaDbAccessor {
pub fn new(pool: sqlx::PgPool) -> Self {
RtssDbAccessor { pool }
RtsaDbAccessor { pool }
}
pub async fn get_next_mqtt_client_id(&self) -> Result<i64, DbAccessError> {
@ -56,22 +56,22 @@ impl RtssDbAccessor {
}
}
pub async fn get_db_accessor(url: &str) -> RtssDbAccessor {
pub async fn get_db_accessor(url: &str) -> RtsaDbAccessor {
let pool = sqlx::PgPool::connect(url).await.expect("连接数据库失败");
RtssDbAccessor::new(pool)
RtsaDbAccessor::new(pool)
}
#[cfg(test)]
mod tests {
use super::*;
use rtss_log::tracing::{self, Level};
use rtsa_log::tracing::{self, Level};
use sqlx::PgPool;
// You could also do `use foo_crate::MIGRATOR` and just refer to it as `MIGRATOR` here.
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_get_mqtt_client_id(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtssDbAccessor::new(pool);
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = crate::db_access::RtsaDbAccessor::new(pool);
for _ in 0..10 {
let id = accessor.get_next_mqtt_client_id().await?;
tracing::info!("id = {}", id);

View File

@ -1,4 +1,4 @@
use rtss_dto::common::DataType;
use rtsa_dto::common::DataType;
use serde_json::Value;
use sqlx::{types::chrono, Postgres};
@ -11,7 +11,7 @@ use crate::{
DbAccessError,
};
use super::{CreateDraftData, DraftDataAccessor, RtssDbAccessor};
use super::{CreateDraftData, DraftDataAccessor, RtsaDbAccessor};
#[allow(async_fn_in_trait)]
pub trait ReleaseDataAccessor {
@ -109,7 +109,7 @@ pub struct ReleaseFromDraftResult {
pub struct ReleaseDataQuery {
pub name: Option<String>,
pub user_id: Option<i32>,
pub data_type: Option<rtss_dto::common::DataType>,
pub data_type: Option<rtsa_dto::common::DataType>,
pub options: Option<Value>,
pub is_published: Option<bool>,
}
@ -141,7 +141,7 @@ impl ReleaseDataQuery {
self
}
pub fn with_data_type(mut self, data_type: rtss_dto::common::DataType) -> Self {
pub fn with_data_type(mut self, data_type: rtsa_dto::common::DataType) -> Self {
self.data_type = Some(data_type);
self
}
@ -202,7 +202,7 @@ pub struct CreateReleaseVersionData {
pub user_id: i32,
}
impl RtssDbAccessor {
impl RtsaDbAccessor {
async fn insert_release_data_version<'e, 'c: 'e, E>(
&self,
data: CreateReleaseVersionData,
@ -237,7 +237,7 @@ impl RtssDbAccessor {
}
}
impl ReleaseDataAccessor for RtssDbAccessor {
impl ReleaseDataAccessor for RtsaDbAccessor {
async fn release_new_from_draft(
&self,
draft_id: i32,
@ -645,12 +645,12 @@ impl ReleaseDataAccessor for RtssDbAccessor {
#[cfg(test)]
mod tests {
use crate::{CreateDraftData, DraftDataAccessor, RtssDbAccessor, SyncUserInfo, UserAccessor};
use crate::{CreateDraftData, DraftDataAccessor, RtsaDbAccessor, SyncUserInfo, UserAccessor};
use super::*;
use chrono::Local;
use rtss_dto::common::{IscsStyle, Role};
use rtss_log::tracing::Level;
use rtsa_dto::common::{IscsStyle, Role};
use rtsa_log::tracing::Level;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@ -695,8 +695,8 @@ mod tests {
// You could also do `use foo_crate::MIGRATOR` and just refer to it as `MIGRATOR` here.
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_basic_use(pool: PgPool) -> Result<(), DbAccessError> {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = RtssDbAccessor::new(pool);
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = RtsaDbAccessor::new(pool);
// 同步10个用户
let mut users = vec![];
for i in 0..10 {
@ -717,7 +717,7 @@ mod tests {
let data = "test".as_bytes();
let draft = accessor
.create_draft_data(
CreateDraftData::new("test", rtss_dto::common::DataType::Iscs, 1)
CreateDraftData::new("test", rtsa_dto::common::DataType::Iscs, 1)
.with_options(
serde_json::to_value(IscsDataOptions {
style: IscsStyle::DaShiZhiNeng,
@ -767,7 +767,7 @@ mod tests {
// name重复检查
let exist = accessor
.is_release_data_name_exist(rtss_dto::common::DataType::Iscs, name)
.is_release_data_name_exist(rtsa_dto::common::DataType::Iscs, name)
.await?;
assert!(exist);
@ -849,7 +849,7 @@ mod tests {
let data = "test data".as_bytes();
let draft = accessor
.create_draft_data(
CreateDraftData::new(&name, rtss_dto::common::DataType::Em, i).with_data(data),
CreateDraftData::new(&name, rtsa_dto::common::DataType::Em, i).with_data(data),
)
.await?;
let (release_data, _) = accessor
@ -883,7 +883,7 @@ mod tests {
let page_result = accessor.paging_query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 2);
// 分页查询发布数据,按数据类型过滤
let query = ReleaseDataQuery::new().with_data_type(rtss_dto::common::DataType::Em);
let query = ReleaseDataQuery::new().with_data_type(rtsa_dto::common::DataType::Em);
let page = PageQuery::new(1, 10);
let page_result = accessor.paging_query_release_data_list(query, page).await?;
assert_eq!(page_result.total, 8);

View File

@ -7,7 +7,7 @@ use crate::{
DbAccessError,
};
use super::RtssDbAccessor;
use super::RtsaDbAccessor;
/// 草稿数据管理
#[allow(async_fn_in_trait)]
@ -96,7 +96,7 @@ pub struct SyncUserInfo {
pub updated_at: Option<DateTime<Local>>,
}
impl RtssDbAccessor {
impl RtsaDbAccessor {
/// 首次同步用户数据
async fn sync_new_users(&self, users: &[SyncUserInfo]) -> Result<(), DbAccessError> {
let table = UserColumn::Table.name();
@ -223,7 +223,7 @@ impl RtssDbAccessor {
}
}
impl UserAccessor for RtssDbAccessor {
impl UserAccessor for RtsaDbAccessor {
async fn sync_user(&self, users: &[SyncUserInfo]) -> Result<(), DbAccessError> {
self.check_and_sync_user(users).await
}
@ -271,7 +271,7 @@ impl UserAccessor for RtssDbAccessor {
mod tests {
use std::time::Duration;
use rtss_log::tracing::Level;
use rtsa_log::tracing::Level;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
@ -293,8 +293,8 @@ mod tests {
#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_sync_user(pool: PgPool) -> Result<(), DbAccessError> {
// 日志初始化
rtss_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = RtssDbAccessor::new(pool);
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let accessor = RtsaDbAccessor::new(pool);
let users = vec![
SyncUserInfo {
id: 1,

View File

@ -1,4 +1,4 @@
use rtss_dto::common::{DataType, FeatureType, Role};
use rtsa_dto::common::{DataType, FeatureType, Role};
use serde_json::Value;
use sqlx::types::{
chrono::{DateTime, Local},
@ -14,7 +14,7 @@ pub enum MqttClientIdSeq {
impl MqttClientIdSeq {
pub fn name(&self) -> &str {
match self {
MqttClientIdSeq::Name => "rtss.mqtt_client_id_seq",
MqttClientIdSeq::Name => "rtsa.mqtt_client_id_seq",
}
}
}
@ -46,7 +46,7 @@ pub struct UserModel {
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.draft_data 列映射
/// 数据库表 rtsa.draft_data 列映射
#[derive(Debug)]
pub enum DraftDataColumn {
Table,
@ -80,7 +80,7 @@ pub struct DraftDataModel {
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.release_data 列映射
/// 数据库表 rtsa.release_data 列映射
#[derive(Debug)]
pub enum ReleaseDataColumn {
Table,
@ -111,7 +111,7 @@ pub struct ReleaseDataModel {
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.release_data_version 列映射
/// 数据库表 rtsa.release_data_version 列映射
#[derive(Debug)]
pub enum ReleaseDataVersionColumn {
Table,
@ -137,7 +137,7 @@ pub struct ReleaseDataVersionModel {
pub created_at: DateTime<Local>,
}
/// 数据库表 rtss.feature 列映射
/// 数据库表 rtsa.feature 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub enum FeatureColumn {
@ -169,7 +169,7 @@ pub struct FeatureModel {
pub updated_at: DateTime<Local>,
}
/// 数据库表 rtss.user_config 列映射
/// 数据库表 rtsa.user_config 列映射
#[derive(Debug)]
#[allow(dead_code)]
pub enum UserConfigColumn {
@ -195,7 +195,7 @@ pub struct UserConfigModel {
impl TableColumn for UserColumn {
fn name(&self) -> &str {
match self {
UserColumn::Table => "rtss.user",
UserColumn::Table => "rtsa.user",
UserColumn::Id => "id",
UserColumn::Username => "username",
UserColumn::Password => "password",
@ -211,7 +211,7 @@ impl TableColumn for UserColumn {
impl TableColumn for DraftDataColumn {
fn name(&self) -> &str {
match self {
DraftDataColumn::Table => "rtss.draft_data",
DraftDataColumn::Table => "rtsa.draft_data",
DraftDataColumn::Id => "id",
DraftDataColumn::Name => "name",
DraftDataColumn::DataType => "data_type",
@ -229,7 +229,7 @@ impl TableColumn for DraftDataColumn {
impl TableColumn for ReleaseDataColumn {
fn name(&self) -> &str {
match self {
ReleaseDataColumn::Table => "rtss.release_data",
ReleaseDataColumn::Table => "rtsa.release_data",
ReleaseDataColumn::Id => "id",
ReleaseDataColumn::Name => "name",
ReleaseDataColumn::DataType => "data_type",
@ -246,7 +246,7 @@ impl TableColumn for ReleaseDataColumn {
impl TableColumn for ReleaseDataVersionColumn {
fn name(&self) -> &str {
match self {
ReleaseDataVersionColumn::Table => "rtss.release_data_version",
ReleaseDataVersionColumn::Table => "rtsa.release_data_version",
ReleaseDataVersionColumn::Id => "id",
ReleaseDataVersionColumn::ReleaseDataId => "release_data_id",
ReleaseDataVersionColumn::Options => "options",
@ -261,7 +261,7 @@ impl TableColumn for ReleaseDataVersionColumn {
impl TableColumn for FeatureColumn {
fn name(&self) -> &str {
match self {
FeatureColumn::Table => "rtss.feature",
FeatureColumn::Table => "rtsa.feature",
FeatureColumn::Id => "id",
FeatureColumn::FeatureType => "feature_type",
FeatureColumn::Name => "name",
@ -279,7 +279,7 @@ impl TableColumn for FeatureColumn {
impl TableColumn for UserConfigColumn {
fn name(&self) -> &str {
match self {
UserConfigColumn::Table => "rtss.user_config",
UserConfigColumn::Table => "rtsa.user_config",
UserConfigColumn::Id => "id",
UserConfigColumn::UserId => "user_id",
UserConfigColumn::ConfigType => "config_type",

View File

@ -1,5 +1,5 @@
[package]
name = "rtss_dto"
name = "rtsa_dto"
version = "0.1.0"
edition = "2021"

View File

@ -16,7 +16,7 @@ fn main() {
{
std::env::set_var(
"PROTOC",
"../../rtss-proto-msg/protoc/protoc-27.4-win64/bin/protoc.exe",
"../../rtsa-proto-msg/protoc/protoc-27.4-win64/bin/protoc.exe",
);
}
Config::new()
@ -39,12 +39,12 @@ fn main() {
)
.compile_protos(
&[
"../../rtss-proto-msg/src/em_data.proto",
"../../rtss-proto-msg/src/common.proto",
"../../rtss-proto-msg/src/iscs_graphic_data.proto",
"../../rtss-proto-msg/src/simulation.proto",
"../../rtsa-proto-msg/src/em_data.proto",
"../../rtsa-proto-msg/src/common.proto",
"../../rtsa-proto-msg/src/iscs_graphic_data.proto",
"../../rtsa-proto-msg/src/simulation.proto",
],
&["../../rtss-proto-msg/src/"],
&["../../rtsa-proto-msg/src/"],
)
.unwrap();
@ -52,5 +52,5 @@ fn main() {
Command::new("cargo")
.args(["fmt"])
.status()
.expect("Failed to run cargo fmt on rtss-dto");
.expect("Failed to run cargo fmt on rtsa-dto");
}

View File

@ -1,5 +1,5 @@
[package]
name = "rtss_log"
name = "rtsa_log"
version = "0.1.0"
edition = "2021"

View File

@ -1,5 +1,5 @@
[package]
name = "rtss_mqtt"
name = "rtsa_mqtt"
version = "0.1.0"
edition = "2021"
@ -10,5 +10,6 @@ async-trait = { workspace = true }
bytes = { workspace = true }
lazy_static = { workspace = true }
thiserror = { workspace = true }
tower = { workspace = true }
rtss_log = { path = "../rtss_log" }
rtsa_log = { path = "../rtsa_log" }

View File

@ -11,4 +11,6 @@ pub enum MqttClientError {
ClientError(#[from] ClientError),
#[error("全局客户端未设置")]
NoClient,
#[error("请求超时")]
RequestTimeout,
}

512
crates/rtsa_mqtt/src/lib.rs Normal file
View File

@ -0,0 +1,512 @@
use core::panic;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
task::Waker,
time::Duration,
};
use bytes::Bytes;
use lazy_static::lazy_static;
use rtsa_log::tracing::{error, info, trace};
use rumqttc::{
v5::{
mqttbytes::{
v5::{Packet, Publish, PublishProperties},
QoS,
},
AsyncClient, Event, EventLoop, MqttOptions,
},
Outgoing,
};
use service::{Handler, MqttRequest, MqttRouter};
use tokio::{sync::oneshot, time::timeout};
mod error;
use error::MqttClientError;
mod service;
lazy_static! {
/// 全局静态MqttClient实例
/// 使用注意事项:
/// 每次订阅/发布/请求时都通过get_global_mqtt_client获取新的实例否则可能会出现死锁
static ref MQTT_CLIENT: tokio::sync::Mutex<Option<MqttClient>> = tokio::sync::Mutex::new(None);
}
/// 初始化全局MqttClient实例
pub async fn init_global_mqtt_client(
mut options: MqttClientOptions,
) -> Result<(), MqttClientError> {
let client = options.build();
set_global_mqtt_client(client).await
}
/// 设置全局MqttClient实例
pub async fn set_global_mqtt_client(client: MqttClient) -> Result<(), MqttClientError> {
let mut mqtt_client = MQTT_CLIENT.lock().await;
if mqtt_client.is_some() {
return Err(MqttClientError::AlreadySet);
}
*mqtt_client = Some(client);
Ok(())
}
/// 获取全局MqttClient实例
pub async fn get_global_mqtt_client() -> MqttClient {
let mqtt_client = MQTT_CLIENT.lock().await;
if let Some(client) = mqtt_client.as_ref() {
return client.clone();
}
panic!("MqttClient未初始化: 使用init_global_mqtt_client初始化或者在main函数中调用set_global_mqtt_client设置");
}
pub struct MqttClientOptions {
id: String,
options: MqttOptions,
/// mqtt客户端请求队列的最大容量
max_cap: usize,
request_timeout: Duration,
}
impl MqttClientOptions {
pub fn new(id: &str, url: &str) -> Self {
Self {
id: id.to_string(),
options: MqttOptions::parse_url(format!("{}?client_id={}", url, id))
.expect("解析mqtt url失败"),
max_cap: 30,
request_timeout: Duration::from_secs(5),
}
}
pub fn set_max_cap(mut self, max_cap: usize) -> Self {
self.max_cap = max_cap;
self
}
pub fn set_request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
pub fn set_credentials(mut self, username: &str, password: &str) -> Self {
self.options.set_credentials(username, password);
self
}
pub fn build(&mut self) -> MqttClient {
self.options.set_keep_alive(Duration::from_secs(10));
let (client, eventloop) = AsyncClient::new(self.options.clone(), self.max_cap);
let cli = MqttClient {
id: self.id.clone(),
request_timeout: self.request_timeout,
client,
request_id: Arc::new(AtomicU64::new(0)),
router: MqttRouter::new(),
};
cli.run_async(eventloop);
cli
}
}
/// MQTT客户端
/// id: 客户端ID,从数据库的id序列中获取
/// 客户端具有的功能:
/// 1. 启动
/// 2. 订阅
/// 3. 发布
/// 4. 实现类似http的请求相应功能
/// 5. 断开连接
#[derive(Clone)]
pub struct MqttClient {
id: String,
/// 全局的请求超时时间
request_timeout: Duration,
client: AsyncClient,
request_id: Arc<AtomicU64>,
router: MqttRouter,
}
impl MqttClient {
pub async fn clear(&self) -> Result<(), MqttClientError> {
self.client.disconnect().await?;
// 清空订阅处理器
self.router.clear();
Ok(())
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn publish(
&self,
topic: &str,
qos: QoS,
payload: Vec<u8>,
) -> Result<(), MqttClientError> {
self.client.publish(topic, qos, false, payload).await?;
Ok(())
}
pub async fn add_route<H>(&self, topic: &str, handler: H, qos: QoS)
where
H: Handler + Send + Sync + 'static,
{
self.client.subscribe(topic, qos).await.unwrap();
self.router.add_route(topic, handler);
}
pub async fn remove_route(&self, topic: &str) {
self.client.unsubscribe(topic).await.unwrap();
self.router.remove_route(topic);
}
pub fn next_request_id(&self) -> u64 {
self.request_id.fetch_add(1, Ordering::Relaxed)
}
pub async fn handle_request(
&self,
req: Request,
timeout: Duration,
) -> Result<Response, MqttClientError> {
// 订阅响应主题
let response_topic = format!("{}/{}/resp/{}", self.id, req.topic, self.next_request_id());
// 创建请求future
let response_future = MqttResponseFuture::new(&response_topic, timeout);
// 添加响应处理器
self.add_route(&response_topic, response_future.clone(), QoS::ExactlyOnce)
.await;
// 发布请求
let property = PublishProperties {
response_topic: Some(response_topic.clone()),
..req.properties.unwrap_or_default()
};
self.client
.publish_with_properties(req.topic, req.qos, false, req.payload, property)
.await?;
// 等待响应
let resp = response_future.await;
// 注销响应处理器并取消订阅
self.remove_route(&response_topic).await;
if resp.is_timeout() {
return Err(MqttClientError::RequestTimeout);
}
Ok(resp)
}
/// 发送请求并等待响应
pub async fn request(&self, req: Request) -> Result<Response, MqttClientError> {
self.handle_request(req, self.request_timeout).await
}
/// 发送请求并等待响应,指定响应超时时间
/// 响应超时时间为0时表示永不超时
pub async fn request_with_timeout(
&self,
req: Request,
timeout: Duration,
) -> Result<Response, MqttClientError> {
self.handle_request(req, timeout).await
}
fn run_async(&self, eventloop: EventLoop) {
let cli = self.clone();
tokio::spawn(async move {
cli.run(eventloop).await;
});
}
async fn run(&self, mut eventloop: EventLoop) {
while let Ok(notification) = eventloop.poll().await {
match notification {
Event::Incoming(Packet::Publish(publish)) => {
trace!("Received message: {:?}", publish);
let this = self.clone();
let router = self.router.clone();
tokio::spawn(async move {
let response_topic = publish
.properties
.clone()
.and_then(|p| p.response_topic.clone());
if let Some(resp) = router.handle_request(MqttRequest::new(publish)).await {
if let Some(r_topic) = response_topic {
this.publish(&r_topic, QoS::AtMostOnce, resp.payload.to_vec())
.await
.unwrap();
}
}
});
}
Event::Outgoing(Outgoing::Disconnect) => {
info!("Disconnected to the broker");
break;
}
Event::Incoming(Packet::Disconnect(disconnect)) => {
info!("Disconnected from the broker: {:?}", disconnect);
break;
}
_ => {
trace!("Unhandled event: {:?}", notification);
}
}
}
}
}
pub struct Request {
topic: String,
qos: QoS,
payload: Bytes,
properties: Option<PublishProperties>,
}
impl Request {
pub fn new(topic: &str, payload: Bytes) -> Self {
Self {
topic: topic.to_string(),
qos: QoS::AtMostOnce,
payload,
properties: None,
}
}
pub fn with_qos(mut self, qos: QoS) -> Self {
self.qos = qos;
self
}
pub fn with_properties(mut self, properties: PublishProperties) -> Self {
self.properties = Some(properties);
self
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MqttResponseState {
Waiting,
Received,
Timeout,
}
/// MQTT请求响应
#[derive(Clone, Debug)]
pub struct Response {
state: Arc<Mutex<MqttResponseState>>,
response: Arc<Mutex<Publish>>,
}
impl Default for Response {
fn default() -> Self {
Self::new()
}
}
impl Response {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(MqttResponseState::Waiting)),
response: Arc::new(Mutex::new(Publish::default())),
}
}
pub fn is_waiting(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Waiting
}
pub fn is_received(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Received
}
pub fn is_timeout(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Timeout
}
pub fn set_timeout(&self) {
*self.state.lock().unwrap() = MqttResponseState::Timeout;
}
pub fn set(&self, response: Publish) {
*self.state.lock().unwrap() = MqttResponseState::Received;
*self.response.lock().unwrap() = response;
}
pub fn get(&self) -> Publish {
self.response.lock().unwrap().clone()
}
}
/// MQTT响应Future
#[derive(Clone)]
pub struct MqttResponseFuture {
pub start_time: std::time::Instant,
timeout: Duration,
tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
waker: Arc<Mutex<Option<Waker>>>,
response_topic: String,
response: Response,
}
impl MqttResponseFuture {
pub fn new(response_topic: &str, timeout: Duration) -> Self {
let (tx, rx) = oneshot::channel();
let r = Self {
start_time: std::time::Instant::now(),
timeout,
tx: Arc::new(Mutex::new(Some(tx))),
waker: Arc::new(Mutex::new(None)),
response_topic: response_topic.to_string(),
response: Response::new(),
};
// 启动超时检查
r.start_timeout_monitor(rx);
r
}
/// 启动超时监控任务逻辑
fn start_timeout_monitor(&self, rx: oneshot::Receiver<()>) {
if self.timeout.as_millis() == 0 {
return;
}
let response = self.response.clone();
let response_topic = self.response_topic.clone();
let duration = self.timeout;
let waker = self.waker.clone();
tokio::spawn(async move {
if (timeout(duration, rx).await).is_err() {
error!("Mqtt response timeout: {:?}", response_topic);
response.set_timeout();
if let Some(waker) = waker.lock().unwrap().take() {
waker.wake();
}
}
});
}
}
impl Handler for MqttResponseFuture {
fn handle(
&self,
req: MqttRequest,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<service::MqttResponse>> + Send>>
{
let topic = req.topic();
trace!("MqttResponseFuture handle: {:?}", topic);
if topic == self.response_topic {
self.response.set(req.get());
if let Some(tx) = self.tx.lock().unwrap().take() {
tx.send(())
.expect("Send Mqtt response timeout signal failed");
}
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
}
Box::pin(async { None })
}
}
impl std::future::Future for MqttResponseFuture {
type Output = Response;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if self.response.is_waiting() {
trace!(
"topic={} Response future poll waiting...",
self.response_topic
);
self.waker.lock().unwrap().replace(cx.waker().clone());
std::task::Poll::Pending
} else {
trace!(
"topic={} Response future poll ready: {:?}",
self.response_topic,
self.response
);
std::task::Poll::Ready(self.response.clone())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use rtsa_log::tracing::Level;
use tokio::time::Duration;
fn create_mqtt_options() -> MqttClientOptions {
MqttClientOptions::new("rtsa_test1", "tcp://localhost:1883")
.set_credentials("rtsa", "Joylink@0503")
}
#[tokio::test]
async fn test_mqtt_client_initialization() {
let options = create_mqtt_options();
let result = init_global_mqtt_client(options).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_mqtt_client_publish() {
let options = create_mqtt_options();
init_global_mqtt_client(options).await.unwrap();
let mqtt_client = get_global_mqtt_client().await;
let result = mqtt_client
.publish("test/topic", QoS::AtLeastOnce, b"Hello, MQTT!".to_vec())
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_mqtt_client_request_response() {
rtsa_log::Logging::default().with_level(Level::TRACE).init();
let options = create_mqtt_options();
init_global_mqtt_client(options).await.unwrap();
let mqtt_client = get_global_mqtt_client().await;
struct EchoHandler;
impl Handler for EchoHandler {
fn handle(
&self,
req: MqttRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Option<service::MqttResponse>> + Send>,
> {
let payload = req.payload();
Box::pin(async move { Some(service::MqttResponse::new(payload)) })
}
}
mqtt_client
.add_route("test/echo", EchoHandler, QoS::AtLeastOnce)
.await;
let request = Request::new("test/echo", Bytes::from("Echo message"));
let response = mqtt_client.request(request).await.unwrap();
assert_eq!(response.get().payload, Bytes::from("Echo message"));
mqtt_client.remove_route("test/echo").await;
}
#[tokio::test]
async fn test_mqtt_client_timeout() {
let options = create_mqtt_options();
init_global_mqtt_client(options).await.unwrap();
let mqtt_client = get_global_mqtt_client().await;
let request =
Request::new("test/timeout", Bytes::from("Timeout test")).with_qos(QoS::ExactlyOnce);
let result = mqtt_client
.request_with_timeout(request, Duration::from_secs(1))
.await;
assert!(result.is_err());
}
}

View File

@ -0,0 +1,247 @@
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
use bytes::Bytes;
use rumqttc::v5::mqttbytes::v5::{Publish, PublishProperties};
pub struct MqttRequest {
publish: Publish,
}
impl MqttRequest {
pub fn new(publish: Publish) -> Self {
MqttRequest { publish }
}
pub fn topic(&self) -> String {
String::from_utf8_lossy(&self.publish.topic).to_string()
}
pub fn payload(&self) -> Bytes {
self.publish.payload.clone()
}
pub fn get(&self) -> Publish {
self.publish.clone()
}
}
pub struct MqttResponse {
pub properties: Option<PublishProperties>,
pub payload: Bytes,
}
impl MqttResponse {
pub fn new(payload: Bytes) -> Self {
MqttResponse {
properties: None,
payload,
}
}
pub fn with_properties(payload: Bytes, properties: PublishProperties) -> Self {
MqttResponse {
properties: Some(properties),
payload,
}
}
}
pub trait Handler: Send + Sync + 'static {
fn handle(
&self,
req: MqttRequest,
) -> Pin<Box<dyn Future<Output = Option<MqttResponse>> + Send>>;
}
impl<F, Fut> Handler for F
where
F: Fn(MqttRequest) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Option<MqttResponse>> + Send + 'static,
{
fn handle(
&self,
req: MqttRequest,
) -> Pin<Box<dyn Future<Output = Option<MqttResponse>> + Send>> {
Box::pin((self)(req))
}
}
#[derive(Clone)]
pub struct MqttRouter {
routes: Arc<Mutex<HashMap<String, Arc<dyn Handler + Send + Sync>>>>,
}
impl MqttRouter {
pub fn new() -> Self {
MqttRouter {
routes: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn add_route<H>(&self, topic: &str, handler: H)
where
H: Handler + Send + Sync + 'static,
{
self.routes
.lock()
.unwrap()
.insert(topic.to_string(), Arc::new(handler));
}
pub fn remove_route(&self, topic: &str) {
self.routes.lock().unwrap().remove(topic);
}
fn get_handler(&self, topic: &str) -> Option<Arc<dyn Handler + Send + Sync>> {
let routes = self.routes.lock().unwrap();
routes.get(topic).cloned()
}
pub fn clear(&self) {
self.routes.lock().unwrap().clear();
}
pub async fn handle_request(&self, req: MqttRequest) -> Option<MqttResponse> {
if let Some(handler) = self.get_handler(&req.topic()) {
handler.handle(req).await
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use rumqttc::v5::mqttbytes::v5::Publish;
// Helper function to create a Publish message
fn create_publish(topic: &str, payload: &[u8]) -> Publish {
Publish {
topic: topic.as_bytes().to_vec().into(),
payload: Bytes::from(payload.to_vec()),
dup: false,
qos: rumqttc::v5::mqttbytes::QoS::AtMostOnce,
retain: false,
pkid: 0,
properties: None,
}
}
// Sample handler that echoes the payload back
async fn echo_handler(req: MqttRequest) -> Option<MqttResponse> {
Some(MqttResponse {
properties: None,
payload: req.payload(),
})
}
// Sample handler that returns None
async fn none_handler(_req: MqttRequest) -> Option<MqttResponse> {
None
}
#[tokio::test]
async fn test_add_and_get_handler() {
let router = MqttRouter::new();
router.add_route("test/topic", echo_handler);
assert!(router.get_handler("test/topic").is_some());
assert!(router.get_handler("invalid/topic").is_none());
}
#[tokio::test]
async fn test_handle_request_with_existing_route() {
let router = MqttRouter::new();
router.add_route("test/topic", echo_handler);
let publish = create_publish("test/topic", b"hello");
let req = MqttRequest::new(publish);
let response = router.handle_request(req).await;
assert!(response.is_some());
assert_eq!(response.unwrap().payload, Bytes::from("hello"));
}
#[tokio::test]
async fn test_handle_request_with_nonexistent_route() {
let router = MqttRouter::new();
let publish = create_publish("invalid/topic", b"hello");
let req = MqttRequest::new(publish);
let response = router.handle_request(req).await;
assert!(response.is_none());
}
#[tokio::test]
async fn test_handler_returning_none() {
let router = MqttRouter::new();
router.add_route("test/topic", none_handler);
let publish = create_publish("test/topic", b"hello");
let req = MqttRequest::new(publish);
let response = router.handle_request(req).await;
assert!(response.is_none());
}
#[tokio::test]
async fn test_multiple_routes() {
let router = MqttRouter::new();
router.add_route("topic/one", echo_handler);
router.add_route("topic/two", none_handler);
let publish_one = create_publish("topic/one", b"payload1");
let req_one = MqttRequest::new(publish_one);
let response_one = router.handle_request(req_one).await;
assert!(response_one.is_some());
assert_eq!(response_one.unwrap().payload, Bytes::from("payload1"));
let publish_two = create_publish("topic/two", b"payload2");
let req_two = MqttRequest::new(publish_two);
let response_two = router.handle_request(req_two).await;
assert!(response_two.is_none());
}
#[tokio::test]
async fn test_concurrent_access() {
use std::sync::Arc;
use tokio::sync::Barrier;
let router = Arc::new(MqttRouter::new());
router.add_route("test/topic", echo_handler);
let barrier = Arc::new(Barrier::new(10));
let mut handles = Vec::new();
for _ in 0..10 {
let router_cloned = router.clone();
let barrier_cloned = barrier.clone();
let handle = tokio::spawn(async move {
let publish = create_publish("test/topic", b"concurrent");
let req = MqttRequest::new(publish);
barrier_cloned.wait().await;
let response = router_cloned.handle_request(req).await;
assert!(response.is_some());
assert_eq!(response.unwrap().payload, Bytes::from("concurrent"));
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
}

View File

@ -1,702 +0,0 @@
use core::panic;
use std::{
any::TypeId,
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
task::Waker,
time::Duration,
};
use bytes::Bytes;
use lazy_static::lazy_static;
use rtss_log::tracing::{debug, error, info, trace};
use rumqttc::{
v5::{
mqttbytes::{
v5::{Packet, Publish, PublishProperties},
QoS,
},
AsyncClient, Event, EventLoop, MqttOptions,
},
Outgoing,
};
use tokio::{sync::oneshot, time::timeout};
mod error;
use error::MqttClientError;
lazy_static! {
/// 全局静态MqttClient实例
/// 使用注意事项:
/// 每次订阅/发布/请求时都通过get_global_mqtt_client获取新的实例否则可能会出现死锁
static ref MQTT_CLIENT: tokio::sync::Mutex<Option<MqttClient>> = tokio::sync::Mutex::new(None);
}
/// 初始化全局MqttClient实例
pub async fn init_global_mqtt_client(
mut options: MqttClientOptions,
) -> Result<(), MqttClientError> {
let client = options.build();
set_global_mqtt_client(client).await
}
/// 设置全局MqttClient实例
pub async fn set_global_mqtt_client(client: MqttClient) -> Result<(), MqttClientError> {
let mut mqtt_client = MQTT_CLIENT.lock().await;
if mqtt_client.is_some() {
return Err(MqttClientError::AlreadySet);
}
*mqtt_client = Some(client);
Ok(())
}
/// 获取全局MqttClient实例
pub async fn get_global_mqtt_client() -> MqttClient {
let mqtt_client = MQTT_CLIENT.lock().await;
if let Some(client) = mqtt_client.as_ref() {
return client.clone();
}
panic!("MqttClient未初始化: 使用init_global_mqtt_client初始化或者在main函数中调用set_global_mqtt_client设置");
}
pub struct MqttClientOptions {
id: String,
options: MqttOptions,
/// mqtt客户端请求队列的最大容量
max_cap: usize,
request_timeout: Duration,
}
impl MqttClientOptions {
pub fn new(id: &str, url: &str) -> Self {
Self {
id: id.to_string(),
options: MqttOptions::parse_url(format!("{}?client_id={}", url, id))
.expect("解析mqtt url失败"),
max_cap: 30,
request_timeout: Duration::from_secs(5),
}
}
pub fn set_max_cap(mut self, max_cap: usize) -> Self {
self.max_cap = max_cap;
self
}
pub fn set_request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
pub fn set_credentials(mut self, username: &str, password: &str) -> Self {
self.options.set_credentials(username, password);
self
}
pub fn build(&mut self) -> MqttClient {
self.options.set_keep_alive(Duration::from_secs(10));
let (client, eventloop) = AsyncClient::new(self.options.clone(), self.max_cap);
let subscriptions = SubscribeHandlerMap::new();
let cli = MqttClient {
id: self.id.clone(),
request_timeout: self.request_timeout,
client,
request_id: Arc::new(AtomicU64::new(0)),
subscriptions,
};
cli.run_async(eventloop);
cli
}
}
/// MQTT客户端
/// id: 客户端ID,从数据库的id序列中获取
/// 客户端具有的功能:
/// 1. 启动
/// 2. 订阅
/// 3. 发布
/// 4. 实现类似http的请求相应功能
/// 5. 断开连接
#[derive(Clone)]
pub struct MqttClient {
id: String,
/// 全局的请求超时时间
request_timeout: Duration,
client: AsyncClient,
request_id: Arc<AtomicU64>,
subscriptions: SubscribeHandlerMap,
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct HandlerId(TypeId);
#[derive(Clone)]
pub struct SubscribeHandlerMap {
sub_handlers: Arc<Mutex<HashMap<String, MessageHandlerMap>>>,
}
impl SubscribeHandlerMap {
fn new() -> Self {
Self {
sub_handlers: Arc::new(Mutex::new(HashMap::new())),
}
}
fn insert(
&self,
topic: &str,
handler_id: HandlerId,
handler: Arc<dyn MessageHandler>,
) -> HandlerId {
self.sub_handlers
.lock()
.unwrap()
.entry(topic.to_string())
.or_insert_with(MessageHandlerMap::new)
.insert(handler_id, handler);
handler_id
}
fn remove(&self, topic: &str, handler_id: HandlerId) {
if let Some(topic_handlers) = self.sub_handlers.lock().unwrap().get_mut(topic) {
topic_handlers.remove(handler_id);
}
}
#[allow(dead_code)]
fn remove_all(&self, topic: &str) {
if let Some(topic_handlers) = self.sub_handlers.lock().unwrap().get_mut(topic) {
topic_handlers.remove_all();
}
}
fn get_handlers(&self, topic: &str) -> Option<Vec<Arc<dyn MessageHandler>>> {
self.sub_handlers
.lock()
.unwrap()
.get(topic)
.map(|handlers| handlers.values())
}
#[allow(dead_code)]
fn get_mut(&self, topic: &str) -> Option<MessageHandlerMap> {
self.sub_handlers.lock().unwrap().get(topic).cloned()
}
#[allow(dead_code)]
fn is_topic_empty(&self, topic: &str) -> bool {
if let Some(topic_handlers) = self.sub_handlers.lock().unwrap().get(topic) {
topic_handlers.is_empty()
} else {
true
}
}
#[allow(dead_code)]
fn is_empty(&self) -> bool {
self.sub_handlers.lock().unwrap().is_empty()
}
fn clear(&self) {
self.sub_handlers.lock().unwrap().clear();
}
}
#[derive(Clone)]
struct MessageHandlerMap {
handlers: Arc<Mutex<HashMap<HandlerId, Arc<dyn MessageHandler>>>>,
}
impl MessageHandlerMap {
fn new() -> Self {
Self {
handlers: Arc::new(Mutex::new(HashMap::new())),
}
}
fn insert(&self, handler_id: HandlerId, handler: Arc<dyn MessageHandler>) {
self.handlers.lock().unwrap().insert(handler_id, handler);
}
/// 移除处理器,返回剩余处理器数量
fn remove(&self, handler_id: HandlerId) -> Option<Arc<dyn MessageHandler>> {
self.handlers.lock().unwrap().remove(&handler_id)
}
#[allow(dead_code)]
fn remove_all(&self) {
self.handlers.lock().unwrap().clear();
}
fn values(&self) -> Vec<Arc<dyn MessageHandler>> {
self.handlers.lock().unwrap().values().cloned().collect()
}
#[allow(dead_code)]
fn is_empty(&self) -> bool {
self.handlers.lock().unwrap().is_empty()
}
}
#[must_use = "this `SubscribeTopicHandler` implements `Drop`, which will unregister the handler"]
#[derive(Clone)]
pub struct SubscribeTopicHandler {
topic: String,
handler_id: HandlerId,
handler_map: SubscribeHandlerMap,
}
impl SubscribeTopicHandler {
pub fn new(topic: &str, handler_id: HandlerId, handler_map: SubscribeHandlerMap) -> Self {
Self {
topic: topic.to_string(),
handler_id,
handler_map,
}
}
pub fn unregister(&self) {
self.handler_map.remove(&self.topic, self.handler_id);
}
}
/// 订阅消息处理器
#[async_trait::async_trait]
pub trait MessageHandler: Send + Sync {
async fn handle(&self, publish: Publish);
}
/// 为闭包实现消息处理器
#[async_trait::async_trait]
impl<F> MessageHandler for F
where
F: Fn(Publish) + Sync + Send,
{
async fn handle(&self, publish: Publish) {
self(publish);
}
}
impl MqttClient {
pub async fn clear(&self) -> Result<(), MqttClientError> {
self.client.disconnect().await?;
// 清空订阅处理器
self.subscriptions.clear();
Ok(())
}
pub fn id(&self) -> &str {
&self.id
}
pub async fn subscribe(&self, topic: &str, qos: QoS) -> Result<(), MqttClientError> {
self.client.subscribe(topic, qos).await?;
Ok(())
}
pub async fn unsubscribe(&self, topic: &str) -> Result<(), MqttClientError> {
self.client.unsubscribe(topic).await?;
Ok(())
}
pub fn register_topic_handler<H>(&self, topic: &str, handler: H) -> SubscribeTopicHandler
where
H: MessageHandler + 'static,
{
let handler_id = HandlerId(TypeId::of::<H>());
self.subscriptions
.insert(topic, handler_id, Arc::new(handler));
SubscribeTopicHandler::new(topic, handler_id, self.subscriptions.clone())
}
pub fn unregister_topic(&self, topic: &str) {
self.subscriptions.remove_all(topic);
}
pub fn topic_handler_count(&self, topic: &str) -> usize {
if let Some(topic_handlers) = self.subscriptions.get_handlers(topic) {
topic_handlers.len()
} else {
0
}
}
pub async fn publish(
&self,
topic: &str,
qos: QoS,
payload: Vec<u8>,
) -> Result<(), MqttClientError> {
self.client.publish(topic, qos, false, payload).await?;
Ok(())
}
pub fn next_request_id(&self) -> u64 {
self.request_id.fetch_add(1, Ordering::Relaxed)
}
/// 发送请求并等待响应
/// TODO: 需要测试中请求时的并发情况(多个请求同时等待,以及在请求时订阅和发布是否受影响)
pub async fn request(
&self,
topic: &str,
qos: QoS,
payload: Vec<u8>,
) -> Result<MqttResponse, MqttClientError> {
// 订阅响应主题
let response_topic = format!("{}/{}/resp/{}", self.id, topic, self.next_request_id());
self.subscribe(&response_topic, QoS::ExactlyOnce).await?;
// 创建请求future
let response_future = MqttResponseFuture::new(&response_topic, self.request_timeout);
// 注册响应处理器
let response_handler =
self.register_topic_handler(&response_topic, response_future.clone());
// 发布请求
let property = PublishProperties {
response_topic: Some(response_topic.clone()),
..Default::default()
};
self.client
.publish_with_properties(topic, qos, false, payload, property)
.await?;
// 等待响应
let resp = response_future.await;
// 注销响应处理器并取消订阅
response_handler.unregister();
self.unsubscribe(&response_topic).await?;
Ok(resp)
}
/// 发送请求并等待响应,指定响应超时时间
/// 响应超时时间为0时表示永不超时
pub async fn request_with_timeout(
&self,
topic: &str,
qos: QoS,
payload: Vec<u8>,
timeout: Duration,
) -> Result<MqttResponse, MqttClientError> {
let response_topic = format!("{}/{}/resp/{}", self.id, topic, self.next_request_id());
self.subscribe(&response_topic, QoS::ExactlyOnce).await?;
let response_future = MqttResponseFuture::new(&response_topic, timeout);
let response_handler =
self.register_topic_handler(&response_topic, response_future.clone());
let property = PublishProperties {
response_topic: Some(response_topic.clone()),
..Default::default()
};
self.client
.publish_with_properties(topic, qos, false, payload, property)
.await?;
let resp = response_future.await;
response_handler.unregister();
self.unsubscribe(&response_topic).await?;
Ok(resp)
}
fn run_async(&self, eventloop: EventLoop) {
let cli = self.clone();
tokio::spawn(async move {
cli.run(eventloop).await;
});
}
async fn run(&self, mut eventloop: EventLoop) {
while let Ok(notification) = eventloop.poll().await {
match notification {
Event::Incoming(Packet::Publish(publish)) => {
trace!("Received message: {:?}", publish);
let topic: String = String::from_utf8_lossy(&publish.topic).to_string();
if let Some(topic_handlers) = self.subscriptions.get_handlers(&topic) {
for handler in topic_handlers {
let handler = handler.clone();
let p = publish.clone();
tokio::spawn(async move {
handler.handle(p).await;
});
}
}
}
Event::Outgoing(Outgoing::Disconnect) => {
info!("Disconnected to the broker");
break;
}
Event::Incoming(Packet::Disconnect(disconnect)) => {
info!("Disconnected from the broker: {:?}", disconnect);
break;
}
_ => {
trace!("Unhandled event: {:?}", notification);
}
}
}
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MqttResponseState {
Waiting,
Received,
Timeout,
}
/// MQTT请求响应
#[derive(Clone, Debug)]
pub struct MqttResponse {
state: Arc<Mutex<MqttResponseState>>,
response: Arc<Mutex<Bytes>>,
}
impl Default for MqttResponse {
fn default() -> Self {
Self::new()
}
}
impl MqttResponse {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(MqttResponseState::Waiting)),
response: Arc::new(Mutex::new(Bytes::new())),
}
}
pub fn is_waiting(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Waiting
}
pub fn is_received(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Received
}
pub fn is_timeout(&self) -> bool {
*self.state.lock().unwrap() == MqttResponseState::Timeout
}
pub fn set_timeout(&self) {
*self.state.lock().unwrap() = MqttResponseState::Timeout;
}
pub fn set(&self, response: Bytes) {
*self.state.lock().unwrap() = MqttResponseState::Received;
*self.response.lock().unwrap() = response;
}
pub fn get(&self) -> Bytes {
self.response.lock().unwrap().clone()
}
}
/// MQTT响应Future
#[derive(Clone)]
pub struct MqttResponseFuture {
pub start_time: std::time::Instant,
timeout: Duration,
tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
waker: Arc<Mutex<Option<Waker>>>,
response_topic: String,
response: MqttResponse,
}
impl MqttResponseFuture {
pub fn new(response_topic: &str, timeout: Duration) -> Self {
let (tx, rx) = oneshot::channel();
let r = Self {
start_time: std::time::Instant::now(),
timeout,
tx: Arc::new(Mutex::new(Some(tx))),
waker: Arc::new(Mutex::new(None)),
response_topic: response_topic.to_string(),
response: MqttResponse::new(),
};
// 启动超时检查
r.start_timeout_monitor(rx);
r
}
/// 启动超时监控任务逻辑
fn start_timeout_monitor(&self, rx: oneshot::Receiver<()>) {
if self.timeout.as_millis() == 0 {
return;
}
let response = self.response.clone();
let response_topic = self.response_topic.clone();
let duration = self.timeout;
let waker = self.waker.clone();
tokio::spawn(async move {
if (timeout(duration, rx).await).is_err() {
error!("Mqtt response timeout: {:?}", response_topic);
response.set_timeout();
if let Some(waker) = waker.lock().unwrap().take() {
waker.wake();
}
}
});
}
}
#[async_trait::async_trait]
impl MessageHandler for MqttResponseFuture {
async fn handle(&self, publish: Publish) {
if publish.topic == self.response_topic {
self.response.set(publish.payload);
if let Some(tx) = self.tx.lock().unwrap().take() {
tx.send(())
.expect("Send Mqtt response timeout signal failed");
}
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
}
}
}
impl std::future::Future for MqttResponseFuture {
type Output = MqttResponse;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
if self.response.is_waiting() {
debug!(
"topic={} Response future poll waiting...",
self.response_topic
);
self.waker.lock().unwrap().replace(cx.waker().clone());
std::task::Poll::Pending
} else {
debug!(
"topic={} Response future poll ready: {:?}",
self.response_topic, self.response
);
std::task::Poll::Ready(self.response.clone())
}
}
}
pub fn get_publish_response_topic(publish: Option<PublishProperties>) -> Option<String> {
publish.and_then(|p| p.response_topic.clone())
}
#[cfg(test)]
mod tests {
use super::*;
use rtss_log::tracing::{info, Level};
use tokio::{
sync::broadcast,
time::{sleep, Duration},
};
fn create_mqtt_options() -> MqttClientOptions {
MqttClientOptions::new("rtss_test1", "tcp://localhost:1883")
.set_credentials("rtsa", "Joylink@0503")
}
#[tokio::test]
async fn test_subscribe_and_publish() {
rtss_log::Logging::default().with_level(Level::TRACE).init();
let client = create_mqtt_options().build();
client
.subscribe("test/topic", QoS::AtMostOnce)
.await
.unwrap();
let handler1 = client.register_topic_handler("test/topic", |publish: Publish| {
info!(
"Handler 1 received: topic={}, payload={:?}",
String::from_utf8_lossy(&publish.topic),
String::from_utf8_lossy(&publish.payload)
);
});
let _ = client.register_topic_handler("test/topic", |publish: Publish| {
info!(
"Handler 2 received: topic={}, payload={:?}",
String::from_utf8_lossy(&publish.topic),
String::from_utf8_lossy(&publish.payload)
);
});
assert_eq!(client.topic_handler_count("test/topic"), 2);
client
.publish("test/topic", QoS::AtMostOnce, b"Hello, MQTT!".to_vec())
.await
.unwrap();
// Wait for a moment to allow handlers to process the message
sleep(Duration::from_millis(200)).await;
// Test remove_handler
client.unsubscribe("test/topic").await.unwrap();
handler1.unregister();
assert_eq!(client.topic_handler_count("test/topic"), 1);
sleep(Duration::from_millis(200)).await;
// Test clear
client.clear().await.unwrap();
assert_eq!(client.topic_handler_count("test/topic"), 0);
}
#[tokio::test]
async fn test_request() {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
init_global_mqtt_client(create_mqtt_options())
.await
.unwrap();
let c = get_global_mqtt_client().await;
c.subscribe("test/request", QoS::AtMostOnce).await.unwrap();
let handler = |p: Publish| {
info!(
"Request handler received: topic={}, payload={:?}",
String::from_utf8_lossy(&p.topic),
String::from_utf8_lossy(&p.payload)
);
let response = Bytes::from("Hello, response!");
let resp_topic = get_publish_response_topic(p.properties.clone());
if let Some(r_topic) = resp_topic {
tokio::spawn(async move {
// 此处需要使用全局MqttClient实例否则会出现死锁
let c = get_global_mqtt_client().await;
c.publish(&r_topic, QoS::AtMostOnce, response.to_vec())
.await
.unwrap();
});
}
};
let _ = c.register_topic_handler("test/request", handler);
let response = c
.request("test/request", QoS::AtMostOnce, b"Hello, request!".to_vec())
.await
.unwrap();
info!("Request response: {:?}", response);
}
#[tokio::test]
async fn test_async_broadcast() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
}

View File

@ -5,9 +5,9 @@ edition = "2021"
[dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
rtss_log = { path = "../crates/rtss_log" }
rtss_api = { path = "crates/rtss_api" }
rtss_db = { path = "../crates/rtss_db" }
rtsa_log = { path = "../crates/rtsa_log" }
rtsa_api = { path = "crates/rtsa_api" }
rtsa_db = { path = "../crates/rtsa_db" }
serde = { workspace = true }
config = { workspace = true }
clap = { workspace = true, features = ["derive"] }

View File

@ -11,11 +11,11 @@ ENV TZ=Asia/Shanghai
# 复制时区信息到系统时区目录
RUN cp /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
WORKDIR /rtss_sim
COPY ./target/x86_64-unknown-linux-musl/release/rtss_simulation ./rtss_sim
WORKDIR /rtsa
COPY ./target/x86_64-unknown-linux-musl/release/manager ./rtsa_m
COPY ./conf/* ./conf/
COPY ./migrations/* ./migrations/
EXPOSE 8765
CMD ["sh", "-c", "./rtss_sim db migrate && ./rtss_sim serve"]
CMD ["sh", "-c", "./rtsa_m db migrate && ./rtsa_m serve"]

View File

@ -2,4 +2,4 @@
url = "postgresql://joylink:Joylink@0503@localhost:5432/joylink"
[sso]
base_url = "http://192.168.33.233/rtss-server"
base_url = "http://192.168.33.233/rtsa-server"

View File

@ -5,4 +5,4 @@ url = "postgresql://joylink:Joylink@0503@10.11.11.2:5432/joylink"
level = "debug"
[sso]
base_url = "http://192.168.33.233/rtss-server"
base_url = "http://192.168.33.233/rtsa-server"

View File

@ -1,5 +1,5 @@
[package]
name = "rtss_api"
name = "rtsa_api"
version = "0.1.0"
edition = "2021"
@ -19,6 +19,6 @@ base64 = "0.22.1"
sysinfo = "0.31.3"
reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json"] }
rtss_log = { path = "../../../crates/rtss_log" }
rtss_db = { path = "../../../crates/rtss_db" }
rtss_dto = { path = "../../../crates/rtss_dto" }
rtsa_log = { path = "../../../crates/rtsa_log" }
rtsa_db = { path = "../../../crates/rtsa_db" }
rtsa_dto = { path = "../../../crates/rtsa_dto" }

View File

@ -1,6 +1,6 @@
use async_graphql::{InputObject, InputType, OutputType, SimpleObject};
use rtss_db::{common::TableColumn, model::DraftDataColumn};
use rtss_dto::common::IscsStyle;
use rtsa_db::{common::TableColumn, model::DraftDataColumn};
use rtsa_dto::common::IscsStyle;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
@ -36,7 +36,7 @@ mod tests {
#[test]
fn test_iscs_data_options_serialize() {
rtss_log::Logging::default().init();
rtsa_log::Logging::default().init();
let options = IscsDataOptions {
style: IscsStyle::DaShiZhiNeng,
};

View File

@ -3,13 +3,13 @@ use async_graphql::{ComplexObject, Context, InputObject, Object, SimpleObject};
use base64::prelude::*;
use base64::Engine;
use chrono::NaiveDateTime;
use rtss_db::DraftDataAccessor;
use rtss_db::RtssDbAccessor;
use rtss_dto::common::{DataType, Role};
use rtsa_db::DraftDataAccessor;
use rtsa_db::RtsaDbAccessor;
use rtsa_dto::common::{DataType, Role};
use serde_json::Value;
use crate::apis::{PageDto, PageQueryDto};
use crate::loader::RtssDbLoader;
use crate::loader::RtsaDbLoader;
use super::data_options_def::{DataOptions, IscsDataOptions};
use super::release_data::ReleaseDataId;
@ -33,7 +33,7 @@ impl DraftDataQuery {
paging: PageQueryDto,
query: DraftDataFilterDto<Value>,
) -> async_graphql::Result<PageDto<DraftDataDto>> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let paging_result = db_accessor
.paging_query_draft_data(query.into(), paging.into())
.await?;
@ -53,7 +53,7 @@ impl DraftDataQuery {
.await?;
query.user_id = user.id_i32();
query.data_type = Some(DataType::Iscs);
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let paging_result = db_accessor
.paging_query_draft_data(query.into(), paging.into())
.await?;
@ -67,7 +67,7 @@ impl DraftDataQuery {
paging: PageQueryDto,
mut query: DraftDataFilterDto<IscsDataOptions>,
) -> async_graphql::Result<PageDto<DraftIscsDataDto>> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
query.data_type = Some(DataType::Iscs);
let paging_result = db_accessor
.paging_query_draft_data(query.into(), paging.into())
@ -77,7 +77,7 @@ impl DraftDataQuery {
/// 根据id获取草稿数据
#[graphql(guard = "RoleGuard::new(Role::User)")]
async fn draft_data(&self, ctx: &Context<'_>, id: i32) -> async_graphql::Result<DraftDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor.query_draft_data_by_id(id).await?;
Ok(draft_data.into())
}
@ -94,7 +94,7 @@ impl DraftDataQuery {
.query_user(&ctx.data::<Token>()?.0)
.await?;
let user_id = user.id_i32();
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let exist = db_accessor
.is_draft_data_exist(user_id, &data_type, &name)
.await?;
@ -116,7 +116,7 @@ impl DraftDataMutation {
.query_user(&ctx.data::<Token>()?.0)
.await?;
input = input.with_data_type_and_user_id(DataType::Iscs, user.id_i32());
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor.create_draft_data(input.into()).await?;
Ok(draft_data.into())
}
@ -128,7 +128,7 @@ impl DraftDataMutation {
id: i32,
name: String,
) -> async_graphql::Result<DraftDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor.update_draft_data_name(id, &name).await?;
Ok(draft_data.into())
}
@ -141,7 +141,7 @@ impl DraftDataMutation {
id: i32,
data: String, // base64编码的数据
) -> async_graphql::Result<DraftDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let bytes = BASE64_STANDARD
.decode(data)
.map_err(|e| async_graphql::Error::new(format!("base64 decode error: {}", e)))?;
@ -156,7 +156,7 @@ impl DraftDataMutation {
id: i32,
is_shared: bool,
) -> async_graphql::Result<DraftDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor.set_draft_data_shared(id, is_shared).await?;
Ok(draft_data.into())
}
@ -167,7 +167,7 @@ impl DraftDataMutation {
ctx: &Context<'_>,
id: Vec<i32>,
) -> async_graphql::Result<bool> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
db_accessor.delete_draft_data(id.as_slice()).await?;
Ok(true)
}
@ -179,7 +179,7 @@ impl DraftDataMutation {
id: i32,
release_data_id: i32,
) -> async_graphql::Result<DraftDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor
.set_default_release_data_id(id, release_data_id)
.await?;
@ -198,7 +198,7 @@ impl DraftDataMutation {
.query_user(&ctx.data::<Token>()?.0)
.await?;
let user_id = user.id_i32();
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let draft_data = db_accessor.save_as_new_draft(id, &name, user_id).await?;
Ok(draft_data.into())
}
@ -223,7 +223,7 @@ impl<T: DataOptions> CreateDraftDataDto<T> {
}
}
impl<T: DataOptions> From<CreateDraftDataDto<T>> for rtss_db::CreateDraftData {
impl<T: DataOptions> From<CreateDraftDataDto<T>> for rtsa_db::CreateDraftData {
fn from(value: CreateDraftDataDto<T>) -> Self {
let cdd = Self::new(
&value.name,
@ -246,12 +246,12 @@ pub struct UserDraftDataFilterDto<T: DataOptions> {
pub user_id: i32,
pub name: Option<String>,
/// 数据类型,在某个具体类型查询时不传,传了也不生效
pub data_type: Option<rtss_dto::common::DataType>,
pub data_type: Option<rtsa_dto::common::DataType>,
pub options: Option<T>,
pub is_shared: Option<bool>,
}
impl<T: DataOptions> From<UserDraftDataFilterDto<T>> for rtss_db::DraftDataQuery {
impl<T: DataOptions> From<UserDraftDataFilterDto<T>> for rtsa_db::DraftDataQuery {
fn from(value: UserDraftDataFilterDto<T>) -> Self {
Self {
user_id: Some(value.user_id),
@ -277,7 +277,7 @@ pub struct DraftDataFilterDto<T: DataOptions> {
pub options: Option<T>,
}
impl<T: DataOptions> From<DraftDataFilterDto<T>> for rtss_db::DraftDataQuery {
impl<T: DataOptions> From<DraftDataFilterDto<T>> for rtsa_db::DraftDataQuery {
fn from(value: DraftDataFilterDto<T>) -> Self {
Self {
user_id: value.user_id,
@ -313,7 +313,7 @@ impl DraftDataDto {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<String>> {
if let Some(version_id) = self.default_release_data_id {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(ReleaseDataId::new(version_id)).await?;
Ok(name)
} else {
@ -323,14 +323,14 @@ impl DraftDataDto {
/// 获取用户name
async fn user_name(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(UserId::new(self.user_id)).await?;
Ok(name)
}
}
impl From<rtss_db::model::DraftDataModel> for DraftDataDto {
fn from(value: rtss_db::model::DraftDataModel) -> Self {
impl From<rtsa_db::model::DraftDataModel> for DraftDataDto {
fn from(value: rtsa_db::model::DraftDataModel) -> Self {
Self {
id: value.id,
name: value.name,
@ -354,8 +354,8 @@ pub struct DraftIscsDataDto {
pub options: Option<IscsDataOptions>,
}
impl From<rtss_db::model::DraftDataModel> for DraftIscsDataDto {
fn from(value: rtss_db::model::DraftDataModel) -> Self {
impl From<rtsa_db::model::DraftDataModel> for DraftIscsDataDto {
fn from(value: rtsa_db::model::DraftDataModel) -> Self {
Self {
options: value
.options

View File

@ -1,15 +1,15 @@
use crate::{
apis::{PageDto, PageQueryDto},
loader::RtssDbLoader,
loader::RtsaDbLoader,
user_auth::{RoleGuard, Token, UserAuthCache},
};
use async_graphql::{
dataloader::DataLoader, ComplexObject, Context, InputObject, Object, SimpleObject,
};
use chrono::NaiveDateTime;
use rtss_db::{CreateFeature, FeatureAccessor, RtssDbAccessor, UpdateFeature};
use rtss_dto::common::FeatureType;
use rtss_dto::common::Role;
use rtsa_db::{CreateFeature, FeatureAccessor, RtsaDbAccessor, UpdateFeature};
use rtsa_dto::common::FeatureType;
use rtsa_dto::common::Role;
use serde_json::Value;
use super::{
@ -33,7 +33,7 @@ impl FeatureQuery {
page: PageQueryDto,
query: FeatureQueryDto,
) -> async_graphql::Result<PageDto<FeatureDto>> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let paging = dba
.paging_query_features(page.into(), &query.into())
.await?;
@ -43,7 +43,7 @@ impl FeatureQuery {
/// id获取功能feature
#[graphql(guard = "RoleGuard::new(Role::User)")]
async fn feature(&self, ctx: &Context<'_>, id: i32) -> async_graphql::Result<FeatureDto> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let feature = dba.get_feature(id).await?;
Ok(feature.into())
}
@ -55,7 +55,7 @@ impl FeatureQuery {
ctx: &Context<'_>,
ids: Vec<i32>,
) -> async_graphql::Result<Vec<FeatureDto>> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let features = dba.get_features(ids.as_slice()).await?;
Ok(features.into_iter().map(|f| f.into()).collect())
}
@ -71,7 +71,7 @@ impl FeatureMutation {
id: i32,
is_published: bool,
) -> async_graphql::Result<FeatureDto> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let feature = dba.set_feature_published(id, is_published).await?;
Ok(feature.into())
}
@ -83,7 +83,7 @@ impl FeatureMutation {
ctx: &Context<'_>,
mut input: CreateFeatureDto<UrFeatureConfig>,
) -> async_graphql::Result<FeatureDto> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let user = ctx
.data::<UserAuthCache>()?
.query_user(&ctx.data::<Token>()?.0)
@ -100,7 +100,7 @@ impl FeatureMutation {
ctx: &Context<'_>,
input: UpdateFeatureDto<UrFeatureConfig>,
) -> async_graphql::Result<FeatureDto> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let feature = dba.update_feature(&input.into()).await?;
Ok(feature.into())
}
@ -168,7 +168,7 @@ pub struct FeatureQueryDto {
pub is_published: Option<bool>,
}
impl From<FeatureQueryDto> for rtss_db::FeaturePagingFilter {
impl From<FeatureQueryDto> for rtsa_db::FeaturePagingFilter {
fn from(value: FeatureQueryDto) -> Self {
Self {
name: value.name,
@ -197,21 +197,21 @@ pub struct FeatureDto {
impl FeatureDto {
/// 创建用户name
async fn creator_name(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(UserId::new(self.creator_id)).await?;
Ok(name)
}
/// 更新用户name
async fn updater_name(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(UserId::new(self.updater_id)).await?;
Ok(name)
}
}
impl From<rtss_db::model::FeatureModel> for FeatureDto {
fn from(value: rtss_db::model::FeatureModel) -> Self {
impl From<rtsa_db::model::FeatureModel> for FeatureDto {
fn from(value: rtsa_db::model::FeatureModel) -> Self {
Self {
id: value.id,
feature_type: value.feature_type,

View File

@ -25,7 +25,7 @@ pub struct Mutation(
);
#[derive(Enum, Copy, Clone, Default, Eq, PartialEq, Debug)]
#[graphql(remote = "rtss_db::common::SortOrder")]
#[graphql(remote = "rtsa_db::common::SortOrder")]
pub enum SortOrder {
#[default]
Asc,
@ -40,7 +40,7 @@ pub struct PageQueryDto {
pub items_per_page: i32,
}
impl From<PageQueryDto> for rtss_db::common::PageQuery {
impl From<PageQueryDto> for rtsa_db::common::PageQuery {
fn from(value: PageQueryDto) -> Self {
Self {
page: value.page,
@ -74,8 +74,8 @@ impl<T: OutputType> PageDto<T> {
}
}
impl<T: OutputType, M: Into<T>> From<rtss_db::common::PageResult<M>> for PageDto<T> {
fn from(value: rtss_db::common::PageResult<M>) -> Self {
impl<T: OutputType, M: Into<T>> From<rtsa_db::common::PageResult<M>> for PageDto<T> {
fn from(value: rtsa_db::common::PageResult<M>) -> Self {
Self::new(
value.total,
value.data.into_iter().map(|m| m.into()).collect(),

View File

@ -5,14 +5,14 @@ use async_graphql::dataloader::*;
use async_graphql::{ComplexObject, Context, InputObject, Object, SimpleObject};
use base64::prelude::*;
use chrono::NaiveDateTime;
use rtss_db::model::*;
use rtss_db::prelude::*;
use rtss_db::{model::ReleaseDataModel, ReleaseDataAccessor, RtssDbAccessor};
use rtss_dto::common::{DataType, Role};
use rtsa_db::model::*;
use rtsa_db::prelude::*;
use rtsa_db::{model::ReleaseDataModel, ReleaseDataAccessor, RtsaDbAccessor};
use rtsa_dto::common::{DataType, Role};
use serde_json::Value;
use crate::apis::draft_data::DraftDataDto;
use crate::loader::RtssDbLoader;
use crate::loader::RtsaDbLoader;
use super::data_options_def::{DataOptions, IscsDataOptions};
use super::user::UserId;
@ -36,7 +36,7 @@ impl ReleaseDataQuery {
page: PageQueryDto,
query: ReleaseTypedDataFilterDto<Value>,
) -> async_graphql::Result<PageDto<ReleaseDataDto>> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let paging = db_accessor
.paging_query_release_data_list(query.into(), page.into())
.await?;
@ -51,7 +51,7 @@ impl ReleaseDataQuery {
page: PageQueryDto,
mut query: ReleaseTypedDataFilterDto<IscsDataOptions>,
) -> async_graphql::Result<PageDto<ReleaseIscsDataWithoutVersionDto>> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
query.data_type = Some(DataType::Iscs);
let paging = db_accessor
.paging_query_release_data_list(query.into(), page.into())
@ -66,7 +66,7 @@ impl ReleaseDataQuery {
ctx: &Context<'_>,
id: i32,
) -> async_graphql::Result<ReleaseDataWithUsedVersionDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let model = db_accessor.query_release_data_with_used_version(id).await?;
Ok(model.into())
}
@ -79,7 +79,7 @@ impl ReleaseDataQuery {
data_type: DataType,
name: String,
) -> async_graphql::Result<bool> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.is_release_data_name_exist(data_type, &name)
.await?;
@ -94,7 +94,7 @@ impl ReleaseDataQuery {
data_id: i32,
page: PageQueryDto,
) -> async_graphql::Result<PageDto<ReleaseDataVersionDto>> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let paging = db_accessor
.paging_query_release_data_version_list(data_id, page.into())
.await?;
@ -108,7 +108,7 @@ impl ReleaseDataQuery {
ctx: &Context<'_>,
version_id: i32,
) -> async_graphql::Result<ReleaseDataVersionDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let model = db_accessor
.query_release_data_version_by_id(version_id)
.await?;
@ -132,7 +132,7 @@ impl ReleaseDataMutation {
.query_user(&ctx.data::<Token>()?.0)
.await?;
let user_id = user.id_i32();
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.release_new_from_draft(draft_id, &name, &description, Some(user_id))
.await?;
@ -152,7 +152,7 @@ impl ReleaseDataMutation {
.query_user(&ctx.data::<Token>()?.0)
.await?;
let user_id = user.id_i32();
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.release_to_existing(draft_id, &description, Some(user_id))
.await?;
@ -167,7 +167,7 @@ impl ReleaseDataMutation {
id: i32,
name: String,
) -> async_graphql::Result<ReleaseDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor.update_release_data_name(id, &name).await?;
Ok(result.into())
}
@ -180,7 +180,7 @@ impl ReleaseDataMutation {
id: i32,
is_published: bool,
) -> async_graphql::Result<ReleaseDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.set_release_data_published(id, is_published)
.await?;
@ -195,7 +195,7 @@ impl ReleaseDataMutation {
id: i32,
version_id: i32,
) -> async_graphql::Result<ReleaseDataDto> {
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.set_release_data_used_version(id, version_id)
.await?;
@ -214,7 +214,7 @@ impl ReleaseDataMutation {
.query_user(&ctx.data::<Token>()?.0)
.await?;
let user_id = user.id_i32();
let db_accessor = ctx.data::<RtssDbAccessor>()?;
let db_accessor = ctx.data::<RtsaDbAccessor>()?;
let result = db_accessor
.create_draft_from_release_version(version_id, user_id)
.await?;
@ -235,7 +235,7 @@ pub struct ReleaseTypedDataFilterDto<T: DataOptions> {
pub is_published: Option<bool>,
}
impl<T: DataOptions> From<ReleaseTypedDataFilterDto<T>> for rtss_db::ReleaseDataQuery {
impl<T: DataOptions> From<ReleaseTypedDataFilterDto<T>> for rtsa_db::ReleaseDataQuery {
fn from(value: ReleaseTypedDataFilterDto<T>) -> Self {
Self {
name: value.name,
@ -265,7 +265,7 @@ pub struct ReleaseDataDto {
impl ReleaseDataDto {
async fn description(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
if let Some(version_id) = self.used_version_id {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let description = loader
.load_one(ReleaseDataVersionId::new(version_id))
.await?;
@ -277,7 +277,7 @@ impl ReleaseDataDto {
/// 获取用户name
async fn user_name(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(UserId::new(self.user_id)).await?;
Ok(name)
}
@ -294,7 +294,7 @@ impl ReleaseDataId {
}
}
impl Loader<ReleaseDataId> for RtssDbLoader {
impl Loader<ReleaseDataId> for RtsaDbLoader {
type Value = String;
type Error = Arc<DbAccessError>;
@ -326,7 +326,7 @@ impl ReleaseDataVersionId {
}
}
impl Loader<ReleaseDataVersionId> for RtssDbLoader {
impl Loader<ReleaseDataVersionId> for RtsaDbLoader {
type Value = String;
type Error = Arc<DbAccessError>;
@ -388,7 +388,7 @@ pub struct ReleaseDataVersionDto {
impl ReleaseDataVersionDto {
/// 获取用户name
async fn user_name(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<String>> {
let loader = ctx.data_unchecked::<DataLoader<RtssDbLoader>>();
let loader = ctx.data_unchecked::<DataLoader<RtsaDbLoader>>();
let name = loader.load_one(UserId::new(self.user_id)).await?;
Ok(name)
}

View File

@ -2,14 +2,14 @@ use std::{collections::HashMap, sync::Arc};
use async_graphql::{dataloader::Loader, Context, InputObject, Object, SimpleObject};
use chrono::NaiveDateTime;
use rtss_db::{DbAccessError, RtssDbAccessor, UserAccessor};
use rtsa_db::{DbAccessError, RtsaDbAccessor, UserAccessor};
use crate::{
loader::RtssDbLoader,
loader::RtsaDbLoader,
user_auth::{build_jwt, Claims, RoleGuard, Token, UserAuthCache, UserInfoDto},
UserAuthClient,
};
use rtss_dto::common::Role;
use rtsa_dto::common::Role;
use super::{PageDto, PageQueryDto};
@ -47,7 +47,7 @@ impl UserQuery {
page: PageQueryDto,
query: UserQueryDto,
) -> async_graphql::Result<PageDto<UserDto>> {
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
let paging = dba.query_user_page(page.into(), query.into()).await?;
Ok(paging.into())
}
@ -63,12 +63,12 @@ impl UserMutation {
async fn sync_user(&self, ctx: &Context<'_>) -> async_graphql::Result<bool> {
let http_client = ctx.data::<UserAuthClient>()?;
let users = http_client.query_all_users(ctx.data::<Token>()?).await?;
let dba = ctx.data::<RtssDbAccessor>()?;
let dba = ctx.data::<RtsaDbAccessor>()?;
dba.sync_user(
users
.into_iter()
.map(|u| u.into())
.collect::<Vec<rtss_db::SyncUserInfo>>()
.collect::<Vec<rtsa_db::SyncUserInfo>>()
.as_slice(),
)
.await?;
@ -85,7 +85,7 @@ pub struct UserQueryDto {
pub roles: Option<Vec<Role>>,
}
impl From<UserQueryDto> for rtss_db::UserPageFilter {
impl From<UserQueryDto> for rtsa_db::UserPageFilter {
fn from(value: UserQueryDto) -> Self {
Self {
id: value.id,
@ -122,8 +122,8 @@ impl From<UserInfoDto> for UserDto {
}
}
impl From<rtss_db::model::UserModel> for UserDto {
fn from(value: rtss_db::model::UserModel) -> Self {
impl From<rtsa_db::model::UserModel> for UserDto {
fn from(value: rtsa_db::model::UserModel) -> Self {
Self {
id: value.id,
name: value.username,
@ -147,7 +147,7 @@ impl UserId {
}
}
impl Loader<UserId> for RtssDbLoader {
impl Loader<UserId> for RtsaDbLoader {
type Value = String;
type Error = Arc<DbAccessError>;

View File

@ -0,0 +1,10 @@
/// 数据库加载器
pub struct RtsaDbLoader {
pub(crate) db_accessor: rtsa_db::RtsaDbAccessor,
}
impl RtsaDbLoader {
pub fn new(db_accessor: rtsa_db::RtsaDbAccessor) -> Self {
Self { db_accessor }
}
}

View File

@ -10,13 +10,13 @@ use axum::{
};
use dataloader::DataLoader;
use http::{playground_source, GraphQLPlaygroundConfig};
use rtss_db::RtssDbAccessor;
use rtss_log::tracing::{debug, info};
use rtsa_db::RtsaDbAccessor;
use rtsa_log::tracing::{debug, info};
use tokio::net::TcpListener;
use tower_http::cors::CorsLayer;
use crate::apis::{Mutation, Query};
use crate::loader::RtssDbLoader;
use crate::loader::RtsaDbLoader;
use crate::user_auth;
pub use crate::user_auth::UserAuthClient;
@ -52,7 +52,7 @@ pub async fn serve(config: ServerConfig) -> anyhow::Result<()> {
.user_auth_client
.clone()
.expect("user auth client not configured");
let dba = rtss_db::get_db_accessor(&config.database_url).await;
let dba = rtsa_db::get_db_accessor(&config.database_url).await;
let schema = new_schema(SchemaOptions::new(client, dba));
let app = Router::new()
@ -76,7 +76,7 @@ pub async fn serve(config: ServerConfig) -> anyhow::Result<()> {
}
async fn graphql_handler(
State(schema): State<RtssAppSchema>,
State(schema): State<RtsaAppSchema>,
headers: HeaderMap,
req: GraphQLRequest,
) -> GraphQLResponse {
@ -92,31 +92,31 @@ async fn graphiql() -> impl IntoResponse {
Html(playground_source(GraphQLPlaygroundConfig::new("/")))
}
pub type RtssAppSchema = Schema<Query, Mutation, EmptySubscription>;
pub type RtsaAppSchema = Schema<Query, Mutation, EmptySubscription>;
pub struct SchemaOptions {
pub user_auth_client: UserAuthClient,
pub user_info_cache: user_auth::UserAuthCache,
pub rtss_dba: RtssDbAccessor,
pub rtsa_dba: RtsaDbAccessor,
}
impl SchemaOptions {
pub fn new(user_auth_client: UserAuthClient, rtss_dba: RtssDbAccessor) -> Self {
pub fn new(user_auth_client: UserAuthClient, rtsa_dba: RtsaDbAccessor) -> Self {
let user_info_cache = user_auth::UserAuthCache::new(user_auth_client.clone());
Self {
user_auth_client,
user_info_cache,
rtss_dba,
rtsa_dba,
}
}
}
pub fn new_schema(options: SchemaOptions) -> RtssAppSchema {
let loader = RtssDbLoader::new(options.rtss_dba.clone());
pub fn new_schema(options: SchemaOptions) -> RtsaAppSchema {
let loader = RtsaDbLoader::new(options.rtsa_dba.clone());
Schema::build(Query::default(), Mutation::default(), EmptySubscription)
.data(options.user_auth_client)
.data(options.user_info_cache)
.data(options.rtss_dba)
.data(options.rtsa_dba)
.data(DataLoader::new(loader, tokio::spawn))
// .data(MutexSimulationManager::default())
.finish()
@ -129,7 +129,7 @@ mod tests {
#[tokio::test]
async fn test_new_schema() {
let dba =
rtss_db::get_db_accessor("postgresql://joylink:Joylink@0503@localhost:5432/joylink")
rtsa_db::get_db_accessor("postgresql://joylink:Joylink@0503@localhost:5432/joylink")
.await;
let _ = new_schema(SchemaOptions::new(
crate::UserAuthClient {

View File

@ -70,7 +70,7 @@ mod tests {
#[test]
fn test_jwt() {
rtss_log::Logging::default().init();
rtsa_log::Logging::default().init();
let claim = Claims::new(5);
let jwt = build_jwt(claim).unwrap();
println!("jwt: {}", jwt.0);

View File

@ -6,8 +6,8 @@ use std::{
use async_graphql::Guard;
use axum::http::HeaderMap;
use chrono::{DateTime, Local};
use rtss_dto::common::Role;
use rtss_log::tracing::error;
use rtsa_dto::common::Role;
use rtsa_log::tracing::error;
use serde::{Deserialize, Serialize};
mod jwt_auth;
@ -225,7 +225,7 @@ fn parse_to_date_time(s: &str) -> chrono::DateTime<Local> {
.unwrap()
}
impl From<UserInfoDto> for rtss_db::SyncUserInfo {
impl From<UserInfoDto> for rtsa_db::SyncUserInfo {
fn from(user_info: UserInfoDto) -> Self {
Self {
id: user_info.id_i32(),
@ -301,9 +301,9 @@ mod tests {
// #[tokio::test]
// async fn test_user_auth_cache() -> anyhow::Result<()> {
// rtss_log::Logging::default().with_level(Level::DEBUG).init();
// rtsa_log::Logging::default().with_level(Level::DEBUG).init();
// let client = UserAuthClient {
// base_url: "http://192.168.33.233/rtss-server".to_string(),
// base_url: "http://192.168.33.233/rtsa-server".to_string(),
// login_url: "/api/login".to_string(),
// logout_url: "/api/login/logout".to_string(),
// user_info_url: "/api/login/getUserInfo".to_string(),

View File

@ -1,10 +0,0 @@
/// 数据库加载器
pub struct RtssDbLoader {
pub(crate) db_accessor: rtss_db::RtssDbAccessor,
}
impl RtssDbLoader {
pub fn new(db_accessor: rtss_db::RtssDbAccessor) -> Self {
Self { db_accessor }
}
}

View File

@ -20,9 +20,9 @@ pub struct Log {
level: String,
}
impl From<Log> for rtss_log::Logging {
impl From<Log> for rtsa_log::Logging {
fn from(log: Log) -> Self {
rtss_log::Logging {
rtsa_log::Logging {
level: log.level.parse().unwrap(),
..Default::default()
}
@ -69,9 +69,9 @@ impl AppConfig {
// Default to 'dev' env
// Note that this file is _optional_
.add_source(File::with_name(&format!("{dir}/{run_mode}")).required(true))
// Add in settings from the environment (with a prefix of RTSS_SIM)
// Add in settings from the environment (with a prefix of rtsa_SIM)
// Eg.. `APP_DEBUG=1 ./target/app` would set the `debug` key
.add_source(Environment::with_prefix("RTSS_SIM").separator("_"))
.add_source(Environment::with_prefix("rtsa_SIM").separator("_"))
// You may also programmatically change settings
// .set_override("database.url", "postgres://")?
// build the configuration

View File

@ -6,7 +6,7 @@ use crate::{app_config, CmdExecutor};
use super::DbSubCommand;
#[derive(Parser, Debug)]
#[command(name = "rtss-sim", version, author, about, long_about = None)]
#[command(name = "rtsa-sim", version, author, about, long_about = None)]
pub struct Cmd {
#[command(subcommand)]
pub cmd: SubCommand,
@ -31,11 +31,11 @@ impl CmdExecutor for ServerOpts {
async fn execute(&self) -> anyhow::Result<()> {
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
let log: rtss_log::Logging = app_config.log.into();
let log: rtsa_log::Logging = app_config.log.into();
log.init();
rtss_api::serve(
rtss_api::ServerConfig::new(&app_config.database.url, app_config.server.port)
.with_user_auth_client(rtss_api::UserAuthClient {
rtsa_api::serve(
rtsa_api::ServerConfig::new(&app_config.database.url, app_config.server.port)
.with_user_auth_client(rtsa_api::UserAuthClient {
base_url: app_config.sso.base_url,
login_url: app_config.sso.login_url,
logout_url: app_config.sso.logout_url,

View File

@ -22,6 +22,6 @@ impl CmdExecutor for MigrateOpts {
async fn execute(&self) -> anyhow::Result<()> {
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
rtss_db::run_migrations(&app_config.database.url).await
rtsa_db::run_migrations(&app_config.database.url).await
}
}

View File

@ -1 +1 @@
DROP SCHEMA rtss CASCADE;
DROP SCHEMA rtsa CASCADE;

View File

@ -1,12 +1,12 @@
-- 初始化数据库SCHEMA(所有轨道交通信号系统仿真的表、类型等都在rtss SCHEMA下)
CREATE SCHEMA rtss;
-- 初始化数据库SCHEMA(所有轨道交通信号系统仿真的表、类型等都在rtsa SCHEMA下)
CREATE SCHEMA rtsa;
-- 创建mqtt客户端id序列
CREATE SEQUENCE rtss.mqtt_client_id_seq;
CREATE SEQUENCE rtsa.mqtt_client_id_seq;
-- 创建用户表
CREATE TABLE
rtss.user (
rtsa.user (
id SERIAL PRIMARY KEY, -- id 自增主键
username VARCHAR(128) NOT NULL, -- 用户名
password VARCHAR(128) NOT NULL, -- 密码
@ -18,40 +18,40 @@ CREATE TABLE
);
-- 创建用户名称索引
CREATE INDEX ON rtss.user (username);
CREATE INDEX ON rtsa.user (username);
-- 创建用户邮箱索引
CREATE INDEX ON rtss.user (email);
CREATE INDEX ON rtsa.user (email);
-- 创建用户手机号索引
CREATE INDEX ON rtss.user (mobile);
CREATE INDEX ON rtsa.user (mobile);
-- 创建用户角色索引
CREATE INDEX ON rtss.user USING GIN (roles);
CREATE INDEX ON rtsa.user USING GIN (roles);
-- 注释用户表
COMMENT ON TABLE rtss.user IS '用户表';
COMMENT ON TABLE rtsa.user IS '用户表';
-- 注释用户表字段
COMMENT ON COLUMN rtss.user.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.user.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.user.username IS '用户名';
COMMENT ON COLUMN rtsa.user.username IS '用户名';
COMMENT ON COLUMN rtss.user.password IS '密码';
COMMENT ON COLUMN rtsa.user.password IS '密码';
COMMENT ON COLUMN rtss.user.email IS '邮箱';
COMMENT ON COLUMN rtsa.user.email IS '邮箱';
COMMENT ON COLUMN rtss.user.mobile IS '手机号';
COMMENT ON COLUMN rtsa.user.mobile IS '手机号';
COMMENT ON COLUMN rtss.user.roles IS '角色列表';
COMMENT ON COLUMN rtsa.user.roles IS '角色列表';
COMMENT ON COLUMN rtss.user.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.user.created_at IS '创建时间';
COMMENT ON COLUMN rtss.user.updated_at IS '更新时间';
COMMENT ON COLUMN rtsa.user.updated_at IS '更新时间';
-- 创建草稿数据表
CREATE TABLE
rtss.draft_data (
rtsa.draft_data (
id SERIAL PRIMARY KEY, -- id 自增主键
name VARCHAR(128) NOT NULL, -- 草稿名称
data_type INT NOT NULL, -- 数据类型
@ -62,44 +62,44 @@ CREATE TABLE
is_shared BOOLEAN NOT NULL DEFAULT FALSE, -- 是否共享
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 更新时间
FOREIGN KEY (user_id) REFERENCES rtss.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (user_id) REFERENCES rtsa.user (id) ON DELETE CASCADE, -- 用户外键
UNIQUE (name, data_type, user_id) -- 一个用户的某个类型的草稿名称唯一
);
-- 创建草稿数据用户索引
CREATE INDEX ON rtss.draft_data (user_id);
CREATE INDEX ON rtsa.draft_data (user_id);
-- 创建草稿数据类型索引
CREATE INDEX ON rtss.draft_data (data_type);
CREATE INDEX ON rtsa.draft_data (data_type);
-- 创建草稿数据配置项索引
CREATE INDEX ON rtss.draft_data USING GIN (options);
CREATE INDEX ON rtsa.draft_data USING GIN (options);
-- 注释草稿数据表
COMMENT ON TABLE rtss.draft_data IS '草稿数据表';
COMMENT ON TABLE rtsa.draft_data IS '草稿数据表';
-- 注释草稿数据表字段
COMMENT ON COLUMN rtss.draft_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.draft_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.draft_data.name IS '草稿名称';
COMMENT ON COLUMN rtsa.draft_data.name IS '草稿名称';
COMMENT ON COLUMN rtss.draft_data.data_type IS '数据类型';
COMMENT ON COLUMN rtsa.draft_data.data_type IS '数据类型';
COMMENT ON COLUMN rtss.draft_data.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtsa.draft_data.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtss.draft_data.data IS '草稿数据';
COMMENT ON COLUMN rtsa.draft_data.data IS '草稿数据';
COMMENT ON COLUMN rtss.draft_data.user_id IS '创建用户id';
COMMENT ON COLUMN rtsa.draft_data.user_id IS '创建用户id';
COMMENT ON COLUMN rtss.draft_data.is_shared IS '是否共享';
COMMENT ON COLUMN rtsa.draft_data.is_shared IS '是否共享';
COMMENT ON COLUMN rtss.draft_data.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.draft_data.created_at IS '创建时间';
COMMENT ON COLUMN rtss.draft_data.updated_at IS '更新时间';
COMMENT ON COLUMN rtsa.draft_data.updated_at IS '更新时间';
-- 创建发布数据表
CREATE TABLE
rtss.release_data (
rtsa.release_data (
id SERIAL PRIMARY KEY, -- id 自增主键
name VARCHAR(128) NOT NULL, -- 发布数据名称(数据唯一标识)
data_type INT NOT NULL, -- 数据类型
@ -109,47 +109,47 @@ CREATE TABLE
is_published BOOLEAN NOT NULL DEFAULT TRUE, -- 是否上架
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 更新时间
FOREIGN KEY (user_id) REFERENCES rtss.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (user_id) REFERENCES rtsa.user (id) ON DELETE CASCADE, -- 用户外键
UNIQUE(data_type, name) -- 数据类型和名称唯一
);
-- 创建发布数据名称索引
CREATE INDEX ON rtss.release_data (name);
CREATE INDEX ON rtsa.release_data (name);
-- 创建发布数据用户索引
CREATE INDEX ON rtss.release_data (user_id);
CREATE INDEX ON rtsa.release_data (user_id);
-- 创建发布数据类型索引
CREATE INDEX ON rtss.release_data (data_type);
CREATE INDEX ON rtsa.release_data (data_type);
-- 创建发布数据配置项索引
CREATE INDEX ON rtss.release_data USING GIN (options);
CREATE INDEX ON rtsa.release_data USING GIN (options);
-- 注释发布数据表
COMMENT ON TABLE rtss.release_data IS '发布数据表';
COMMENT ON TABLE rtsa.release_data IS '发布数据表';
-- 注释发布数据表字段
COMMENT ON COLUMN rtss.release_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.release_data.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.release_data.name IS '发布数据名称(数据唯一标识)';
COMMENT ON COLUMN rtsa.release_data.name IS '发布数据名称(数据唯一标识)';
COMMENT ON COLUMN rtss.release_data.data_type IS '数据类型';
COMMENT ON COLUMN rtsa.release_data.data_type IS '数据类型';
COMMENT ON COLUMN rtss.release_data.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtsa.release_data.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtss.release_data.used_version_id IS '使用的版本数据id';
COMMENT ON COLUMN rtsa.release_data.used_version_id IS '使用的版本数据id';
COMMENT ON COLUMN rtss.release_data.user_id IS '发布/更新用户id';
COMMENT ON COLUMN rtsa.release_data.user_id IS '发布/更新用户id';
COMMENT ON COLUMN rtss.release_data.is_published IS '是否上架';
COMMENT ON COLUMN rtsa.release_data.is_published IS '是否上架';
COMMENT ON COLUMN rtss.release_data.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.release_data.created_at IS '创建时间';
COMMENT ON COLUMN rtss.release_data.updated_at IS '更新时间';
COMMENT ON COLUMN rtsa.release_data.updated_at IS '更新时间';
-- 创建发布数据版本表
CREATE TABLE
rtss.release_data_version (
rtsa.release_data_version (
id SERIAL PRIMARY KEY, -- id 自增主键
release_data_id INT NOT NULL, -- 发布数据id
options JSONB NULL, -- 数据相关的参数项或配置项
@ -157,46 +157,46 @@ CREATE TABLE
description TEXT NOT NULL, -- 版本描述
user_id INT NOT NULL, -- 发布用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
FOREIGN KEY (user_id) REFERENCES rtss.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (release_data_id) REFERENCES rtss.release_data (id) ON DELETE CASCADE
FOREIGN KEY (user_id) REFERENCES rtsa.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (release_data_id) REFERENCES rtsa.release_data (id) ON DELETE CASCADE
);
-- 创建发布数据版本发布数据索引
CREATE INDEX ON rtss.release_data_version (release_data_id);
CREATE INDEX ON rtsa.release_data_version (release_data_id);
-- 创建发布数据版本用户索引
CREATE INDEX ON rtss.release_data_version (user_id);
CREATE INDEX ON rtsa.release_data_version (user_id);
-- 创建发布数据版本配置项索引
CREATE INDEX ON rtss.release_data_version USING GIN (options);
CREATE INDEX ON rtsa.release_data_version USING GIN (options);
-- 创建发布数据当前版本外键
ALTER TABLE rtss.release_data ADD FOREIGN KEY (used_version_id) REFERENCES rtss.release_data_version (id) ON DELETE SET NULL;
ALTER TABLE rtsa.release_data ADD FOREIGN KEY (used_version_id) REFERENCES rtsa.release_data_version (id) ON DELETE SET NULL;
-- 创建草稿数据默认发布数据外键
ALTER TABLE rtss.draft_data ADD FOREIGN KEY (default_release_data_id) REFERENCES rtss.release_data (id) ON DELETE SET NULL;
ALTER TABLE rtsa.draft_data ADD FOREIGN KEY (default_release_data_id) REFERENCES rtsa.release_data (id) ON DELETE SET NULL;
-- 注释发布数据版本表
COMMENT ON TABLE rtss.release_data_version IS '发布数据版本表';
COMMENT ON TABLE rtsa.release_data_version IS '发布数据版本表';
-- 注释发布数据版本表字段
COMMENT ON COLUMN rtss.release_data_version.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.release_data_version.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.release_data_version.release_data_id IS '发布数据id';
COMMENT ON COLUMN rtsa.release_data_version.release_data_id IS '发布数据id';
COMMENT ON COLUMN rtss.release_data_version.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtsa.release_data_version.options IS '数据相关的参数项或配置项';
COMMENT ON COLUMN rtss.release_data_version.data IS '数据';
COMMENT ON COLUMN rtsa.release_data_version.data IS '数据';
COMMENT ON COLUMN rtss.release_data_version.description IS '版本描述';
COMMENT ON COLUMN rtsa.release_data_version.description IS '版本描述';
COMMENT ON COLUMN rtss.release_data_version.user_id IS '发布用户id';
COMMENT ON COLUMN rtsa.release_data_version.user_id IS '发布用户id';
COMMENT ON COLUMN rtss.release_data_version.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.release_data_version.created_at IS '创建时间';
-- 创建feature表
CREATE TABLE
rtss.feature (
rtsa.feature (
id SERIAL PRIMARY KEY, -- id 自增主键
feature_type INT NOT NULL, -- feature类型
name VARCHAR(128) NOT NULL UNIQUE, -- feature名称
@ -207,68 +207,68 @@ CREATE TABLE
updater_id INT NOT NULL, -- 更新用户id
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 更新时间
FOREIGN KEY (creator_id) REFERENCES rtss.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (updater_id) REFERENCES rtss.user (id) ON DELETE CASCADE -- 用户外键
FOREIGN KEY (creator_id) REFERENCES rtsa.user (id) ON DELETE CASCADE, -- 用户外键
FOREIGN KEY (updater_id) REFERENCES rtsa.user (id) ON DELETE CASCADE -- 用户外键
);
-- 创建feature类型索引
CREATE INDEX ON rtss.feature (feature_type);
CREATE INDEX ON rtsa.feature (feature_type);
-- 创建feature名称索引
CREATE INDEX ON rtss.feature (name);
CREATE INDEX ON rtsa.feature (name);
-- 注释仿真feature表
COMMENT ON TABLE rtss.feature IS 'feature表';
COMMENT ON TABLE rtsa.feature IS 'feature表';
-- 注释仿真feature表字段
COMMENT ON COLUMN rtss.feature.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.feature.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.feature.feature_type IS 'feature类型';
COMMENT ON COLUMN rtsa.feature.feature_type IS 'feature类型';
COMMENT ON COLUMN rtss.feature.name IS 'feature名称';
COMMENT ON COLUMN rtsa.feature.name IS 'feature名称';
COMMENT ON COLUMN rtss.feature.description IS 'feature描述';
COMMENT ON COLUMN rtsa.feature.description IS 'feature描述';
COMMENT ON COLUMN rtss.feature.config IS 'feature配置';
COMMENT ON COLUMN rtsa.feature.config IS 'feature配置';
COMMENT ON COLUMN rtss.feature.is_published IS '是否上架';
COMMENT ON COLUMN rtsa.feature.is_published IS '是否上架';
COMMENT ON COLUMN rtss.feature.creator_id IS '创建用户id';
COMMENT ON COLUMN rtsa.feature.creator_id IS '创建用户id';
COMMENT ON COLUMN rtss.feature.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.feature.created_at IS '创建时间';
COMMENT ON COLUMN rtss.feature.updated_at IS '更新时间';
COMMENT ON COLUMN rtsa.feature.updated_at IS '更新时间';
-- 创建用户配置表
CREATE TABLE
rtss.user_config (
rtsa.user_config (
id SERIAL PRIMARY KEY, -- id 自增主键
user_id INT NOT NULL, -- 用户id
config_type INT NOT NULL, -- 配置类型
config BYTEA NOT NULL, -- 配置
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 更新时间
FOREIGN KEY (user_id) REFERENCES rtss.user (id) ON DELETE CASCADE -- 用户外键
FOREIGN KEY (user_id) REFERENCES rtsa.user (id) ON DELETE CASCADE -- 用户外键
);
-- 创建用户配置用户索引
CREATE INDEX ON rtss.user_config (user_id);
CREATE INDEX ON rtsa.user_config (user_id);
-- 创建用户配置类型索引
CREATE INDEX ON rtss.user_config (config_type);
CREATE INDEX ON rtsa.user_config (config_type);
-- 注释用户feature配置表
COMMENT ON TABLE rtss.user_config IS '用户feature配置表';
COMMENT ON TABLE rtsa.user_config IS '用户feature配置表';
-- 注释用户feature配置表字段
COMMENT ON COLUMN rtss.user_config.id IS 'id 自增主键';
COMMENT ON COLUMN rtsa.user_config.id IS 'id 自增主键';
COMMENT ON COLUMN rtss.user_config.user_id IS '用户id';
COMMENT ON COLUMN rtsa.user_config.user_id IS '用户id';
COMMENT ON COLUMN rtss.user_config.config_type IS '配置类型';
COMMENT ON COLUMN rtsa.user_config.config_type IS '配置类型';
COMMENT ON COLUMN rtss.user_config.config IS '配置';
COMMENT ON COLUMN rtsa.user_config.config IS '配置';
COMMENT ON COLUMN rtss.user_config.created_at IS '创建时间';
COMMENT ON COLUMN rtsa.user_config.created_at IS '创建时间';
COMMENT ON COLUMN rtss.user_config.updated_at IS '更新时间';
COMMENT ON COLUMN rtsa.user_config.updated_at IS '更新时间';

@ -1 +0,0 @@
Subproject commit 1f53057b3f87790ef27c91399a5bb7e890f05549

View File

@ -18,7 +18,7 @@ clap = { workspace = true, features = ["derive"] }
enum_dispatch = { workspace = true }
anyhow = { workspace = true }
rtss_log = { path = "../crates/rtss_log" }
rtss_dto = { path = "../crates/rtss_dto" }
rtss_db = { path = "../crates/rtss_db" }
rtss_mqtt = { path = "../crates/rtss_mqtt" }
rtsa_log = { path = "../crates/rtsa_log" }
rtsa_dto = { path = "../crates/rtsa_dto" }
rtsa_db = { path = "../crates/rtsa_db" }
rtsa_mqtt = { path = "../crates/rtsa_mqtt" }

View File

@ -1,6 +0,0 @@
[package]
name = "rtss_ci"
version = "0.1.0"
edition = "2021"
[dependencies]

View File

@ -1,14 +0,0 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -1,7 +0,0 @@
[package]
name = "rtss_common"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_ecs = {workspace = true}

View File

@ -1,76 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use bevy_ecs::{component::Component, entity::Entity, system::Resource};
/// 仿真公共资源
pub struct SimulationResource {
id: String,
uid_entity_mapping: HashMap<String, Entity>,
}
impl SimulationResource {
pub fn new(id: String) -> Self {
SimulationResource {
id,
uid_entity_mapping: HashMap::new(),
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn get_entity(&self, uid: &str) -> Option<Entity> {
self.uid_entity_mapping.get(uid).cloned()
}
pub fn insert_entity(&mut self, uid: String, entity: Entity) {
self.uid_entity_mapping.insert(uid, entity);
}
}
// 设备编号组件
#[derive(Component, Debug, Clone, PartialEq, Eq)]
pub struct Uid(pub String);
impl Default for Uid {
fn default() -> Self {
Uid("".to_string())
}
}
#[derive(Resource)]
pub struct SharedSimulationResource(pub Arc<Mutex<SimulationResource>>);
impl SharedSimulationResource {
pub fn get_entity(&self, uid: &str) -> Option<Entity> {
self.0.lock().unwrap().uid_entity_mapping.get(uid).cloned()
}
pub fn insert_entity(&self, uid: String, entity: Entity) {
self.0
.lock()
.unwrap()
.uid_entity_mapping
.insert(uid, entity);
}
}
#[cfg(test)]
mod tests {
use bevy_ecs::world;
use super::*;
#[test]
fn it_works() {
let mut simulation_resource = SimulationResource::new("1".to_string());
let mut world = world::World::default();
let uid = Uid("1".to_string());
let entity = world.spawn(uid.clone()).id();
simulation_resource.insert_entity(uid.clone().0, entity);
assert_eq!(simulation_resource.get_entity(&uid.0), Some(entity));
}
}

View File

@ -1,6 +0,0 @@
[package]
name = "rtss_iscs"
version = "0.1.0"
edition = "2021"
[dependencies]

View File

@ -1,14 +0,0 @@
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -1,16 +0,0 @@
[package]
name = "rtss_sim_manage"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_core = {workspace = true}
bevy_ecs = {workspace = true}
bevy_app = {workspace = true}
bevy_time = {workspace = true}
rayon = {workspace = true}
thiserror = {workspace = true}
rtss_log = { path = "../../crates/rtss_log" }
rtss_common = { path = "../rtss_common" }
rtss_trackside = { path = "../rtss_trackside" }

View File

@ -1,17 +0,0 @@
use bevy_app::App;
use rtss_trackside::TrackSideEquipmentPlugin;
#[derive(Debug)]
pub enum AvailablePlugins {
TrackSideEquipmentPlugin,
}
pub(crate) fn add_needed_plugins(app: &mut App, plugins: Vec<AvailablePlugins>) {
for plugin in plugins {
match plugin {
AvailablePlugins::TrackSideEquipmentPlugin => {
app.add_plugins(TrackSideEquipmentPlugin);
}
}
}
}

View File

@ -1,19 +0,0 @@
mod config_plugins;
mod simulation;
pub use config_plugins::*;
pub use simulation::*;
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@ -1,4 +0,0 @@
fn main() {
println!("Hello, world!");
}

View File

@ -1,468 +0,0 @@
use std::{
cell::RefCell,
collections::HashMap,
ops::Deref,
sync::{mpsc, Arc, Mutex},
time::{Duration, Instant},
};
use bevy_app::{prelude::*, PluginsState};
use bevy_ecs::{
event::{Event, EventWriter},
observer::Trigger,
system::{Query, Res, ResMut, Resource},
world::OnAdd,
};
use bevy_time::{prelude::*, TimePlugin};
use rtss_common::{SharedSimulationResource, SimulationResource, Uid};
use rtss_log::tracing::{debug, error, warn};
use thiserror::Error;
use crate::{add_needed_plugins, AvailablePlugins};
/// 仿真管理器
/// 非线程安全,若需要线程安全请使用类似 `Arc<Mutex<SimulationManager>>` 的方式
pub struct SimulationManager {
txs: RefCell<HashMap<String, Simulation>>,
}
impl Default for SimulationManager {
fn default() -> Self {
Self::new()
}
}
#[derive(Error, Debug)]
pub enum SimulationControlError {
#[error("Unknown error")]
UnknownError,
#[error("Simulation not exist")]
SimulationNotExist,
#[error("Trigger event failed")]
TriggerEventFailed,
#[error("Simulation entity not exist")]
SimulationEntityNotExist,
}
impl SimulationManager {
fn new() -> Self {
let txs = RefCell::new(HashMap::new());
SimulationManager { txs }
}
pub fn count(&self) -> usize {
self.txs.borrow().len()
}
pub fn start_simulation(
&self,
builder: SimulationBuilder,
) -> Result<String, SimulationControlError> {
let id = builder.id.clone();
let sim = Simulation::new(builder);
self.txs.borrow_mut().insert(id.clone(), sim);
Ok(id)
}
pub fn exit_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow_mut().remove(&id) {
Some(sim) => sim.exit_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn pause_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.pause_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn resume_simulation(&self, id: String) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.resume_simulation(),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn update_simulation_speed(
&self,
id: String,
speed: f32,
) -> Result<(), SimulationControlError> {
match self.txs.borrow().get(&id) {
Some(sim) => sim.update_simulation_speed(speed),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn trigger_operation<E>(&self, id: String, event: E) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
match self.txs.borrow().get(&id) {
Some(sim) => sim.trigger_operation(event),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
pub fn trigger_entity_operation<E>(
&self,
id: String,
entity_uid: String,
event: E,
) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
match self.txs.borrow().get(&id) {
Some(sim) => sim.trigger_entity_operation(entity_uid, event),
None => {
warn!("Simulation not exist, id={}", id);
Err(SimulationControlError::SimulationNotExist)
}
}
}
}
pub struct SimulationBuilder {
/// 仿真ID
pub(crate) id: String,
/// 仿真主逻辑循环间隔,详细请查看 [`Time<Fixed>`](bevy_time::fixed::Fixed)
pub(crate) loop_duration: Duration,
/// 仿真所需插件
pub(crate) plugins: Vec<AvailablePlugins>,
}
impl Default for SimulationBuilder {
fn default() -> Self {
SimulationBuilder {
id: "default".to_string(),
loop_duration: Duration::from_millis(500),
plugins: Vec::new(),
}
}
}
impl SimulationBuilder {
pub fn id(mut self, id: String) -> Self {
self.id = id;
self
}
pub fn loop_duration(mut self, loop_duration: Duration) -> Self {
self.loop_duration = loop_duration;
self
}
pub fn plugins(mut self, plugins: Vec<AvailablePlugins>) -> Self {
self.plugins = plugins;
self
}
}
#[derive(Resource, Debug)]
pub struct SimulationId(String);
impl SimulationId {
pub fn new(id: String) -> Self {
SimulationId(id)
}
}
impl Deref for SimulationId {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Resource, Debug)]
pub struct SimulationStatus {
// 仿真倍速
pub speed: f32,
// 仿真是否暂停状态
pub paused: bool,
}
impl Default for SimulationStatus {
fn default() -> Self {
SimulationStatus {
speed: 1.0,
paused: true,
}
}
}
/// 仿真控制事件
#[derive(Event, Debug, Clone, Copy)]
pub enum SimulationControlEvent {
Pause,
Unpause,
UpdateSpeed(f32),
Exit,
}
pub struct Simulation {
tx: mpsc::Sender<Box<SimulationHandle>>,
resource: Arc<Mutex<SimulationResource>>,
}
pub type SimulationHandle = dyn FnMut(&mut App) + Send;
impl Simulation {
pub fn new(builder: SimulationBuilder) -> Self {
let simulation_resource = Arc::new(Mutex::new(SimulationResource::new(builder.id.clone())));
let cloned_resource = Arc::clone(&simulation_resource);
let (tx, mut rx) = mpsc::channel();
rayon::spawn(move || {
let mut app = App::new();
let mut virtual_time =
Time::<Virtual>::from_max_delta(builder.loop_duration.mul_f32(2f32));
virtual_time.pause();
// 初始化仿真App
app.add_plugins(TimePlugin)
.insert_resource(virtual_time)
.insert_resource(Time::<Fixed>::from_duration(builder.loop_duration))
.insert_resource(SimulationId::new(builder.id))
.insert_resource(SimulationStatus::default())
.insert_resource(SharedSimulationResource(Arc::clone(&cloned_resource)))
.add_event::<SimulationControlEvent>()
.observe(simulation_status_control)
.observe(entity_observer);
// 添加仿真所需插件
add_needed_plugins(&mut app, builder.plugins);
let wait = Some(builder.loop_duration);
app.set_runner(move |mut app: App| {
let plugins_state = app.plugins_state();
if plugins_state != PluginsState::Cleaned {
app.finish();
app.cleanup();
}
loop {
match runner(&mut app, wait, &mut rx) {
Ok(Some(delay)) => std::thread::sleep(delay),
Ok(None) => continue,
Err(exit) => return exit,
}
}
});
app.run();
});
Simulation {
tx,
resource: simulation_resource,
}
}
fn trigger_event(&self, event: SimulationControlEvent) -> Result<(), SimulationControlError> {
let id = self.resource.lock().unwrap().id().to_string();
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger(event);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
pub fn trigger_operation<E>(&self, event: E) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
let id = self.resource.lock().unwrap().id().to_string();
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger(event);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
pub fn trigger_entity_operation<E>(
&self,
entity_uid: String,
event: E,
) -> Result<(), SimulationControlError>
where
E: Event + Copy,
{
let id = self.resource.lock().unwrap().id().to_string();
match self.resource.lock().unwrap().get_entity(&entity_uid) {
Some(entity) => {
let result = self.tx.send(Box::new(move |app: &mut App| {
app.world_mut().trigger_targets(event, entity);
}));
match result {
Ok(_) => Ok(()),
Err(e) => {
error!(
"Failed to send event to simulation, id={}, error={:?}",
id, e
);
Err(SimulationControlError::TriggerEventFailed)
}
}
}
None => {
error!("Entity not exist, id={}", entity_uid);
Err(SimulationControlError::SimulationEntityNotExist)
}
}
}
pub fn exit_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Exit)
}
pub fn pause_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Pause)
}
pub fn resume_simulation(&self) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::Unpause)
}
pub fn update_simulation_speed(&self, speed: f32) -> Result<(), SimulationControlError> {
self.trigger_event(SimulationControlEvent::UpdateSpeed(speed))
}
}
pub fn entity_observer(
trigger: Trigger<OnAdd>,
query: Query<&Uid>,
shared: ResMut<SharedSimulationResource>,
) {
let entity = trigger.entity();
match query.get(entity) {
Ok(uid) => {
shared.insert_entity(uid.0.clone(), entity);
debug!("添加uid实体映射, Uid: {:?}, Entity: {:?}", uid, entity);
}
Err(_) => {
warn!("Failed to get Uid from entity: {:?}", entity);
}
}
}
pub fn simulation_status_control(
trigger: Trigger<SimulationControlEvent>,
mut time: ResMut<Time<Virtual>>,
sid: Res<SimulationId>,
mut exit: EventWriter<AppExit>,
) {
match trigger.event() {
SimulationControlEvent::Pause => {
debug!("Pausing simulation");
time.pause();
}
SimulationControlEvent::Unpause => {
debug!("Unpausing simulation");
time.unpause();
}
SimulationControlEvent::UpdateSpeed(speed) => {
debug!("Update simulation speed to {}", speed);
time.set_relative_speed(*speed);
}
SimulationControlEvent::Exit => {
debug!("Exiting simulation, id={:?}", *sid);
exit.send(AppExit::Success);
}
}
}
fn runner(
app: &mut App,
wait: Option<Duration>,
rx: &mut mpsc::Receiver<Box<SimulationHandle>>,
) -> Result<Option<Duration>, AppExit> {
let start_time = Instant::now();
if let Err(e) = rx.try_recv().map(|mut handle| handle(app)) {
match e {
mpsc::TryRecvError::Empty => {}
mpsc::TryRecvError::Disconnected => {
error!("Simulation handle channel disconnected");
}
}
}
app.update();
if let Some(exit) = app.should_exit() {
return Err(exit);
};
let end_time = Instant::now();
if let Some(wait) = wait {
let exe_time = end_time - start_time;
if exe_time < wait {
return Ok(Some(wait - exe_time));
}
}
Ok(None)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_simulation_manager() {
let manager = SimulationManager::default();
assert_eq!(manager.count(), 0);
if let Ok(_) = manager.start_simulation(SimulationBuilder::default().id("0".to_string())) {
assert_eq!(manager.count(), 1);
}
if let Ok(_) = manager.start_simulation(SimulationBuilder::default().id("1".to_string())) {
assert_eq!(manager.count(), 2);
}
if let Ok(_) = manager.exit_simulation("0".to_string()) {
assert_eq!(manager.count(), 1);
}
if let Ok(_) = manager.exit_simulation("1".to_string()) {
assert_eq!(manager.count(), 0);
}
}
}

View File

@ -1,13 +0,0 @@
[package]
name = "rtss_trackside"
version = "0.1.0"
edition = "2021"
[dependencies]
bevy_core = {workspace = true}
bevy_ecs = {workspace = true}
bevy_app = {workspace = true}
bevy_time = {workspace = true}
rtss_log = { path = "../../crates/rtss_log" }
rtss_common = { path = "../rtss_common" }

View File

@ -1,20 +0,0 @@
use bevy_ecs::bundle::Bundle;
use rtss_common::Uid;
use super::{PsdState, TurnoutState, TwoNormalPositionsTransform};
// 道岔设备组件包
#[derive(Bundle, Default)]
pub struct TurnoutBundle {
pub uid: Uid,
pub turnout_state: TurnoutState,
pub two_normal_positions_conversion: TwoNormalPositionsTransform,
}
/// 屏蔽门设备组件包
#[derive(Bundle, Default)]
pub struct PsdBundle {
pub uid: Uid,
pub psd_state: PsdState,
pub two_normal_positions_conversion: TwoNormalPositionsTransform,
}

View File

@ -1,42 +0,0 @@
use bevy_ecs::component::Component;
/// 两常态位置转换组件,用于像道岔位置,屏蔽门位置等
#[derive(Component, Debug, Clone, PartialEq, Default)]
pub struct TwoNormalPositionsTransform {
// 当前实际位置,百分比值,0-100
pub position: f32,
// 当前转换速度
pub velocity: f32,
}
/// 道岔设备状态组件
#[derive(Component, Debug, Clone, PartialEq, Eq, Default)]
pub struct TurnoutState {
// 定位表示继电器状态
pub dbj: bool,
// 反位表示继电器状态
pub fbj: bool,
// 是否定位
pub dw: bool,
// 是否反位
pub fw: bool,
// 定操继电器状态
pub dcj: bool,
// 反操继电器状态
pub fcj: bool,
}
pub struct SignalState {}
/// 屏蔽门设备状态组件
#[derive(Component, Debug, Clone, PartialEq, Eq, Default)]
pub struct PsdState {
// 门关继电器状态
pub mgj: bool,
// 关门继电器状态
pub gmj: bool,
// 开门继电器状态
pub kmj: bool,
// 门旁路继电器状态(互锁解除)
pub mplj: bool,
}

View File

@ -1,4 +0,0 @@
mod bundle;
mod equipment;
pub use bundle::*;
pub use equipment::*;

View File

@ -1,9 +0,0 @@
use bevy_ecs::event::Event;
#[derive(Event, Debug, Clone, Copy, Eq, PartialEq)]
pub enum TurnoutControlEvent {
// 道岔定操
DC,
// 道岔反操
FC,
}

View File

@ -1,2 +0,0 @@
mod equipment;
pub use equipment::*;

View File

@ -1,10 +0,0 @@
mod components;
mod events;
mod plugin;
mod resources;
mod systems;
pub use components::*;
pub use events::*;
pub use plugin::*;
pub use resources::*;
pub use systems::*;

View File

@ -1,23 +0,0 @@
use bevy_app::{FixedUpdate, Plugin, Startup};
use bevy_ecs::schedule::IntoSystemConfigs;
use crate::{
handle_turnout_control, loading, turnout_state_update, two_normal_position_transform,
SimulationConfig, TurnoutControlEvent,
};
#[derive(Default)]
pub struct TrackSideEquipmentPlugin;
impl Plugin for TrackSideEquipmentPlugin {
fn build(&self, app: &mut bevy_app::App) {
app.insert_resource(SimulationConfig::default())
.add_event::<TurnoutControlEvent>()
.add_systems(Startup, loading)
.add_systems(
FixedUpdate,
(two_normal_position_transform, turnout_state_update).chain(),
)
.observe(handle_turnout_control);
}
}

View File

@ -1,15 +0,0 @@
use bevy_ecs::system::Resource;
#[derive(Resource, Debug)]
pub struct SimulationConfig {
// 道岔转换总时间,单位ms
pub turnout_transform_time: i32,
}
impl Default for SimulationConfig {
fn default() -> Self {
SimulationConfig {
turnout_transform_time: 3500,
}
}
}

View File

@ -1,32 +0,0 @@
use bevy_ecs::{entity::Entity, system::Query};
use rtss_common::Uid;
use rtss_log::tracing::debug;
use crate::TwoNormalPositionsTransform;
pub const TWO_NORMAL_POSITION_MIN: i32 = 0;
pub const TWO_NORMAL_POSITION_MAX: i32 = 100;
// 两常态位置转换系统
pub fn two_normal_position_transform(
mut query: Query<(Entity, &Uid, &mut TwoNormalPositionsTransform)>,
) {
for (entity, uid, mut transform) in &mut query {
debug!(
"Entity: {:?}, Uid: {:?}, Conversion: {:?}",
entity, uid, transform
);
if transform.velocity == 0f32 {
continue;
}
let p = transform.position + transform.velocity;
if p >= TWO_NORMAL_POSITION_MAX as f32 {
transform.position = TWO_NORMAL_POSITION_MAX as f32;
transform.velocity = TWO_NORMAL_POSITION_MIN as f32;
} else if p <= TWO_NORMAL_POSITION_MIN as f32 {
transform.position = TWO_NORMAL_POSITION_MIN as f32;
transform.velocity = 0 as f32;
} else {
transform.position = p;
}
}
}

View File

@ -1,13 +0,0 @@
use bevy_ecs::{prelude::Commands, system::ResMut};
use rtss_common::{SharedSimulationResource, Uid};
use crate::components;
pub fn loading(mut commands: Commands, res_uid_mapping: ResMut<SharedSimulationResource>) {
let uid = Uid("1".to_string());
let et = commands.spawn(components::TurnoutBundle {
uid: uid.clone(),
..Default::default()
});
res_uid_mapping.insert_entity(uid.0, et.id());
}

View File

@ -1,6 +0,0 @@
mod common;
mod loading;
mod turnout;
pub use common::*;
pub use loading::*;
pub use turnout::*;

View File

@ -1,78 +0,0 @@
use bevy_ecs::{
observer::Trigger,
system::{Query, Res, ResMut},
};
use bevy_time::{Fixed, Time};
use rtss_common::Uid;
use rtss_log::tracing::debug;
use crate::{
events::TurnoutControlEvent, SimulationConfig, TurnoutState, TwoNormalPositionsTransform,
TWO_NORMAL_POSITION_MAX, TWO_NORMAL_POSITION_MIN,
};
// 道岔控制事件处理系统
pub fn handle_turnout_control(
trigger: Trigger<TurnoutControlEvent>,
time: ResMut<Time<Fixed>>,
config: Res<SimulationConfig>,
mut query: Query<(&Uid, &mut TurnoutState, &mut TwoNormalPositionsTransform)>,
) {
let (uid, mut state, mut conversion) = query
.get_mut(trigger.entity())
.expect("通过entity获取道岔异常");
let v = calculate_avg_velocity(
crate::TWO_NORMAL_POSITION_MAX as f32,
config.turnout_transform_time as f32,
time.timestep().as_millis() as f32,
);
match trigger.event() {
TurnoutControlEvent::DC => {
state.dcj = true;
state.fcj = false;
conversion.velocity = -v;
debug!("道岔定操处理:{:?}", uid);
}
TurnoutControlEvent::FC => {
state.dcj = false;
state.fcj = true;
conversion.velocity = v;
debug!("道岔反操处理uid={:?}, conversion={:?}", uid, conversion);
}
}
}
// 道岔状态更新系统
pub fn turnout_state_update(
mut query: Query<(&Uid, &mut TurnoutState, &mut TwoNormalPositionsTransform)>,
) {
for (uid, mut state, conversion) in &mut query {
debug!(
"更新道岔状态Uid={:?}, State={:?}, Conversion={:?}",
uid, state, conversion
);
if conversion.position as i32 == TWO_NORMAL_POSITION_MIN {
state.dw = true;
state.fw = false;
state.dbj = true;
state.fbj = false;
state.dcj = false;
} else if conversion.position as i32 == TWO_NORMAL_POSITION_MAX {
state.fw = true;
state.dw = false;
state.dbj = false;
state.fbj = true;
state.fcj = false;
} else {
state.dw = false;
state.fw = false;
state.dbj = false;
state.fbj = false;
}
}
}
// 计算平均速度
fn calculate_avg_velocity(total_distance: f32, total_time: f32, time_step: f32) -> f32 {
total_distance / (total_time / time_step)
}

View File

@ -15,9 +15,9 @@ pub struct Log {
level: String,
}
impl From<Log> for rtss_log::Logging {
impl From<Log> for rtsa_log::Logging {
fn from(log: Log) -> Self {
rtss_log::Logging {
rtsa_log::Logging {
level: log.level.parse().unwrap(),
..Default::default()
}
@ -61,9 +61,9 @@ impl AppConfig {
// Default to 'dev' env
// Note that this file is _optional_
.add_source(File::with_name(&format!("{dir}/{run_mode}")).required(true))
// Add in settings from the environment (with a prefix of RTSS_SIM)
// Add in settings from the environment (with a prefix of rtsa_SIM)
// Eg.. `APP_DEBUG=1 ./target/app` would set the `debug` key
.add_source(Environment::with_prefix("RTSS_SIM").separator("_"))
.add_source(Environment::with_prefix("rtsa_SIM").separator("_"))
// You may also programmatically change settings
// .set_override("database.url", "postgres://")?
// build the configuration

View File

@ -6,7 +6,7 @@ use crate::app_config;
use super::{CmdExecutor, DbSubCommand};
#[derive(Parser, Debug)]
#[command(name = "rtss-sim", version, author, about, long_about = None)]
#[command(name = "rtsa-sim", version, author, about, long_about = None)]
pub struct Cmd {
#[command(subcommand)]
pub cmd: SubCommand,
@ -31,18 +31,18 @@ impl CmdExecutor for ServerOpts {
async fn execute(&self) -> anyhow::Result<()> {
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
let log: rtss_log::Logging = app_config.log.into();
let log: rtsa_log::Logging = app_config.log.into();
log.init();
// 数据库访问器初始化
rtss_db::init_default_db_accessor(&app_config.database.url).await;
rtsa_db::init_default_db_accessor(&app_config.database.url).await;
// mqtt客户端初始化
let cli_id = rtss_db::get_default_db_accessor()
let cli_id = rtsa_db::get_default_db_accessor()
.get_next_mqtt_client_id()
.await?;
let mqtt_cli_options =
rtss_mqtt::MqttClientOptions::new(&format!("rtsa{}", cli_id), &app_config.mqtt.url)
rtsa_mqtt::MqttClientOptions::new(&format!("rtsa{}", cli_id), &app_config.mqtt.url)
.set_credentials(&app_config.mqtt.username, &app_config.mqtt.password);
rtss_mqtt::init_global_mqtt_client(mqtt_cli_options).await?;
rtsa_mqtt::init_global_mqtt_client(mqtt_cli_options).await?;
Ok(())
}
}

View File

@ -24,6 +24,6 @@ impl CmdExecutor for MigrateOpts {
async fn execute(&self) -> anyhow::Result<()> {
let app_config =
app_config::AppConfig::new(&self.config_path).expect("Failed to load app config");
rtss_db::run_migrations(&app_config.database.url).await
rtsa_db::run_migrations(&app_config.database.url).await
}
}

View File

@ -52,8 +52,8 @@ impl SimulationUidEntityMapResource {
#[derive(Resource, Debug)]
pub struct TxResource {
tx: broadcast::Sender<rtss_dto::simulation::Operation>,
rx: Option<broadcast::Receiver<rtss_dto::simulation::Operation>>,
tx: broadcast::Sender<rtsa_dto::simulation::Operation>,
rx: Option<broadcast::Receiver<rtsa_dto::simulation::Operation>>,
}
impl TxResource {
@ -62,11 +62,11 @@ impl TxResource {
TxResource { tx, rx: Some(rx) }
}
pub fn get_tx(&self) -> broadcast::Sender<rtss_dto::simulation::Operation> {
pub fn get_tx(&self) -> broadcast::Sender<rtsa_dto::simulation::Operation> {
self.tx.clone()
}
pub fn subscribe(&mut self) -> broadcast::Receiver<rtss_dto::simulation::Operation> {
pub fn subscribe(&mut self) -> broadcast::Receiver<rtsa_dto::simulation::Operation> {
let rx = self.tx.subscribe();
if self.rx.is_some() {
std::mem::take(&mut self.rx);

View File

@ -1,6 +1,6 @@
use bevy_app::{Plugin, Startup};
use bevy_ecs::system::Res;
use rtss_log::tracing::debug;
use rtsa_log::tracing::debug;
use crate::components::SimulationInfo;

View File

@ -12,7 +12,7 @@ use bevy_ecs::{
system::ResMut,
};
use bevy_time::{Fixed, Time, TimePlugin, Virtual};
use rtss_log::tracing::{debug, error, info};
use rtsa_log::tracing::{debug, error, info};
use tokio::sync::broadcast;
use crate::{
@ -52,7 +52,7 @@ impl SimulationOptions {
#[derive(Debug, Clone)]
pub struct Simulation {
app: Arc<Mutex<SubApp>>,
tx: broadcast::Sender<rtss_dto::simulation::Operation>,
tx: broadcast::Sender<rtsa_dto::simulation::Operation>,
}
impl Simulation {
@ -95,7 +95,7 @@ impl Simulation {
})
}
pub fn send_operation(&self, op: rtss_dto::simulation::Operation) {
pub fn send_operation(&self, op: rtsa_dto::simulation::Operation) {
if let Err(e) = self.tx.send(op) {
error!("send operation error: {}", e);
}
@ -117,31 +117,31 @@ mod tests {
use super::*;
use rtss_dto::simulation::OperationType;
use rtss_log::tracing::Level;
use rtsa_dto::simulation::OperationType;
use rtsa_log::tracing::Level;
#[test]
fn test_new_simulation() {
rtss_log::Logging::default().with_level(Level::DEBUG).init();
rtsa_log::Logging::default().with_level(Level::DEBUG).init();
let simulation1 = Simulation::new(SimulationOptions::new("1", "1")).unwrap();
assert!(simulation1.app.lock().unwrap().update_schedule.is_some());
simulation1.send_operation(rtss_dto::simulation::Operation {
otype: rtss_dto::simulation::OperationType::SetSpeed as i32,
param: Some(rtss_dto::simulation::operation::Param::SetSpeedParam(
rtss_dto::simulation::SetSpeedParam { speed: 2.0 },
simulation1.send_operation(rtsa_dto::simulation::Operation {
otype: rtsa_dto::simulation::OperationType::SetSpeed as i32,
param: Some(rtsa_dto::simulation::operation::Param::SetSpeedParam(
rtsa_dto::simulation::SetSpeedParam { speed: 2.0 },
)),
});
let clone2 = simulation1.clone();
thread::spawn(move || {
sleep(Duration::from_millis(100));
info!("send pause");
clone2.send_operation(rtss_dto::simulation::Operation {
clone2.send_operation(rtsa_dto::simulation::Operation {
otype: OperationType::Pause as i32,
param: None,
});
sleep(Duration::from_millis(200));
info!("send unpause");
clone2.send_operation(rtss_dto::simulation::Operation {
clone2.send_operation(rtsa_dto::simulation::Operation {
otype: OperationType::Unpause as i32,
param: None,
});

View File

@ -1,8 +1,8 @@
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_time::prelude::*;
use rtss_dto::simulation::{operation, OperationType, SetSpeedParam};
use rtss_log::tracing::debug;
use rtsa_dto::simulation::{operation, OperationType, SetSpeedParam};
use rtsa_log::tracing::debug;
use tokio::sync::broadcast;
use crate::components::{SimulationInfo, TxResource};
@ -23,10 +23,10 @@ fn init_rx_resource(mut commands: Commands, mut tx: ResMut<TxResource>) {
}
#[derive(Resource, Debug)]
struct RxResource(pub broadcast::Receiver<rtss_dto::simulation::Operation>);
struct RxResource(pub broadcast::Receiver<rtsa_dto::simulation::Operation>);
impl RxResource {
fn new(rx: broadcast::Receiver<rtss_dto::simulation::Operation>) -> Self {
fn new(rx: broadcast::Receiver<rtsa_dto::simulation::Operation>) -> Self {
RxResource(rx)
}
}

View File

@ -1,5 +1,5 @@
use bevy_ecs::{entity::Entity, system::Query};
use rtss_log::tracing::debug;
use rtsa_log::tracing::debug;
use crate::{components::Uid, modules::trackside::components::TwoNormalPositionsTransform};

View File

@ -1,5 +1,5 @@
use bevy_ecs::{prelude::Commands, system::ResMut};
use rtss_log::tracing::debug;
use rtsa_log::tracing::debug;
use crate::{
components::{SimulationUidEntityMapResource, Uid},

View File

@ -3,7 +3,7 @@ use bevy_ecs::{
system::{Query, Res, ResMut},
};
use bevy_time::{Fixed, Time};
use rtss_log::tracing::debug;
use rtsa_log::tracing::debug;
use crate::{
components::Uid,