log entries to db
This commit is contained in:
parent
d6b78eb62c
commit
e6775f1981
3 changed files with 47 additions and 29 deletions
|
@ -110,6 +110,8 @@ pub(crate) struct JobExitStatus {
|
||||||
pub(crate) duration: time::Duration,
|
pub(crate) duration: time::Duration,
|
||||||
/// The name of the container this job ran in
|
/// The name of the container this job ran in
|
||||||
pub(crate) container_name: String,
|
pub(crate) container_name: String,
|
||||||
|
/// Uuid
|
||||||
|
pub(crate) job_uuid: String
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==========================
|
// ==========================
|
||||||
|
|
|
@ -5,7 +5,7 @@ use std::env;
|
||||||
use std::fs::{create_dir_all, File, OpenOptions};
|
use std::fs::{create_dir_all, File, OpenOptions};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::time::Instant;
|
use std::time::{Instant, SystemTime};
|
||||||
|
|
||||||
/// Logging for a [`Job`]
|
/// Logging for a [`Job`]
|
||||||
// TODO: log to postgres instead; maybe i already made a comment todo-ing this idk
|
// TODO: log to postgres instead; maybe i already made a comment todo-ing this idk
|
||||||
|
@ -75,14 +75,15 @@ impl JobLogger {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) mod sql {
|
pub(crate) mod sql {
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use sqlx::{Connection, PgConnection};
|
use sqlx::{Connection, PgConnection};
|
||||||
use std::{env, time::Instant};
|
use std::{env, time::SystemTime};
|
||||||
|
|
||||||
/// Returns a new connection to postgres
|
/// Returns a new connection to postgres
|
||||||
///
|
///
|
||||||
/// *x*: How many times to retry the reconnect
|
/// *x*: How many times to retry the reconnect
|
||||||
pub(crate) async fn start(x: u16) -> Box<PgConnection> {
|
pub(crate) async fn start(x: u16) -> PgConnection {
|
||||||
let mut conn = Box::new(db_connect_with_retries(x).await);
|
let mut conn = db_connect_with_retries(x).await;
|
||||||
create_tables(&mut conn).await;
|
create_tables(&mut conn).await;
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
@ -138,38 +139,34 @@ pub(crate) mod sql {
|
||||||
|
|
||||||
// TODO: when adding logging to postgres directly, update this so it 1) adds the job at the start, 2) logs line-by-line, and 3) adds the end time and exit code at the end of the job
|
// TODO: when adding logging to postgres directly, update this so it 1) adds the job at the start, 2) logs line-by-line, and 3) adds the end time and exit code at the end of the job
|
||||||
pub(crate) async fn log_job(
|
pub(crate) async fn log_job(
|
||||||
mut conn: Box<PgConnection>,
|
conn: &mut PgConnection,
|
||||||
start_time: Instant,
|
start_time: SystemTime,
|
||||||
end_time: Instant,
|
end_time: SystemTime,
|
||||||
exit_code: Option<i32>,
|
exit_code: Option<i32>,
|
||||||
job_id: String,
|
job_id: String,
|
||||||
revision: String,
|
revision: String,
|
||||||
uuid: String,
|
uuid: String,
|
||||||
log_path: String,
|
log_path: String,
|
||||||
) {
|
) {
|
||||||
let start_time =
|
let start_time: DateTime<Utc> = start_time.into();
|
||||||
chrono::DateTime::from_timestamp_millis(start_time.elapsed().as_millis() as i64)
|
let start_time = start_time.format("%+").to_string();
|
||||||
.unwrap()
|
let end_time: DateTime<Utc> = end_time.into();
|
||||||
.format("%+")
|
let end_time = end_time.format("%+").to_string();
|
||||||
.to_string();
|
|
||||||
let end_time =
|
|
||||||
chrono::DateTime::from_timestamp_millis(end_time.elapsed().as_millis() as i64)
|
|
||||||
.unwrap()
|
|
||||||
.format("%+")
|
|
||||||
.to_string();
|
|
||||||
let exit_code = match exit_code {
|
let exit_code = match exit_code {
|
||||||
Some(code) => code.to_string(),
|
Some(code) => code.to_string(),
|
||||||
None => "NULL".to_string(),
|
None => "NULL".to_string(),
|
||||||
};
|
};
|
||||||
sqlx::query(format!("INSERT INTO job_logs (start_time, end_time, exit_code, job_id, revision, uuid, log_path)
|
let query = format!("INSERT INTO job_logs (start_time, end_time, exit_code, job_id, revision, uuid, log_path) VALUES ('{start_time}', '{end_time}', {exit_code}, '{job_id}', '{revision}', '{uuid}', '{log_path}')");
|
||||||
VALUES ('{start_time}', '{end_time}', {exit_code}, '{job_id}', '{revision}', '{uuid}', '{log_path}'));
|
sqlx::query(query.as_str())
|
||||||
").as_str()).execute(conn.as_mut()).await.unwrap();
|
.execute(conn.as_mut())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tries to connect to the database *x* times, panics after reaching that limit
|
/// Tries to connect to the database *x* times, panics after reaching that limit
|
||||||
|
|
||||||
/// Creates table(s) for gregory if they don't exist already
|
/// Creates table(s) for gregory if they don't exist already
|
||||||
pub(crate) async fn create_tables(conn: &mut Box<PgConnection>) {
|
pub(crate) async fn create_tables(conn: &mut PgConnection) {
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"CREATE TABLE IF NOT EXISTS job_logs (
|
"CREATE TABLE IF NOT EXISTS job_logs (
|
||||||
start_time timestamp,
|
start_time timestamp,
|
||||||
|
@ -184,7 +181,7 @@ pub(crate) mod sql {
|
||||||
);
|
);
|
||||||
",
|
",
|
||||||
)
|
)
|
||||||
.execute(conn.as_mut())
|
.execute(conn)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
31
src/main.rs
31
src/main.rs
|
@ -17,6 +17,7 @@ use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
use std::time::SystemTime;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
mod cli;
|
mod cli;
|
||||||
|
@ -72,14 +73,33 @@ async fn run(config_path: String) {
|
||||||
|
|
||||||
let failed_packages: Vec<String> = Vec::new();
|
let failed_packages: Vec<String> = Vec::new();
|
||||||
|
|
||||||
|
|
||||||
|
let mut pg_connection = sql::start(5).await;
|
||||||
|
|
||||||
// runs the jobs (will need to be updated after sorting is added)
|
// runs the jobs (will need to be updated after sorting is added)
|
||||||
for (job_id, job) in state.jobs {
|
for (job_id, job) in state.jobs {
|
||||||
let job_exit_status = run_job(&state.conf, job_id, job);
|
let start_time = SystemTime::now();
|
||||||
println!("{:#?}", job_exit_status);
|
let job_exit_status = run_job(&state.conf, job_id.clone(), job.clone());
|
||||||
|
|
||||||
|
// TODO: PUSH IT TO THE DB HERE
|
||||||
|
sql::log_job(
|
||||||
|
pg_connection.as_mut(),
|
||||||
|
start_time,
|
||||||
|
start_time + job_exit_status.duration,
|
||||||
|
job_exit_status.exit_code,
|
||||||
|
job_id,
|
||||||
|
job.revision,
|
||||||
|
job_exit_status.job_uuid,
|
||||||
|
job_exit_status.log_path,
|
||||||
|
).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_job(conf: &Config, job_id: String, job: Job) -> JobExitStatus {
|
fn run_job(
|
||||||
|
conf: &Config,
|
||||||
|
job_id: String,
|
||||||
|
job: Job,
|
||||||
|
) -> JobExitStatus {
|
||||||
// limit threads to max_threads in the config
|
// limit threads to max_threads in the config
|
||||||
let mut threads = job.threads;
|
let mut threads = job.threads;
|
||||||
if job.threads > conf.max_threads {
|
if job.threads > conf.max_threads {
|
||||||
|
@ -165,14 +185,13 @@ fn run_job(conf: &Config, job_id: String, job: Job) -> JobExitStatus {
|
||||||
|
|
||||||
let log_path = job_logger.lock().unwrap().path();
|
let log_path = job_logger.lock().unwrap().path();
|
||||||
|
|
||||||
// TODO: PUSH IT TO THE DB HERE
|
|
||||||
|
|
||||||
return JobExitStatus {
|
return JobExitStatus {
|
||||||
container_name: script_path,
|
container_name: script_path,
|
||||||
duration: cmd_output.clone().duration(),
|
duration: cmd_output.clone().duration(),
|
||||||
job,
|
job,
|
||||||
exit_code: cmd_output.status_code(),
|
exit_code: cmd_output.status_code(),
|
||||||
log_path,
|
log_path,
|
||||||
|
job_uuid: run_id.to_string(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,7 +308,7 @@ struct State {
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
/// sqlx::query("DELETE FROM table").execute(&mut state.conn).await?;
|
/// sqlx::query("DELETE FROM table").execute(&mut state.conn).await?;
|
||||||
/// ```
|
/// ```
|
||||||
sql: Box<PgConnection>,
|
sql: PgConnection,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue