From a31db191e871f867c2f3169924b35380c438398d Mon Sep 17 00:00:00 2001 From: Filip Znachor Date: Sun, 7 May 2023 13:47:51 +0200 Subject: [PATCH] Added service spawning and stopping, yaml config --- Cargo.lock | 38 +++++++++++----- Cargo.toml | 4 +- conf.rs | 13 +++--- config.toml | 14 ------ config.yml | 15 +++++++ data.rs | 13 +++--- main.rs | 75 ++++--------------------------- services.rs | 127 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 192 insertions(+), 107 deletions(-) mode change 100644 => 100755 Cargo.lock mode change 100644 => 100755 Cargo.toml mode change 100644 => 100755 conf.rs delete mode 100644 config.toml create mode 100755 config.yml mode change 100644 => 100755 data.rs mode change 100644 => 100755 main.rs create mode 100755 services.rs diff --git a/Cargo.lock b/Cargo.lock old mode 100644 new mode 100755 index b422b20..bdee6b2 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,14 +302,14 @@ dependencies = [ [[package]] name = "odproxy" -version = "0.1.1" +version = "0.1.2" dependencies = [ "hyper", "hyperlocal", "lazy_static", "serde", + "serde_yaml", "tokio", - "toml", "tower", ] @@ -437,6 +437,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ryu" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" + [[package]] name = "scopeguard" version = "1.1.0" @@ -463,6 +469,19 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "serde_yaml" +version = "0.9.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9d684e3ec7de3bf5466b32bd75303ac16f0736426e5a4e0d6e489559ce1249c" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -563,15 +582,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "toml" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" -dependencies = [ - "serde", -] - [[package]] name = "tower" version = "0.4.13" @@ -638,6 +648,12 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +[[package]] +name = "unsafe-libyaml" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1865806a559042e51ab5414598446a5871b561d21b6764f2eabb0dd481d880a6" + [[package]] name = "want" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml old mode 100644 new mode 100755 index 134200e..7848e97 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "odproxy" -version = "0.1.1" +version = "0.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -11,8 +11,8 @@ hyper = { version = "0.14", features = ["full"] } tokio = { version = "1", features = ["full"] } tower = { version = "0.4", features = ["full"] } serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9" lazy_static = "1.4.0" -toml = "0.5.8" [[bin]] name = "odproxy" diff --git a/conf.rs b/conf.rs old mode 100644 new mode 100755 index 1aac9ed..adbceb7 --- a/conf.rs +++ b/conf.rs @@ -1,9 +1,9 @@ use std::net::SocketAddr; use std::{fs::File, process::exit}; use std::io::prelude::*; -use toml::{de::from_str}; use serde::Deserialize; use lazy_static::lazy_static; +use serde_yaml::from_str; lazy_static! { pub static ref CONFIG: RootConf = load_config(); @@ -20,7 +20,8 @@ pub struct ProxyConf { pub hosts: Vec, pub target: String, pub socket: Option, - pub spawn: Option + pub spawn: Option, + pub timeout: Option } #[derive(Debug, Deserialize)] @@ -31,16 +32,16 @@ pub struct SpawnConf { } fn load_config() -> RootConf { - let file = File::open("config.toml"); + let file = File::open("config.yml"); if file.is_err() { - println!("[!] Unable to read config file"); exit(-1); + println!("[!] Config file was not found!"); exit(-1); } let mut contents = String::new(); if file.unwrap().read_to_string(&mut contents).is_err() { - println!("[!] Unable to read config file"); exit(-1); + println!("[!] Unable to read config file!"); exit(-1); } match from_str(&contents) { Ok(conf) => conf, - Err(_) => {println!("[!] Unable to parse config"); exit(0);} + Err(_) => {println!("[!] Unable to parse config!"); exit(0);} } } \ No newline at end of file diff --git a/config.toml b/config.toml deleted file mode 100644 index 18b9945..0000000 --- a/config.toml +++ /dev/null @@ -1,14 +0,0 @@ -listen = "[::]:3000" - -[[proxy]] -hosts = [ "website.local.gd", "website-alt.local.gd" ] -socket = true -target = "./www.sock" -[proxy.spawn] -command = "/usr/bin/node" -args = [ "./webserver/index.mjs" ] -envs = [ ["PORT", "www.sock"] ] - -[[proxy]] -hosts = [ "git.local.gd" ] -target = "http://192.168.0.3:80" \ No newline at end of file diff --git a/config.yml b/config.yml new file mode 100755 index 0000000..f20a503 --- /dev/null +++ b/config.yml @@ -0,0 +1,15 @@ +listen: "[::]:3000" + +proxy: + +- hosts: [ "website.local.gd", "website-alt.local.gd" ] + socket: true + target: "./www.sock" + spawn: + command: "/usr/bin/node" + args: [ "./webserver/index.mjs" ] + envs: [ ["PORT", "www.sock"] ] + timeout: 10 + +- hosts: [ "git.local.gd" ] + target: "http://192.168.0.3:80" \ No newline at end of file diff --git a/data.rs b/data.rs old mode 100644 new mode 100755 index 6a743bb..04d2d9c --- a/data.rs +++ b/data.rs @@ -13,19 +13,18 @@ lazy_static! { pub struct ServiceData { pub child: Option, - pub running: bool + pub running: bool, + pub last_active: u64 } impl ServiceData { - pub fn new(child: Option) -> ServiceData { + pub fn new() -> ServiceData { ServiceData { - child, - running: false + child: None, + running: false, + last_active: 0 } } - pub fn set_running(&mut self, running: bool) { - self.running = running; - } } pub fn get_proxy(host_index: Option<&usize>) -> Option<&ProxyConf> { diff --git a/main.rs b/main.rs old mode 100644 new mode 100755 index de4756a..d471611 --- a/main.rs +++ b/main.rs @@ -1,16 +1,16 @@ mod conf; mod data; +mod services; -use std::{str::FromStr, process::Command, path::Path, time::Duration}; -use conf::{ProxyConf, SpawnConf}; -use data::{HOST_MAP, SERVICES, ServiceData}; -use hyperlocal::{UnixClientExt}; -use tokio::{fs, time::sleep}; +use std::str::FromStr; +use data::HOST_MAP; +use hyperlocal::UnixClientExt; +use services::check_service; use tower::make::Shared; use hyper::{service::service_fn, Body, Client, Request, Response, Server}; -use crate::conf::CONFIG; +use crate::{conf::CONFIG, services::prepare_services}; async fn run(req: Request) -> Result, hyper::Error> { @@ -20,7 +20,7 @@ async fn run(req: Request) -> Result, hyper::Error> { match proxy { Some(p) => { - check_service(host_index.unwrap().clone(),p).await; + check_service(host_index.unwrap().clone(), p).await; // Create new Request let mut request_builder = Request::builder().method(req.method()); @@ -59,69 +59,10 @@ async fn run(req: Request) -> Result, hyper::Error> { } -fn set_service_running(index: usize) { - SERVICES.lock().unwrap().get_mut(index).unwrap().set_running(true); -} - -fn is_service_running(index: usize) -> bool { - SERVICES.lock().unwrap().get(index).unwrap().running -} - -async fn check_service(index: usize, proxy: &ProxyConf) { - - match &proxy.spawn { - Some(spawn) => { - spawn_service(index, spawn); - if !is_service_running(index) { - wait_for_service(proxy).await; - set_service_running(index); - } - }, - None => {} - } - -} - -fn spawn_service(index: usize, spawn: &SpawnConf) -> bool { - match SERVICES.lock() { - Ok(mut array) => { - if array.get(index).is_none() { - let command = spawn.command.clone(); - let args = spawn.args.clone().unwrap_or(vec![]); - let envs = spawn.envs.clone().unwrap_or(vec![]); - let spawned_child = Command::new(command).args(args).envs(envs).spawn(); - match spawned_child { - Ok(child) => { - array.insert(index, ServiceData::new(Some(child))); - return true; - }, - Err(_) => println!("Error while spawning process!") - } - } - }, - Err(_) => {} - } - return false; -} - -async fn wait_for_service(proxy: &ProxyConf) { - let path = Path::new(&proxy.target); - while !path.exists() { - sleep(Duration::from_millis(100)).await; - } -} - #[tokio::main] async fn main() { - for proxy in CONFIG.proxy.iter() { - if proxy.socket.unwrap_or(false) { - let path = Path::new(&proxy.target); - if path.exists() { - fs::remove_file(path).await.unwrap(); - } - } - } + prepare_services().await; let make_service = Shared::new(service_fn(run)); diff --git a/services.rs b/services.rs new file mode 100755 index 0000000..4ede123 --- /dev/null +++ b/services.rs @@ -0,0 +1,127 @@ +use std::{process::{Command, Stdio, Child}, time::{Duration, SystemTime, UNIX_EPOCH}, path::Path, io::Error, thread}; +use tokio::{time::sleep, fs}; + +use crate::{data::{SERVICES, ServiceData}, conf::{ProxyConf, CONFIG}}; + +fn modify_service_data(index: usize, modify_fn: F) + where F: FnOnce(&mut ServiceData) +{ + let mut vec = SERVICES.lock().unwrap(); + if let Some(service_data) = vec.get_mut(index) { + modify_fn(service_data); + } +} + +fn set_service_running(index: usize) { + modify_service_data(index, |s| { + s.running = true; + }); +} + +fn set_service_last_active(index: usize) { + modify_service_data(index, |s| { + s.last_active = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + }); +} + +fn is_service_running(index: usize) -> bool { + if let Some(service_data) = SERVICES.lock().unwrap().get(index) { + service_data.running + } else { + false + } +} + +pub async fn check_service(index: usize, proxy: &ProxyConf) { + + if proxy.spawn.is_some() { + if proxy.socket.unwrap_or(false) && SERVICES.lock().unwrap().get(index).unwrap().child.is_none() { + let path = Path::new(&proxy.target); + if path.exists() { + fs::remove_file(path).await.unwrap(); + } + } + start_service(index, proxy); + if !is_service_running(index) { + wait_for_service(proxy).await; + set_service_running(index); + } + set_service_last_active(index); + } + +} + +fn start_service(index: usize, proxy: &ProxyConf) -> bool { + let mut status = false; + let spawn = proxy.spawn.as_ref().unwrap(); + modify_service_data(index, |s| { + if s.child.is_some() { + return; + } + let command = spawn.command.clone(); + let args = spawn.args.clone().unwrap_or(vec![]); + let envs = spawn.envs.clone().unwrap_or(vec![]); + let spawned_child = create_child(command, args, envs); + match spawned_child { + Ok(child) => { + s.child = Some(child); + status = true; + }, + Err(_) => println!("Error while spawning process!") + } + }); + return status; +} + +fn stop_service(index: usize) { + modify_service_data(index, |s| { + match s.child.as_mut() { + Some(c) => { + c.kill().unwrap(); + }, + None => {} + } + s.running = false; + s.child = None; + }); +} + +async fn wait_for_service(proxy: &ProxyConf) { + let path = Path::new(&proxy.target); + while !path.exists() { + sleep(Duration::from_millis(100)).await; + } +} + +pub async fn prepare_services() { + for i in 0..CONFIG.proxy.len() { + let mut vec = SERVICES.lock().unwrap(); + vec.insert(i, ServiceData::new()); + } + + let interval_duration = Duration::from_secs(10); + thread::spawn(move || { + loop { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + for (i, proxy) in CONFIG.proxy.iter().enumerate() { + match proxy.timeout { + Some(t) => { + { + let vec = SERVICES.lock().unwrap(); + let s = vec.get(i).unwrap(); + if !s.running || s.last_active+t > now {continue;} + } + stop_service(i); + }, + None => {} + } + } + thread::sleep(interval_duration); + } + }); +} + +fn create_child(command: String, args: Vec, envs: Vec<(String, String)>) -> Result { + let stdio = Stdio::piped(); + return Command::new(command).args(args).envs(envs).stdout(stdio).spawn(); +} \ No newline at end of file