diff --git a/tig-benchmarker/Cargo.toml b/tig-benchmarker/Cargo.toml index 23bad0a..3da6cee 100644 --- a/tig-benchmarker/Cargo.toml +++ b/tig-benchmarker/Cargo.toml @@ -9,14 +9,12 @@ edition.workspace = true [dependencies] anyhow = "1.0.81" -clap = { version = "4.5.4", optional = true } +clap = { version = "4.5.4" } cudarc = { version = "0.12.0", features = [ "cuda-version-from-build-system", ], optional = true } futures = { version = "0.3.30" } -gloo-timers = { version = "0.3.0", optional = true, features = ["futures"] } -hostname = { version = "0.4", optional = true } -js-sys = { version = "0.3.68", optional = true } +hostname = { version = "0.4" } once_cell = "1.19.0" rand = { version = "0.8.5", default-features = false, features = ["std_rng"] } rand_distr = { version = "0.4.3", default-features = false, features = [ @@ -24,43 +22,19 @@ rand_distr = { version = "0.4.3", default-features = false, features = [ ] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.113" } -serde-wasm-bindgen = { version = "0.6.5", optional = true } tig-algorithms = { path = "../tig-algorithms" } -tig-api = { path = "../tig-api" } +tig-api = { path = "../tig-api", features = ["request"] } tig-challenges = { path = "../tig-challenges" } tig-structs = { path = "../tig-structs" } tig-utils = { path = "../tig-utils" } tig-worker = { path = "../tig-worker" } -tokio = { version = "1.37.0", features = ["full"], optional = true } -wasm-bindgen = { version = "0.2.91", features = [ - "serde-serialize", -], optional = true } -wasm-bindgen-futures = { version = "0.4.41", optional = true } -warp = { version = "0.3.7", optional = true } -web-sys = { version = "0.3.68", features = ['console'], optional = true } - -[lib] -crate-type = ["cdylib", "rlib"] +tokio = { version = "1.37.0", features = ["full"] } +warp = { version = "0.3.7" } [features] -default = ["browser"] +default = ["standalone"] cuda = ["cudarc", "tig-algorithms/cuda"] -standalone = [ - "dep:clap", - "dep:tokio", - "tig-api/request", - "dep:warp", - "dep:hostname", -] -browser = [ - "dep:gloo-timers", - "dep:wasm-bindgen", - "dep:js-sys", - "dep:serde-wasm-bindgen", - "dep:wasm-bindgen-futures", - "dep:web-sys", - "tig-api/request-js", -] +standalone = [] # c001_a001 = [] diff --git a/tig-benchmarker/README.md b/tig-benchmarker/README.md index feb02c0..abe88bb 100644 --- a/tig-benchmarker/README.md +++ b/tig-benchmarker/README.md @@ -2,23 +2,7 @@ A Rust crate that implements a Benchmarker for TIG. -## Browser Benchmarker - -`tig-benchmarker` can be compiled to WASM with bindings for browsers. 
-
-The browser version is deployed to https://play.tig.foundation/benchmarker
-
-To build & run it locally, run the following commands before visiting localhost in your browser:
-
-```
-# uncomment below to install wasm-pack
-# curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
-cd tig-benchmarker
-wasm-pack build --release --target web
-python3 -m http.server 80
-```
-
-## Standalone Benchmarker
+## Compiling Your Benchmarker
 
 `tig-benchmarker` can be compiled into an executable for running standalone, or in slave mode (see notes).
 
@@ -28,10 +12,10 @@ There are two ways to start the master benchmarker:
    ```
    ALGOS_TO_COMPILE="" # See notes
    # USE_CUDA="cuda" # See notes
-    cargo build -p tig-benchmarker --release --no-default-features --features "standalone ${ALGOS_TO_COMPILE} ${USE_CUDA}"
+    cargo build -p tig-benchmarker --release --no-default-features --features "${ALGOS_TO_COMPILE} ${USE_CUDA}"
    # edit below line for your own algorithm selection
-    echo '{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}' > algo_selection.json
-    ./target/release/tig-benchmarker <API_KEY> <PLAYER_ID> algo_selection.json
+    SELECTED_ALGORITHMS='{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"basic"}'
+    ./target/release/tig-benchmarker <API_KEY> <PLAYER_ID> $SELECTED_ALGORITHMS
    ```
 
 2. Compile executable in a docker, and run the docker:
@@ -40,8 +24,8 @@ There are two ways to start the master benchmarker:
    ```
    ALGOS_TO_COMPILE="" # See notes
    # USE_CUDA="cuda" # See notes
    docker build -f tig-benchmarker/Dockerfile --build-arg features="${ALGOS_TO_COMPILE} ${USE_CUDA}" -t tig-benchmarker .
    # edit below line for your own algorithm selection
-    echo '{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}' > algo_selection.json
-    docker run -it -v $(pwd):/app tig-benchmarker <API_KEY> <PLAYER_ID> algo_selection.json
+    SELECTED_ALGORITHMS='{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}'
+    docker run -it -v $(pwd):/app tig-benchmarker <API_KEY> <PLAYER_ID> $SELECTED_ALGORITHMS
    ```
 
 **Notes:**
 
@@ -56,11 +40,12 @@ There are two ways to start the master benchmarker:
    ALGOS_TO_COMPILE="satisfiability_schnoing vehicle_routing_clarke_wright knapsack_dynamic vector_search_optimal_ann"
    ```
 
-* Every 10 seconds, the benchmarker reads your json file path and uses the contents to update its algorithm selection.
 * You can see available algorithms in the dropdowns of the [Benchmarker UI](https://play.tig.foundation/benchmarker)
   * Alternatively, you can use [`scripts/list_algorithms.sh`](../scripts/list_algorithms.sh)
 * `tig-benchmarker` starts a master node by default. The port can be set with `--port <PORT>` (default 5115)
 * `tig-benchmarker` instances that are started with the option `--master <HOSTNAME>` are run as slaves and will poll the master for jobs
+  * slaves will ignore any job that doesn't match their algorithm selection
+  * one possible setup is to run the master with `--workers 0`, and then run a separate slave for each challenge with a different number of workers (see the example below)
 * `tig-benchmarker` can be executed with `--help` to see all options including setting the number of workers, and setting the duration of a benchmark
 * Uncomment `# USE_CUDA="cuda"` to compile `tig-benchmarker` to use CUDA optimisations where they are available.
   * You must have a CUDA compatible GPU with CUDA toolkit installed
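+
+For example, a possible master + slaves setup (illustrative values only — substitute your own `<API_KEY>`, `<PLAYER_ID>`, hostname and algorithm selection):
+```
+# master: coordinates jobs and submits results, but runs no workers itself
+./target/release/tig-benchmarker <API_KEY> <PLAYER_ID> $SELECTED_ALGORITHMS --workers 0
+# one slave per challenge, each polling the master for matching jobs
+./target/release/tig-benchmarker <API_KEY> <PLAYER_ID> '{"satisfiability":"schnoing"}' --master <HOSTNAME> --workers 8
+./target/release/tig-benchmarker <API_KEY> <PLAYER_ID> '{"knapsack":"dynamic"}' --master <HOSTNAME> --workers 8
+```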
diff --git a/tig-benchmarker/index.html b/tig-benchmarker/index.html
deleted file mode 100644
index 0e94938..0000000
--- a/tig-benchmarker/index.html
+++ /dev/null
@@ -1,26 +0,0 @@
-<!DOCTYPE html>
-<html>
-<head>
-    <title>TIG Benchmarker Test</title>
-</head>
-
-<body>
-    <h1>TIG Browser Benchmarker</h1>
-    <p>Instructions:</p>
-    <ol>
-        <li>Open the browser console</li>
-        <li>Setup: `await benchmarker.setup(<api_url>, <api_key>, <player_id>)`</li>
-        <li>Select Algorithm: `await benchmarker.select_algorithm(<challenge_name>, <algorithm_name>)`</li>
-        <li>Start: `await benchmarker.start(<num_workers>, <ms_per_benchmark>)`</li>
-        <li>View State: `await benchmarker.state()`</li>
-        <li>Stop: `await benchmarker.stop()`</li>
-    </ol>
-    <p>Notes:</p>
-    <ul>
-        <li>challenge_name should be one of satisfiability, vehicle_routing, knapsack</li>
-        <li>algorithm_name can be found in the dropdowns of Benchmarker UI</li>
-        <li>recommended ms_per_benchmark is 15000 (15s)</li>
-    </ul>
-    <script type="module" src="index.js"></script>
-</body>
-</html>
\ No newline at end of file
diff --git a/tig-benchmarker/index.js b/tig-benchmarker/index.js
deleted file mode 100644
index 67d0e04..0000000
--- a/tig-benchmarker/index.js
+++ /dev/null
@@ -1,12 +0,0 @@
-import init, { setup, state, start, stop, select_algorithm } from './pkg/tig_benchmarker.js';
-
-async function loadWasm() {
-    console.log("Loading Benchmarker WASM");
-    await init("./pkg/tig_benchmarker_bg.wasm");
-    window.benchmarker = {
-        setup, state, start, stop, select_algorithm
-    };
-    console.log("Benchmarker WASM initialized and functions are available globally");
-}
-
-loadWasm();
\ No newline at end of file
diff --git a/tig-benchmarker/src/benchmarker/cuda_run_benchmark.rs b/tig-benchmarker/src/benchmarker/cuda_run_benchmark.rs
index 1e36ed9..7db4687 100644
--- a/tig-benchmarker/src/benchmarker/cuda_run_benchmark.rs
+++ b/tig-benchmarker/src/benchmarker/cuda_run_benchmark.rs
@@ -1,14 +1,14 @@
 use super::{Job, NonceIterator};
-use crate::future_utils;
+use crate::utils::time;
 use cudarc::driver::*;
 use cudarc::nvrtc::{compile_ptx, Ptx};
-use future_utils::{spawn, time, yield_now, Mutex};
 use once_cell::sync::OnceCell;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tig_algorithms::{c001, c002, c003, c004, CudaKernel};
 use tig_challenges::ChallengeTrait;
-use tig_worker::{compute_solution, verify_solution, SolutionData};
+use tig_worker::{compute_solution, verify_solution};
+use tokio::{spawn, sync::Mutex, task::yield_now};
 
 static PTX_CACHE: OnceCell<Mutex<HashMap<String, Ptx>>> = OnceCell::new();
 
@@ -18,7 +18,6 @@ pub async fn get_or_compile_cuda(
     dev: &Arc<CudaDevice>,
 ) -> HashMap<&'static str, CudaFunction> {
     if kernel.is_none() {
-        println!("No CUDA optimisations available for '{}'", key);
         return HashMap::new();
     }
     let kernel = kernel.as_ref().unwrap();
@@ -55,31 +54,23 @@ pub async fn get_or_compile_cuda(
         .collect()
 }
 
-pub async fn execute(
-    nonce_iters: Vec<Arc<Mutex<NonceIterator>>>,
-    job: &Job,
-    wasm: &Vec<u8>,
-    solutions_data: Arc<Mutex<Vec<SolutionData>>>,
-    solutions_count: Arc<Mutex<u32>>,
-) {
-    for nonce_iter in nonce_iters {
+pub async fn execute(nonce_iterators: Vec<Arc<Mutex<NonceIterator>>>, job: &Job, wasm: &Vec<u8>) {
+    for nonce_iterator in nonce_iterators {
         let job = job.clone();
         let wasm = wasm.clone();
-        let solutions_data = solutions_data.clone();
-        let solutions_count = solutions_count.clone();
         spawn(async move {
             let mut last_yield = time();
             let dev = CudaDevice::new(0).expect("Failed to create CudaDevice");
             let mut challenge_cuda_funcs: Option<HashMap<&'static str, CudaFunction>> = None;
             let mut algorithm_cuda_funcs: Option<HashMap<&'static str, CudaFunction>> = None;
             loop {
-                match {
-                    let mut nonce_iter = (*nonce_iter).lock().await;
-                    (*nonce_iter).next()
-                } {
+                let now = time();
+                if now >= job.timestamps.end {
+                    break;
+                }
+                match { nonce_iterator.lock().await.next() } {
                     None => break,
                     Some(nonce) => {
-                        let now = time();
                         if now - last_yield > 25 {
                             yield_now().await;
                             last_yield = now;
@@ -12321,15 +12312,11 @@ pub async fn execute(
                         if verify_solution(&job.settings, nonce, &solution_data.solution)
                             .is_ok()
                         {
-                            {
-                                let mut solutions_count = (*solutions_count).lock().await;
-                                *solutions_count += 1;
-                            }
                             if solution_data.calc_solution_signature()
                                 <= job.solution_signature_threshold
                             {
-                                let mut solutions_data = (*solutions_data).lock().await;
-                                (*solutions_data).push(solution_data);
+                                let mut solutions_data = job.solutions_data.lock().await;
+                                (*solutions_data).insert(nonce, solution_data);
                             }
                         }
                     }
diff --git a/tig-benchmarker/src/benchmarker/difficulty_sampler.rs b/tig-benchmarker/src/benchmarker/difficulty_sampler.rs
index ec94067..87dd58a 100644
--- a/tig-benchmarker/src/benchmarker/difficulty_sampler.rs
+++ b/tig-benchmarker/src/benchmarker/difficulty_sampler.rs
@@ -96,6 +96,15 @@ impl DifficultySampler {
     }
 
     pub fn update_with_solutions(&mut self, difficulty: &Vec<i32>, num_solutions: u32) {
+        if difficulty[0] < self.min_difficulty[0]
+            || difficulty[1] < self.min_difficulty[1]
+            || difficulty[0]
+                >= self.min_difficulty[0] + self.dimensions[0] as i32 + self.padding[0] as i32
+            || difficulty[1]
+                >= self.min_difficulty[1] + self.dimensions[1] as i32 + self.padding[1] as i32
+        {
+            return;
+        }
         let (x, y) = (
             (difficulty[0] - self.min_difficulty[0]) as usize,
             (difficulty[1] - self.min_difficulty[1]) as usize,
@@ -110,13 +119,15 @@ impl DifficultySampler {
                 }
                 let decay = dist * (1.0 - DECAY) + DECAY;
                 let delta = (1.0 - decay) * num_solutions as f32 * SOLUTIONS_MULTIPLIER;
-                self.weights[x + x_offset][y + y_offset].solutions *= decay;
-                self.weights[x + x_offset][y + y_offset].solutions += delta;
-                if x_offset != 0 && x >= x_offset {
+                if x + x_offset < self.weights.len() && y + y_offset < self.weights[x].len() {
+                    self.weights[x + x_offset][y + y_offset].solutions *= decay;
+                    self.weights[x + x_offset][y + y_offset].solutions += delta;
+                }
+                if x_offset != 0 && x >= x_offset && y + y_offset < self.weights[x].len() {
                     self.weights[x - x_offset][y + y_offset].solutions *= decay;
                     self.weights[x - x_offset][y + y_offset].solutions += delta;
                 }
-                if y_offset != 0 && y >= y_offset {
+                if x + x_offset < self.weights.len() && y_offset != 0 && y >= y_offset {
                     self.weights[x + x_offset][y - y_offset].solutions *= decay;
                     self.weights[x + x_offset][y - y_offset].solutions += delta;
                 }
diff --git a/tig-benchmarker/src/benchmarker/download_wasm.rs b/tig-benchmarker/src/benchmarker/download_wasm.rs
index 9d77673..7b2f987 100644
--- a/tig-benchmarker/src/benchmarker/download_wasm.rs
+++ b/tig-benchmarker/src/benchmarker/download_wasm.rs
@@ -1,8 +1,8 @@
 use super::{Job, Result};
-use crate::future_utils::Mutex;
 use once_cell::sync::OnceCell;
 use std::collections::HashMap;
 use tig_utils::get;
+use tokio::sync::Mutex;
 
 static CACHE: OnceCell<Mutex<HashMap<String, Vec<u8>>>> = OnceCell::new();
diff --git a/tig-benchmarker/src/benchmarker/find_benchmark_to_submit.rs b/tig-benchmarker/src/benchmarker/find_benchmark_to_submit.rs
new file mode 100644
index 0000000..08f3077
--- /dev/null
+++ b/tig-benchmarker/src/benchmarker/find_benchmark_to_submit.rs
@@ -0,0 +1,36 @@
+use super::{state, Job, QueryData, Result, State};
+use crate::utils::time;
+
+pub async fn execute() -> Result<Option<Job>> {
+    let mut state = state().lock().await;
+    let State {
+        query_data,
+        pending_benchmark_jobs,
+        ..
+    } = &mut *state;
+    let QueryData {
+        proofs,
+        benchmarks,
+        frauds,
+        ..
+    } = &query_data;
+    let now = time();
+    let mut pending_benchmark_ids = pending_benchmark_jobs
+        .iter()
+        .filter(|(_, job)| now >= job.timestamps.submit)
+        .map(|(id, _)| id.clone())
+        .collect::<Vec<String>>();
+    pending_benchmark_ids.sort_by_key(|id| pending_benchmark_jobs[id].timestamps.submit);
+    for benchmark_id in pending_benchmark_ids {
+        let job = pending_benchmark_jobs.remove(&benchmark_id).unwrap();
+        if benchmarks.contains_key(&benchmark_id)
+            || proofs.contains_key(&benchmark_id)
+            || frauds.contains_key(&benchmark_id)
+            || job.solutions_data.lock().await.len() == 0
+        {
+            continue;
+        }
+        return Ok(Some(job));
+    }
+    Ok(None)
+}
diff --git a/tig-benchmarker/src/benchmarker/find_proof_to_recompute.rs b/tig-benchmarker/src/benchmarker/find_proof_to_recompute.rs
new file mode 100644
index 0000000..c9a232c
--- /dev/null
+++ b/tig-benchmarker/src/benchmarker/find_proof_to_recompute.rs
@@ -0,0 +1,60 @@
+use crate::utils::time;
+
+use super::{state, Job, QueryData, Result, State, Timestamps};
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::Mutex;
+
+pub async fn execute(benchmark_duration_ms: u64) -> Result<Option<Job>> {
+    let State {
+        query_data,
+        submitted_proof_ids,
+        pending_proof_jobs,
+        ..
+    } = &*state().lock().await;
+    let QueryData {
+        latest_block,
+        benchmarks,
+        proofs,
+        frauds,
+        download_urls,
+        ..
+    } = query_data;
+    for (benchmark_id, benchmark) in benchmarks.iter() {
+        if !pending_proof_jobs.contains_key(benchmark_id)
+            && !submitted_proof_ids.contains(benchmark_id)
+            && !frauds.contains_key(benchmark_id)
+            && !proofs.contains_key(benchmark_id)
+            && benchmark.state.is_some()
+        {
+            let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| {
+                format!(
+                    "Expecting benchmark '{}' to have sampled_nonces",
+                    benchmark_id
+                )
+            })?;
+            let start = time();
+            let end = start + benchmark_duration_ms;
+            let submit = end;
+            return Ok(Some(Job {
+                benchmark_id: benchmark.id.clone(),
+                download_url: download_urls
+                    .get(&benchmark.settings.algorithm_id)
+                    .cloned()
+                    .ok_or_else(|| {
+                        format!(
+                            "Expecting download_url for algorithm '{}'",
+                            benchmark.settings.algorithm_id
+                        )
+                    })?,
+                settings: benchmark.settings.clone(),
+                solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
+                sampled_nonces: Some(sampled_nonces),
+                wasm_vm_config: latest_block.config().wasm_vm.clone(),
+                weight: 0.0,
+                timestamps: Timestamps { start, end, submit },
+                solutions_data: Arc::new(Mutex::new(HashMap::new())),
+            }));
+        }
+    }
+    Ok(None)
+}
diff --git a/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs b/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs
index fce0c07..3d6e806 100644
--- a/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs
+++ b/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs
@@ -1,33 +1,38 @@
-use super::{state, QueryData, Result};
-use std::collections::HashSet;
-use tig_worker::SolutionData;
+use super::{state, Job, QueryData, Result, State};
+use crate::utils::time;
 
-pub async fn execute() -> Result<Option<(String, Vec<SolutionData>)>> {
+pub async fn execute() -> Result<Option<Job>> {
+    let mut state = state().lock().await;
+    let State {
+        query_data,
+        pending_proof_jobs,
+        ..
+    } = &mut *state;
     let QueryData {
         proofs,
         benchmarks,
         frauds,
         ..
-    } = &mut state().lock().await.query_data;
-    for (benchmark_id, proof) in proofs.iter_mut() {
-        if proof.solutions_data.is_none() || frauds.contains_key(benchmark_id) {
-            continue;
-        }
-        if let Some(state) = &benchmarks[benchmark_id].state {
-            let sampled_nonces: HashSet<u64> =
-                state.sampled_nonces.clone().unwrap().into_iter().collect();
-            let mut solutions_data = proof.solutions_data.take().unwrap();
-            solutions_data.retain(|x| sampled_nonces.contains(&x.nonce));
-            let extracted_nonces: HashSet<u64> = solutions_data.iter().map(|x| x.nonce).collect();
-            if extracted_nonces != sampled_nonces {
-                return Err(format!(
-                    "No solutions for sampled nonces: '{:?}'",
-                    sampled_nonces
-                        .difference(&extracted_nonces)
-                        .collect::<Vec<&u64>>()
-                ));
+    } = &query_data;
+    let now = time();
+    let mut pending_proof_ids = pending_proof_jobs
+        .iter()
+        .filter(|(_, job)| now >= job.timestamps.submit)
+        .map(|(id, _)| id.clone())
+        .collect::<Vec<String>>();
+    pending_proof_ids.sort_by_key(|id| pending_proof_jobs[id].timestamps.submit);
+    for benchmark_id in pending_proof_ids {
+        if let Some(sampled_nonces) = benchmarks
+            .get(&benchmark_id)
+            .and_then(|b| b.state.as_ref())
+            .and_then(|s| s.sampled_nonces.as_ref())
+        {
+            let mut job = pending_proof_jobs.remove(&benchmark_id).unwrap();
+            if proofs.contains_key(&benchmark_id) || frauds.contains_key(&benchmark_id) {
+                continue;
             }
-            return Ok(Some((benchmark_id.clone(), solutions_data)));
+            job.sampled_nonces = Some(sampled_nonces.clone());
+            return Ok(Some(job));
         }
     }
     Ok(None)
diff --git a/tig-benchmarker/src/benchmarker/mod.rs b/tig-benchmarker/src/benchmarker/mod.rs
index a3502b0..19b9a61 100644
--- a/tig-benchmarker/src/benchmarker/mod.rs
+++ b/tig-benchmarker/src/benchmarker/mod.rs
@@ -1,8 +1,11 @@
 mod difficulty_sampler;
 pub mod download_wasm;
+mod find_benchmark_to_submit;
+mod find_proof_to_recompute;
 mod find_proof_to_submit;
 mod query_data;
-mod setup_job;
+pub mod select_job;
+mod setup_jobs;
 mod submit_benchmark;
 mod submit_proof;
 
@@ -12,18 +15,21 @@ pub mod run_benchmark;
 #[path = "cuda_run_benchmark.rs"]
 pub mod run_benchmark;
 
-use crate::future_utils::{sleep, spawn, time, Mutex};
+use crate::utils::{sleep, time, Result};
 use difficulty_sampler::DifficultySampler;
 use once_cell::sync::OnceCell;
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 use tig_api::Api;
 use tig_structs::{
     config::{MinMaxDifficulty, WasmVMConfig},
     core::*,
 };
-
-pub type Result<T> = std::result::Result<T, String>;
+use tokio::{sync::Mutex, task::yield_now};
 
 #[derive(Serialize, Clone, Debug)]
 pub struct QueryData {
@@ -37,31 +43,48 @@ pub struct QueryData {
     pub frauds: HashMap<String, Fraud>,
 }
 
-#[derive(Serialize, Clone, Debug)]
-pub struct Timer {
-    pub start: u64,
-    pub end: u64,
-    pub now: u64,
+pub struct NonceIterator {
+    start: u64,
+    current: u64,
+    end: u64,
 }
-impl Timer {
-    fn new(ms: u64) -> Self {
-        let now = time();
-        Timer {
-            start: now,
-            end: now + ms,
-            now,
+
+impl NonceIterator {
+    pub fn new(start: u64, end: u64) -> Self {
+        NonceIterator {
+            start,
+            current: start,
+            end,
         }
     }
-    fn update(&mut self) -> &Self {
-        self.now = time();
-        self
-    }
-    fn finished(&self) -> bool {
-        self.now >= self.end
+
+    pub fn num_attempts(&self) -> u64 {
+        self.current - self.start
     }
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+impl Iterator for NonceIterator {
+    type Item = u64;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.current < self.end {
+            let result = Some(self.current);
+            self.current += 1;
+            result
+        } else {
+            None
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Timestamps {
+    pub start: u64,
+    pub end: u64,
+    pub submit: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct Job {
     pub download_url: String,
     pub benchmark_id: String,
@@ -69,78 +92,42 @@ pub struct Job {
     pub solution_signature_threshold: u32,
     pub sampled_nonces: Option<Vec<u64>>,
     pub wasm_vm_config: WasmVMConfig,
+    pub weight: f64,
+    pub timestamps: Timestamps,
+    #[serde(skip)]
+    pub solutions_data: Arc<Mutex<HashMap<u64, SolutionData>>>,
 }
 
-#[derive(Serialize, Debug, Clone)]
-pub struct NonceIterator {
-    nonces: Option<Vec<u64>>,
-    current: u64,
-    attempts: u64,
-}
-
-impl NonceIterator {
-    pub fn from_vec(nonces: Vec<u64>) -> Self {
-        Self {
-            nonces: Some(nonces),
-            current: 0,
-            attempts: 0,
-        }
-    }
-    pub fn from_u64(start: u64) -> Self {
-        Self {
-            nonces: None,
-            current: start,
-            attempts: 0,
-        }
-    }
-    pub fn attempts(&self) -> u64 {
-        self.attempts
-    }
-    pub fn is_empty(&self) -> bool {
-        self.nonces.as_ref().is_some_and(|x| x.is_empty()) || self.current == u64::MAX
-    }
-    pub fn empty(&mut self) {
-        if let Some(nonces) = self.nonces.as_mut() {
-            nonces.clear();
-        }
-        self.current = u64::MAX;
-    }
-}
-impl Iterator for NonceIterator {
-    type Item = u64;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(nonces) = self.nonces.as_mut() {
-            let value = nonces.pop();
-            self.attempts += value.is_some() as u64;
-            value
-        } else if self.current < u64::MAX {
-            let value = Some(self.current);
-            self.attempts += 1;
-            self.current += 1;
-            value
-        } else {
-            None
+impl Job {
+    pub fn create_nonce_iterators(&self, num_workers: u32) -> Vec<Arc<Mutex<NonceIterator>>> {
+        match &self.sampled_nonces {
+            Some(sampled_nonces) => sampled_nonces
+                .iter()
+                .map(|&n| Arc::new(Mutex::new(NonceIterator::new(n, n + 1))))
+                .collect(),
+            None => {
+                let mut rng = StdRng::seed_from_u64(time());
+                let offset = u64::MAX / (num_workers as u64 + 1);
+                let random_offset = rng.gen_range(0..offset);
+                (0..num_workers)
+                    .map(|i| {
+                        let start = random_offset + offset * i as u64;
+                        let end = start + offset;
+                        Arc::new(Mutex::new(NonceIterator::new(start, end)))
+                    })
+                    .collect()
+            }
         }
     }
 }
 
-#[derive(Serialize, Debug, Clone, PartialEq)]
-pub enum Status {
-    Starting,
-    Running(String),
-    Stopping,
-    Stopped,
-}
-#[derive(Serialize, Debug, Clone)]
+#[derive(Debug, Clone)]
 pub struct State {
-    pub status: Status,
-    pub timer: Option<Timer>,
     pub query_data: QueryData,
-    pub selected_algorithms: HashMap<String, String>,
-    pub job: Option<Job>,
-    pub submission_errors: HashMap<String, String>,
-    #[serde(skip_serializing)]
+    pub available_jobs: HashMap<String, Job>,
+    pub pending_benchmark_jobs: HashMap<String, Job>,
+    pub pending_proof_jobs: HashMap<String, Job>,
+    pub submitted_proof_ids: HashSet<String>,
     pub difficulty_samplers: HashMap<String, DifficultySampler>,
 }
 
@@ -160,305 +147,285 @@ pub fn state() -> &'static Mutex<State> {
     STATE.get().expect("STATE should be initialised")
 }
 
-async fn update_status(status: &str) {
-    let mut state = state().lock().await;
-    if let Status::Running(_) = state.status {
-        state.status = Status::Running(status.to_string());
-        println!("{}", status);
-        #[cfg(feature = "browser")]
-        web_sys::console::log_1(&status.to_string().into());
-    }
-}
-
-async fn run_once(num_workers: u32, ms_per_benchmark: u32) -> Result<()> {
-    {
-        let mut state = (*state()).lock().await;
-        state.job = None;
-        state.timer = None;
-    }
-    update_status("Querying latest data").await;
-    // retain only benchmarks that are within the lifespan period
-    // preserves solution_meta_data and solution_data
-    let mut new_query_data = query_data::execute().await?;
-    if {
-        let state = (*state()).lock().await;
-        state.query_data.latest_block.id != new_query_data.latest_block.id
-    } {
-        {
-            let mut state = (*state()).lock().await;
-            let block_started_cutoff = new_query_data.latest_block.details.height.saturating_sub(
-                new_query_data
-                    .latest_block
-                    .config()
-                    .benchmark_submissions
-                    .lifespan_period,
-            );
-            let mut latest_benchmarks = state.query_data.benchmarks.clone();
-            latest_benchmarks.retain(|_, x| x.details.block_started >= block_started_cutoff);
-            latest_benchmarks.extend(new_query_data.benchmarks.drain());
-
-            let mut latest_proofs = state.query_data.proofs.clone();
-            latest_proofs.retain(|id, _| latest_benchmarks.contains_key(id));
-            latest_proofs.extend(new_query_data.proofs.drain());
-
-            let mut latest_frauds = state.query_data.frauds.clone();
-            latest_frauds.retain(|id, _| latest_benchmarks.contains_key(id));
-            latest_frauds.extend(new_query_data.frauds.drain());
-
-            (*state)
-                .submission_errors
-                .retain(|id, _| latest_benchmarks.contains_key(id));
-            new_query_data.benchmarks = latest_benchmarks;
-            new_query_data.proofs = latest_proofs;
-            new_query_data.frauds = latest_frauds;
-            (*state).query_data = new_query_data;
-        }
-
-        update_status("Updating difficulty sampler with query data").await;
-        {
-            let mut state = state().lock().await;
-            let State {
-                query_data,
-                difficulty_samplers,
-                ..
-            } = &mut (*state);
-            for challenge in query_data.challenges.iter() {
-                let difficulty_sampler = difficulty_samplers
-                    .entry(challenge.id.clone())
-                    .or_insert_with(|| DifficultySampler::new());
-                let min_difficulty = query_data.latest_block.config().difficulty.parameters
-                    [&challenge.id]
-                    .min_difficulty();
-                difficulty_sampler.update_with_block_data(&min_difficulty, challenge.block_data());
-            }
-        }
-    }
-
-    update_status("Finding proof to submit").await;
-    match find_proof_to_submit::execute().await? {
-        Some((benchmark_id, solutions_data)) => {
-            update_status(&format!("Submitting proof for {}", benchmark_id)).await;
-            if let Err(e) = submit_proof::execute(benchmark_id.clone(), solutions_data).await {
-                let mut state = state().lock().await;
-                state.submission_errors.insert(benchmark_id, e.clone());
-                return Err(e);
-            }
-            update_status(&format!("Success. Proof {} submitted", benchmark_id)).await;
-        }
-        None => {
-            update_status("No proof to submit").await;
-        }
-    }
-    // creates a benchmark & proof with job.benchmark_id
-    update_status("Selecting settings to benchmark").await;
-    setup_job::execute().await?;
-    let job = {
-        let state = state().lock().await;
-        state.job.clone().unwrap()
-    };
-    update_status(&format!("{:?}", job.settings)).await;
-
-    update_status(&format!(
-        "Downloading algorithm {}",
-        job.download_url.split("/").last().unwrap()
-    ))
-    .await;
-    let wasm = download_wasm::execute(&job).await?;
-
-    // variables that are shared by workers
-    let nonce_iters = match &job.sampled_nonces {
-        Some(nonces) => vec![Arc::new(Mutex::new(NonceIterator::from_vec(
-            nonces.clone(),
-        )))],
-        None => (0..num_workers)
-            .into_iter()
-            .map(|x| {
-                Arc::new(Mutex::new(NonceIterator::from_u64(
-                    u64::MAX / num_workers as u64 * x as u64,
-                )))
-            })
-            .collect(),
-    };
-    let solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
-    let solutions_count = Arc::new(Mutex::new(0u32));
-    update_status("Starting benchmark").await;
-    run_benchmark::execute(
-        nonce_iters.iter().cloned().collect(),
-        &job,
-        &wasm,
-        solutions_data.clone(),
-        solutions_count.clone(),
-    )
-    .await;
-    {
-        let mut state = state().lock().await;
-        (*state).timer = Some(Timer::new(ms_per_benchmark as u64));
-    }
-    loop {
-        {
-            // transfers solutions computed by workers to benchmark state
-            let num_solutions =
-                drain_solutions(&job.benchmark_id, &mut *(*solutions_data).lock().await).await;
-            let mut finished = true;
-            let mut num_attempts = 0;
-            for nonce_iter in nonce_iters.iter().cloned() {
-                let nonce_iter = (*nonce_iter).lock().await;
-                num_attempts += nonce_iter.attempts();
-                finished &= nonce_iter.is_empty();
-            }
-            update_status(&format!(
-                "Computed {} solutions out of {} instances",
-                num_solutions, num_attempts
-            ))
-            .await;
-            let State {
-                status,
-                timer: time_left,
-                ..
-            } = &mut (*state().lock().await);
-            if time_left.as_mut().unwrap().update().finished()
-                || (finished && num_solutions == (num_attempts as u32)) // nonce_iter is only empty if recomputing
-                || *status == Status::Stopping
-            {
-                break;
-            }
-        }
-        sleep(200).await;
-    }
-    for nonce_iter in nonce_iters {
-        (*(*nonce_iter).lock().await).empty();
-    }
-
-    // transfers solutions computed by workers to benchmark state
-    let num_solutions =
-        drain_solutions(&job.benchmark_id, &mut *(*solutions_data).lock().await).await;
-    if let Some(sampled_nonces) = job.sampled_nonces.as_ref() {
-        if num_solutions != sampled_nonces.len() as u32 {
-            let mut state = (*state()).lock().await;
-            (*state)
-                .query_data
-                .proofs
-                .get_mut(&job.benchmark_id)
-                .unwrap()
-                .solutions_data
-                .take();
-            return Err(format!(
-                "Failed to recompute solutions for {}",
-                job.benchmark_id
-            ));
-        } else {
-            update_status(&format!(
-                "Finished. Recompute solutions for {}",
-                job.benchmark_id
-            ))
-            .await;
-            sleep(5000).await;
-        }
-    } else {
-        update_status("Updating difficulty sampler with solutions").await;
-        {
-            let num_solutions = *solutions_count.lock().await;
-            let mut state = state().lock().await;
-            state
-                .difficulty_samplers
-                .get_mut(&job.settings.challenge_id)
-                .unwrap()
-                .update_with_solutions(&job.settings.difficulty, num_solutions);
-        }
-
-        if num_solutions == 0 {
-            update_status("Finished. No solutions to submit").await;
-        } else {
-            update_status(&format!("Finished. Submitting {} solutions", num_solutions,)).await;
-            let benchmark_id = match submit_benchmark::execute(&job).await {
-                Ok(benchmark_id) => benchmark_id,
-                Err(e) => {
-                    let mut state = (*state()).lock().await;
-                    state
-                        .submission_errors
-                        .insert(job.benchmark_id.clone(), e.clone());
-                    return Err(e);
-                }
-            };
-            update_status(&format!("Success. Benchmark {} submitted", benchmark_id)).await;
-            let mut state = (*state()).lock().await;
-            let QueryData {
-                benchmarks, proofs, ..
-            } = &mut (*state).query_data;
-            let mut benchmark = benchmarks.remove(&job.benchmark_id).unwrap();
-            let mut proof = proofs.remove(&job.benchmark_id).unwrap();
-            benchmark.id = benchmark_id.clone();
-            proof.benchmark_id = benchmark_id.clone();
-            benchmarks.insert(benchmark_id.clone(), benchmark);
-            proofs.insert(benchmark_id.clone(), proof);
-        }
-    }
-    Ok(())
-}
-
-pub async fn drain_solutions(benchmark_id: &String, solutions_data: &mut Vec<SolutionData>) -> u32 {
-    let mut state = (*state()).lock().await;
-    let QueryData {
-        benchmarks, proofs, ..
-    } = &mut (*state).query_data;
-    if let Some(benchmark) = benchmarks.get_mut(benchmark_id) {
-        let proof = proofs.get_mut(benchmark_id).unwrap();
-        if let Some(x) = benchmark.solutions_meta_data.as_mut() {
-            x.extend(
-                solutions_data
-                    .iter()
-                    .map(|x| SolutionMetaData::from(x.clone())),
-            );
-            benchmark.details.num_solutions = x.len() as u32;
-        }
-        let to_update = proof.solutions_data.as_mut().unwrap();
-        to_update.extend(solutions_data.drain(..));
-        to_update.len() as u32
-    } else {
-        0
-    }
-}
-
-pub async fn start(num_workers: u32, ms_per_benchmark: u32) {
-    {
-        let mut state = (*state()).lock().await;
-        if state.status != Status::Stopped {
-            return;
-        }
-        state.status = Status::Starting;
-    }
-    spawn(async move {
-        {
-            let mut state = (*state()).lock().await;
-            state.status = Status::Running("Starting".to_string());
-        }
-        loop {
-            {
-                let mut state = (*state()).lock().await;
-                if state.status == Status::Stopping {
-                    state.status = Status::Stopped;
-                }
-            }
-            if let Err(e) = run_once(num_workers, ms_per_benchmark).await {
-                update_status(&format!("Error: {:?}", e)).await;
-                sleep(5000).await;
-            }
-        }
-    });
-}
-
-pub async fn stop() {
-    let mut state = (*state()).lock().await;
-    match state.status {
-        Status::Running(_) => {
-            state.status = Status::Stopping;
-        }
-        _ => {}
-    }
-}
-
-pub async fn select_algorithm(challenge_name: String, algorithm_name: String) {
-    let mut state = (*state()).lock().await;
-    state
-        .selected_algorithms
-        .insert(challenge_name, algorithm_name);
+pub async fn proof_submitter() {
+    async fn do_work() -> Result<()> {
+        println!("[proof_submitter]: checking for any proofs to submit");
+        if let Some(mut job) = find_proof_to_submit::execute().await? {
+            println!(
+                "[proof_submitter]: submitting proof for benchmark {}",
+                job.benchmark_id
+            );
+            {
+                let mut state = (*state()).lock().await;
+                state.submitted_proof_ids.insert(job.benchmark_id.clone());
+            }
+            match submit_proof::execute(&job).await {
+                Ok(_) => {
+                    println!(
+                        "[proof_submitter]: successfully submitted proof for benchmark {}",
+                        job.benchmark_id
+                    );
+                }
+                Err(e) => {
+                    println!("[proof_submitter]: failed to submit proof: {:?}", e);
+                    // FIXME hacky way to check for 4xx status
+                    if !e.contains("status: 4") {
+                        println!("[proof_submitter]: re-queueing proof for another submit attempt 10s later");
+                        job.timestamps.submit = time() + 10000;
+                        let mut state = (*state()).lock().await;
+                        state.submitted_proof_ids.remove(&job.benchmark_id);
+                        state
+                            .pending_proof_jobs
+                            .insert(job.benchmark_id.clone(), job);
+                    }
+                }
+            }
+        } else {
+            println!("[proof_submitter]: no proofs to submit");
+        }
+        Ok(())
+    }
+
+    loop {
+        if let Err(e) = do_work().await {
+            println!("[proof_submitter]: error: {:?}", e);
+        }
+        println!("[proof_submitter]: sleeping 5s");
+        sleep(5000).await;
+    }
+}
+
+pub async fn benchmark_submitter() {
+    async fn do_work() -> Result<()> {
+        println!("[benchmark_submitter]: checking for any benchmarks to submit");
+        if let Some(mut job) = find_benchmark_to_submit::execute().await? {
+            let num_solutions = {
+                let solutions_data = job.solutions_data.lock().await;
+                solutions_data.len()
+            };
+            println!(
+                "[benchmark_submitter]: submitting benchmark {:?} with {} solutions",
+                job.settings, num_solutions
+            );
+            match submit_benchmark::execute(&job).await {
+                Ok(benchmark_id) => {
+                    job.benchmark_id = benchmark_id.clone();
+                    job.timestamps.submit = time();
+                    println!(
+                        "[benchmark_submitter]: successfully submitted benchmark {}",
+                        benchmark_id
+                    );
+                    {
+                        let mut state = (*state()).lock().await;
+                        state.pending_proof_jobs.insert(benchmark_id, job);
+                    }
+                }
+                Err(e) => {
+                    println!("[benchmark_submitter]: failed to submit benchmark: {:?}", e);
+                    // FIXME hacky way to check for 4xx status
+                    if !e.contains("status: 4") {
+                        println!("[benchmark_submitter]: re-queueing benchmark for another submit attempt 10s later");
+                        job.timestamps.submit = time() + 10000;
+                        let mut state = (*state()).lock().await;
+                        state
+                            .pending_benchmark_jobs
+                            .insert(job.benchmark_id.clone(), job);
+                    }
+                }
+            }
+        } else {
+            println!("[benchmark_submitter]: no benchmarks to submit");
+        }
+        Ok(())
+    }
+
+    loop {
+        if let Err(e) = do_work().await {
+            println!("[benchmark_submitter]: error: {:?}", e);
+        }
+        println!("[benchmark_submitter]: sleeping 5s");
+        sleep(5000).await;
+    }
+}
+
+pub async fn data_fetcher() {
+    async fn do_work() -> Result<()> {
+        println!("[data_fetcher]: fetching latest data");
+        let new_query_data = query_data::execute().await?;
+        if {
+            let state = (*state()).lock().await;
+            state.query_data.latest_block.id != new_query_data.latest_block.id
+        } {
+            println!(
+                "[data_fetcher]: got new block {} @ {}",
+                new_query_data.latest_block.id, new_query_data.latest_block.details.height
+            );
+            {
+                let mut state = (*state()).lock().await;
+                (*state).query_data = new_query_data;
+            }
+
+            println!("[data_fetcher]: updating difficulty samplers");
+            {
+                let mut state = state().lock().await;
+                let State {
+                    query_data,
+                    difficulty_samplers,
+                    ..
+                } = &mut (*state);
+                for challenge in query_data.challenges.iter() {
+                    let difficulty_sampler = difficulty_samplers
+                        .entry(challenge.id.clone())
+                        .or_insert_with(|| DifficultySampler::new());
+                    let min_difficulty = query_data.latest_block.config().difficulty.parameters
+                        [&challenge.id]
+                        .min_difficulty();
+                    difficulty_sampler
+                        .update_with_block_data(&min_difficulty, challenge.block_data());
+                }
+            }
+        } else {
+            println!("[data_fetcher]: no new data");
+        }
+        Ok(())
+    }
+
+    loop {
+        if let Err(e) = do_work().await {
+            println!("[data_fetcher]: error: {:?}", e);
+        }
+        println!("[data_fetcher]: sleeping 10s");
+        sleep(10000).await;
+    }
+}
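+
+// The benchmarker task drives the job lifecycle end to end: setup_jobs creates
+// one weighted job per selected challenge (available_jobs); a proof that needs
+// recomputing, or a weighted-sampled job, is then run by the workers until
+// job.timestamps.end, after which the jobs are queued in pending_benchmark_jobs
+// for the submitter tasks above.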
+
+pub async fn benchmarker(
+    selected_algorithms: HashMap<String, String>,
+    num_workers: u32,
+    benchmark_duration_ms: u64,
+    submit_delay_ms: u64,
+) {
+    async fn do_work(
+        selected_algorithms: &HashMap<String, String>,
+        num_workers: u32,
+        benchmark_duration_ms: u64,
+        submit_delay_ms: u64,
+    ) -> Result<()> {
+        println!("[benchmarker]: setting up jobs");
+        let jobs = setup_jobs::execute(selected_algorithms, benchmark_duration_ms, submit_delay_ms)
+            .await?;
+        for (i, job) in jobs.values().enumerate() {
+            println!(
+                "[benchmarker]: job {}: {:?}, weight: {}",
+                i, job.settings, job.weight
+            );
+        }
+        {
+            let mut state = state().lock().await;
+            state.available_jobs = jobs.clone();
+        }
+
+        println!("[benchmarker]: finding proofs to re-compute");
+        let job = {
+            if let Some(job) =
+                find_proof_to_recompute::execute(benchmark_duration_ms + 5000).await?
+            {
+                println!(
+                    "[benchmarker]: found proof to recompute: {}",
+                    job.benchmark_id
+                );
+                let mut state = state().lock().await;
+                state.query_data.proofs.insert(
+                    job.benchmark_id.clone(),
+                    Proof {
+                        benchmark_id: job.benchmark_id.clone(),
+                        state: None,
+                        solutions_data: Some(Vec::new()),
+                    },
+                );
+                job
+            } else {
+                println!("[benchmarker]: no proofs to recompute");
+                println!("[benchmarker]: weighted sampling one of the available jobs");
+                select_job::execute(&jobs).await?
+            }
+        };
+
+        println!(
+            "[benchmarker]: downloading algorithm {}",
+            job.download_url.split("/").last().unwrap()
+        );
+        let wasm = download_wasm::execute(&job).await?;
+
+        println!("[benchmarker]: starting benchmark {:?}", job.settings);
+        let nonce_iterators = job.create_nonce_iterators(num_workers);
+        run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
+        loop {
+            {
+                let mut num_attempts = 0;
+                for nonce_iterator in &nonce_iterators {
+                    num_attempts += nonce_iterator.lock().await.num_attempts();
+                }
+                let num_solutions = job.solutions_data.lock().await.len();
+                let elapsed = time() - job.timestamps.start;
+                if num_workers > 0 || job.sampled_nonces.is_some() {
+                    println!(
+                        "[benchmarker]: #solutions: {}, #instances: {}, elapsed: {}ms",
+                        num_solutions, num_attempts, elapsed
+                    );
+                }
+                if time() >= job.timestamps.end
+                    || job
+                        .sampled_nonces
+                        .as_ref()
+                        .is_some_and(|sampled_nonces| num_solutions == sampled_nonces.len())
+                {
+                    break;
+                }
+            }
+            sleep(500).await;
+        }
+        {
+            let mut num_attempts = 0;
+            for nonce_iterator in &nonce_iterators {
+                num_attempts += nonce_iterator.lock().await.num_attempts();
+            }
+            let mut state = state().lock().await;
+            let num_solutions = job.solutions_data.lock().await.len() as u32;
+            if job.sampled_nonces.is_some() {
+                state
+                    .pending_proof_jobs
+                    .insert(job.benchmark_id.clone(), job);
+            } else if num_attempts > 0 {
+                state
+                    .difficulty_samplers
+                    .get_mut(&job.settings.challenge_id)
+                    .unwrap()
+                    .update_with_solutions(&job.settings.difficulty, num_solutions);
+            }
+
+            let jobs = state.available_jobs.drain().collect::<Vec<_>>();
+            state.pending_benchmark_jobs.extend(jobs);
+        }
+        Ok(())
+    }
+
+    loop {
+        if let Err(e) = do_work(
+            &selected_algorithms,
+            num_workers,
+            benchmark_duration_ms,
+            submit_delay_ms,
+        )
+        .await
+        {
+            println!("[benchmarker]: error: {:?}", e);
+            println!("[benchmarker]: sleeping 5s");
+            sleep(5000).await;
+        } else {
+            yield_now().await;
+        }
+    }
+}
 
 pub async fn setup(api_url: String, api_key: String, player_id: String) {
@@ -476,13 +443,12 @@ pub async fn setup(api_url: String, api_key: String, player_id: String) {
     }
     STATE.get_or_init(|| {
         Mutex::new(State {
-            status: Status::Stopped,
-            timer: None,
             query_data,
             difficulty_samplers,
-            selected_algorithms: HashMap::new(),
-            job: None,
-            submission_errors: HashMap::new(),
+            available_jobs: HashMap::new(),
+            pending_benchmark_jobs: HashMap::new(),
+            pending_proof_jobs: HashMap::new(),
+            submitted_proof_ids: HashSet::new(),
         })
     });
 }
diff --git a/tig-benchmarker/src/benchmarker/query_data.rs b/tig-benchmarker/src/benchmarker/query_data.rs
index 4e8432d..03062f0 100644
--- a/tig-benchmarker/src/benchmarker/query_data.rs
+++ b/tig-benchmarker/src/benchmarker/query_data.rs
@@ -1,9 +1,9 @@
 use super::{api, player_id, QueryData, Result};
-use crate::future_utils::{join, Mutex};
 use once_cell::sync::OnceCell;
 use std::collections::HashMap;
 use tig_api::*;
 use tig_structs::core::*;
+use tokio::{join, sync::Mutex};
 
 static CACHE: OnceCell<Mutex<HashMap<String, QueryData>>> = OnceCell::new();
 
@@ -14,13 +14,12 @@ pub async fn execute() -> Result<QueryData> {
     let mut cache = cache.lock().await;
     if !cache.contains_key(&latest_block.id) {
         cache.clear();
-        let results = join(
+        let results = join!(
             query_algorithms(latest_block.id.clone()),
             query_player_data(latest_block.id.clone()),
             query_benchmarks(latest_block.id.clone()),
             query_challenges(latest_block.id.clone()),
-        )
-        .await?;
+        );
         let (algorithms_by_challenge, download_urls) = results.0?;
         let player_data = results.1?;
         let (benchmarks, proofs, frauds) = results.2?;
diff --git a/tig-benchmarker/src/benchmarker/run_benchmark.rs b/tig-benchmarker/src/benchmarker/run_benchmark.rs
index a86e473..5ccfbf1 100644
--- a/tig-benchmarker/src/benchmarker/run_benchmark.rs
+++ b/tig-benchmarker/src/benchmarker/run_benchmark.rs
@@ -1,33 +1,26 @@
 use super::{Job, NonceIterator};
-use crate::future_utils;
-use future_utils::{spawn, time, yield_now, Mutex};
+use crate::utils;
 use std::sync::Arc;
 use tig_algorithms::{c001, c002, c003, c004};
 use tig_challenges::ChallengeTrait;
-use tig_worker::{compute_solution, verify_solution, SolutionData};
+use tig_worker::{compute_solution, verify_solution};
+use tokio::{spawn, sync::Mutex, task::yield_now};
+use utils::time;
 
-pub async fn execute(
-    nonce_iters: Vec<Arc<Mutex<NonceIterator>>>,
-    job: &Job,
-    wasm: &Vec<u8>,
-    solutions_data: Arc<Mutex<Vec<SolutionData>>>,
-    solutions_count: Arc<Mutex<u32>>,
-) {
-    for nonce_iter in nonce_iters {
+pub async fn execute(nonce_iterators: Vec<Arc<Mutex<NonceIterator>>>, job: &Job, wasm: &Vec<u8>) {
+    for nonce_iterator in nonce_iterators {
         let job = job.clone();
         let wasm = wasm.clone();
-        let solutions_data = solutions_data.clone();
-        let solutions_count = solutions_count.clone();
         spawn(async move {
             let mut last_yield = time();
             loop {
-                match {
-                    let mut nonce_iter = (*nonce_iter).lock().await;
-                    (*nonce_iter).next()
-                } {
+                let now = time();
+                if now >= job.timestamps.end {
+                    break;
+                }
+                match { nonce_iterator.lock().await.next() } {
                     None => break,
                     Some(nonce) => {
-                        let now = time();
                        if now - last_yield > 25 {
                            yield_now().await;
                            last_yield = now;
@@ -12137,15 +12130,11 @@ pub async fn execute(
                         if verify_solution(&job.settings, nonce, &solution_data.solution)
                             .is_ok()
                         {
-                            {
-                                let mut solutions_count = (*solutions_count).lock().await;
-                                *solutions_count += 1;
-                            }
                             if solution_data.calc_solution_signature()
                                 <= job.solution_signature_threshold
                             {
-                                let mut solutions_data = (*solutions_data).lock().await;
-                                (*solutions_data).push(solution_data);
+                                let mut solutions_data = job.solutions_data.lock().await;
+                                (*solutions_data).insert(nonce, solution_data);
                             }
                         }
                     }
diff --git a/tig-benchmarker/src/benchmarker/select_job.rs b/tig-benchmarker/src/benchmarker/select_job.rs
new file mode 100644
index 0000000..d3e6ac6
--- /dev/null
+++ b/tig-benchmarker/src/benchmarker/select_job.rs
@@ -0,0 +1,22 @@
+use std::collections::HashMap;
+
+use super::{Job, Result};
+use crate::utils::time;
+use rand::{distributions::WeightedIndex, rngs::StdRng, SeedableRng};
+use rand_distr::Distribution;
+
+pub async fn execute(available_jobs: &HashMap<String, Job>) -> Result<Job> {
+    let benchmark_ids = available_jobs.keys().cloned().collect::<Vec<String>>();
+    let weights = benchmark_ids
+        .iter()
+        .map(|benchmark_id| available_jobs[benchmark_id].weight.clone())
+        .collect::<Vec<f64>>();
+    if weights.len() == 0 {
+        return Err("No jobs available".to_string());
+    }
+    let dist = WeightedIndex::new(&weights)
+        .map_err(|e| format!("Failed to create WeightedIndex: {}", e))?;
+    let mut rng = StdRng::seed_from_u64(time());
+    let index = dist.sample(&mut rng);
+    Ok(available_jobs[&benchmark_ids[index]].clone())
+}
diff --git a/tig-benchmarker/src/benchmarker/setup_job.rs b/tig-benchmarker/src/benchmarker/setup_job.rs
deleted file mode 100644
index 0afac0a..0000000
--- a/tig-benchmarker/src/benchmarker/setup_job.rs
+++ /dev/null
@@ -1,221 +0,0 @@
-use super::{player_id, state, Job, QueryData, Result, State};
-use crate::future_utils::time;
-use rand::{
-    distributions::{Alphanumeric, DistString, WeightedIndex},
-    rngs::StdRng,
-    SeedableRng,
-};
-use rand_distr::Distribution;
-use std::collections::HashMap;
-use tig_structs::core::*;
-
-pub async fn execute() -> Result<()> {
-    let job = if let Some(x) = find_settings_to_recompute().await? {
-        x
-    } else {
-        pick_settings_to_benchmark().await?
-    };
-    let mut state = state().lock().await;
-    (*state).job.replace(job.clone());
-    let QueryData {
-        latest_block,
-        benchmarks,
-        proofs,
-        ..
-    } = &mut state.query_data;
-    if job.sampled_nonces.is_none() {
-        benchmarks.insert(
-            job.benchmark_id.clone(),
-            Benchmark {
-                id: job.benchmark_id.clone(),
-                settings: job.settings.clone(),
-                details: BenchmarkDetails {
-                    block_started: latest_block.details.height.clone(),
-                    num_solutions: 0,
-                },
-                state: None,
-                solutions_meta_data: Some(Vec::new()),
-                solution_data: None,
-            },
-        );
-    }
-    proofs.insert(
-        job.benchmark_id.clone(),
-        Proof {
-            benchmark_id: job.benchmark_id.clone(),
-            state: None,
-            solutions_data: Some(Vec::new()),
-        },
-    );
-    Ok(())
-}
-
-async fn find_settings_to_recompute() -> Result<Option<Job>> {
-    let QueryData {
-        latest_block,
-        benchmarks,
-        proofs,
-        frauds,
-        download_urls,
-        ..
-    } = &state().lock().await.query_data;
-    for (benchmark_id, benchmark) in benchmarks.iter() {
-        if !frauds.contains_key(benchmark_id)
-            && !proofs.contains_key(benchmark_id)
-            && benchmark.state.is_some()
-        {
-            let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| {
-                format!(
-                    "Expecting benchmark '{}' to have sampled_nonces",
-                    benchmark_id
-                )
-            })?;
-            return Ok(Some(Job {
-                benchmark_id: benchmark.id.clone(),
-                download_url: get_download_url(&benchmark.settings.algorithm_id, download_urls)?,
-                settings: benchmark.settings.clone(),
-                solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
-                sampled_nonces: Some(sampled_nonces),
-                wasm_vm_config: latest_block.config().wasm_vm.clone(),
-            }));
-        }
-    }
-    Ok(None)
-}
-
-async fn pick_settings_to_benchmark() -> Result<Job> {
-    let State {
-        query_data,
-        selected_algorithms,
-        difficulty_samplers,
-        ..
-    } = &(*state().lock().await);
-    let QueryData {
-        latest_block,
-        player_data,
-        challenges,
-        download_urls,
-        algorithms_by_challenge,
-        ..
-    } = query_data;
-    let mut rng = StdRng::seed_from_u64(time() as u64);
-    let challenge = pick_challenge(&mut rng, player_data, challenges, selected_algorithms)?;
-    let selected_algorithm_id = get_algorithm_id(
-        algorithms_by_challenge,
-        challenge,
-        download_urls,
-        &selected_algorithms[&challenge.details.name],
-    )?;
-    let difficulty = difficulty_samplers[&challenge.id].sample(&mut rng);
-    Ok(Job {
-        benchmark_id: Alphanumeric.sample_string(&mut rng, 32),
-        download_url: get_download_url(&selected_algorithm_id, download_urls)?,
-        settings: BenchmarkSettings {
-            player_id: player_id().clone(),
-            block_id: latest_block.id.clone(),
-            challenge_id: challenge.id.clone(),
-            algorithm_id: selected_algorithm_id,
-            difficulty,
-        },
-        solution_signature_threshold: *challenge.block_data().solution_signature_threshold(),
-        sampled_nonces: None,
-        wasm_vm_config: latest_block.config().wasm_vm.clone(),
-    })
-}
-
-fn pick_challenge<'a>(
-    rng: &mut StdRng,
-    player_data: &'a Option<PlayerBlockData>,
-    challenges: &'a Vec<Challenge>,
-    selected_algorithms: &HashMap<String, String>,
-) -> Result<&'a Challenge> {
-    let num_qualifiers_by_challenge = match player_data
-        .as_ref()
-        .map(|x| x.num_qualifiers_by_challenge.as_ref())
-    {
-        Some(Some(num_qualifiers_by_challenge)) => num_qualifiers_by_challenge.clone(),
-        _ => HashMap::new(),
-    };
-    let challenge_name_2_id: HashMap<String, String> = challenges
-        .iter()
-        .map(|c| (c.details.name.clone(), c.id.clone()))
-        .collect();
-    let percent_qualifiers_by_challenge: HashMap<String, f64> = challenges
-        .iter()
-        .map(|c| {
-            let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0);
-            let challenge_num_qualifiers = *c.block_data().num_qualifiers();
-            let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 {
-                0f64
-            } else {
-                (player_num_qualifiers as f64) / (challenge_num_qualifiers as f64)
-            };
-            (c.id.clone(), percent)
-        })
-        .collect();
-    if selected_algorithms.len() == 0 {
-        return Err("Your .json is empty".to_string());
-    };
-    let mut challenge_weights = Vec::<(String, f64)>::new();
-    for challenge_name in selected_algorithms.keys() {
-        let challenge_id = challenge_name_2_id.get(challenge_name).ok_or_else(|| {
-            format!(
-                "Your .json contains a non-existent challenge '{}'",
-                challenge_name
-            )
-        })?;
-        let max_percent_qualifiers = *percent_qualifiers_by_challenge
-            .values()
-            .max_by(|a, b| a.partial_cmp(b).unwrap())
-            .unwrap();
-        challenge_weights.push((
-            challenge_id.clone(),
-            4.0 * max_percent_qualifiers / 3.0 - percent_qualifiers_by_challenge[challenge_id]
-                + 1e-10f64,
-        ));
-    }
-    let dist = WeightedIndex::new(
-        &challenge_weights
-            .iter()
-            .map(|w| w.1.clone())
-            .collect::<Vec<f64>>(),
-    )
-    .map_err(|e| format!("Failed to create WeightedIndex: {}", e))?;
-    let index = dist.sample(rng);
-    let random_challenge_id = challenge_weights[index].0.clone();
-    let challenge = challenges
-        .iter()
-        .find(|c| c.id == *random_challenge_id)
-        .ok_or_else(|| "Selected challenge should exist")?;
-    Ok(challenge)
-}
-
-fn get_algorithm_id(
-    algorithms_by_challenge: &HashMap<String, Vec<Algorithm>>,
-    challenge: &Challenge,
-    download_urls: &HashMap<String, String>,
-    selected_algorithm_name: &String,
-) -> Result<String> {
-    let selected_algorithm_id = algorithms_by_challenge[&challenge.id]
-        .iter()
-        .find(|a| download_urls.contains_key(&a.id) && a.details.name == *selected_algorithm_name)
-        .ok_or_else(|| {
-            format!(
-                "Your .json contains a non-existent algorithm '{}'",
-                selected_algorithm_name
-            )
-        })?
-        .id
-        .clone();
-    Ok(selected_algorithm_id)
-}
-
-fn get_download_url(
-    algorithm_id: &String,
-    download_urls: &HashMap<String, String>,
-) -> Result<String> {
-    Ok(download_urls
-        .get(algorithm_id)
-        .ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?
-        .clone())
-}
diff --git a/tig-benchmarker/src/benchmarker/setup_jobs.rs b/tig-benchmarker/src/benchmarker/setup_jobs.rs
new file mode 100644
index 0000000..ba9d6cf
--- /dev/null
+++ b/tig-benchmarker/src/benchmarker/setup_jobs.rs
@@ -0,0 +1,117 @@
+use super::{player_id, state, Job, QueryData, Result, State, Timestamps};
+use crate::utils::time;
+use rand::{rngs::StdRng, SeedableRng};
+use std::{collections::HashMap, sync::Arc};
+use tig_structs::core::*;
+use tokio::sync::Mutex;
+
+pub async fn execute(
+    selected_algorithms: &HashMap<String, String>,
+    benchmark_duration_ms: u64,
+    submit_delay_ms: u64,
+) -> Result<HashMap<String, Job>> {
+    let State {
+        query_data,
+        difficulty_samplers,
+        ..
+    } = &(*state().lock().await);
+    let QueryData {
+        latest_block,
+        player_data,
+        challenges,
+        download_urls,
+        algorithms_by_challenge,
+        ..
+    } = query_data;
+
+    let challenge_weights = get_challenge_weights(player_data, challenges);
+    let start = time();
+    let end = start + benchmark_duration_ms;
+    let submit = end + submit_delay_ms;
+    let mut jobs = HashMap::new();
+    for (challenge_name, algorithm_name) in selected_algorithms.iter() {
+        let challenge = challenges
+            .iter()
+            .find(|c| c.details.name == *challenge_name)
+            .ok_or_else(|| {
+                format!(
+                    "Your selected_algorithms contains a non-existent challenge '{}'",
+                    challenge_name
+                )
+            })?;
+
+        let algorithm = algorithms_by_challenge[&challenge.id]
+            .iter()
+            .find(|a| download_urls.contains_key(&a.id) && a.details.name == *algorithm_name)
+            .ok_or_else(|| {
+                format!(
+                    "Your selected_algorithms contains a non-existent algorithm '{}'",
+                    algorithm_name
+                )
+            })?;
+
+        let mut rng = StdRng::seed_from_u64(time() as u64);
+        let difficulty = difficulty_samplers[&challenge.id].sample(&mut rng);
+        let job = Job {
+            benchmark_id: format!("{}_{}_{}", challenge_name, algorithm_name, time()),
+            download_url: download_urls.get(&algorithm.id).cloned().ok_or_else(|| {
+                format!("Expecting download_url for algorithm '{}'", algorithm.id)
+            })?,
+            settings: BenchmarkSettings {
+                player_id: player_id().clone(),
+                block_id: latest_block.id.clone(),
+                challenge_id: challenge.id.clone(),
+                algorithm_id: algorithm.id.clone(),
+                difficulty,
+            },
+            solution_signature_threshold: *challenge.block_data().solution_signature_threshold(),
+            sampled_nonces: None,
+            wasm_vm_config: latest_block.config().wasm_vm.clone(),
+            weight: challenge_weights[&challenge.id],
+            timestamps: Timestamps { start, end, submit },
+            solutions_data: Arc::new(Mutex::new(HashMap::new())),
+        };
+        jobs.insert(job.benchmark_id.clone(), job);
+    }
+    Ok(jobs)
+}
+
+fn get_challenge_weights(
+    player_data: &Option<PlayerBlockData>,
+    challenges: &Vec<Challenge>,
+) -> HashMap<String, f64> {
+    let num_qualifiers_by_challenge = match player_data
+        .as_ref()
+        .map(|x| x.num_qualifiers_by_challenge.as_ref())
+    {
+        Some(Some(num_qualifiers_by_challenge)) => num_qualifiers_by_challenge.clone(),
+        _ => HashMap::new(),
+    };
+    let percent_qualifiers_by_challenge: HashMap<String, f64> = challenges
+        .iter()
+        .map(|c| {
+            let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0);
+            let challenge_num_qualifiers = *c.block_data().num_qualifiers();
+            let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 {
+                0f64
+            } else {
+                (player_num_qualifiers as f64) / (challenge_num_qualifiers as f64)
f64) + }; + (c.id.clone(), percent) + }) + .collect(); + let max_percent_qualifiers = *percent_qualifiers_by_challenge + .values() + .max_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap(); + let challenge_weights = percent_qualifiers_by_challenge + .into_iter() + .map(|(challenge_id, p)| { + ( + challenge_id, + 4.0 * max_percent_qualifiers / 3.0 - p + 1e-10f64, + ) + }) + .collect(); + challenge_weights +} diff --git a/tig-benchmarker/src/benchmarker/submit_benchmark.rs b/tig-benchmarker/src/benchmarker/submit_benchmark.rs index 5c9ff3f..d49b2b6 100644 --- a/tig-benchmarker/src/benchmarker/submit_benchmark.rs +++ b/tig-benchmarker/src/benchmarker/submit_benchmark.rs @@ -1,49 +1,26 @@ -use super::{api, state, Job, QueryData, Result}; -use crate::future_utils::sleep; +use super::{api, Job, Result}; use tig_api::SubmitBenchmarkReq; - -const MAX_RETRIES: u32 = 3; +use tig_structs::core::SolutionMetaData; pub async fn execute(job: &Job) -> Result { let req = { - let QueryData { - proofs, benchmarks, .. - } = &mut state().lock().await.query_data; - let benchmark = benchmarks - .get_mut(&job.benchmark_id) - .ok_or_else(|| format!("Job benchmark should exist"))?; - let proof = proofs - .get(&job.benchmark_id) - .ok_or_else(|| format!("Job proof should exist"))?; - let settings = benchmark.settings.clone(); - let solutions_meta_data = benchmark.solutions_meta_data.take().unwrap(); - let solution_data = proof.solutions_data().first().unwrap().clone(); + let solutions_data = job.solutions_data.lock().await; SubmitBenchmarkReq { - settings, - solutions_meta_data, - solution_data, + settings: job.settings.clone(), + solutions_meta_data: solutions_data + .values() + .map(|x| SolutionMetaData::from(x.clone())) + .collect(), + solution_data: solutions_data.values().next().cloned().unwrap(), } }; - for attempt in 1..=MAX_RETRIES { - println!("Submission attempt {} of {}", attempt, MAX_RETRIES); - match api().submit_benchmark(req.clone()).await { - Ok(resp) => { - return match resp.verified { - Ok(_) => Ok(resp.benchmark_id), - Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)), - } - } - Err(e) => { - let err_msg = format!("Failed to submit benchmark: {:?}", e); - if attempt < MAX_RETRIES { - println!("{}", err_msg); - println!("Retrying in 5 seconds..."); - sleep(5000).await; - } else { - return Err(err_msg); - } + match api().submit_benchmark(req.clone()).await { + Ok(resp) => { + return match resp.verified { + Ok(_) => Ok(resp.benchmark_id), + Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)), } } + Err(e) => Err(format!("Failed to submit benchmark: {:?}", e)), } - unreachable!() } diff --git a/tig-benchmarker/src/benchmarker/submit_proof.rs b/tig-benchmarker/src/benchmarker/submit_proof.rs index ad3fb20..40776a4 100644 --- a/tig-benchmarker/src/benchmarker/submit_proof.rs +++ b/tig-benchmarker/src/benchmarker/submit_proof.rs @@ -1,35 +1,27 @@ -use super::{api, Result}; -use crate::future_utils::sleep; +use super::{api, Job, Result}; use tig_api::SubmitProofReq; -use tig_worker::SolutionData; -const MAX_RETRIES: u32 = 3; - -pub async fn execute(benchmark_id: String, solutions_data: Vec) -> Result<()> { - let req = SubmitProofReq { - benchmark_id, - solutions_data, +pub async fn execute(job: &Job) -> Result<()> { + let req = { + let solutions_data = job.solutions_data.lock().await; + SubmitProofReq { + benchmark_id: job.benchmark_id.clone(), + solutions_data: job + .sampled_nonces + .as_ref() + .expect("Expected sampled nonces") + .iter() + .map(|n| solutions_data[n].clone()) + 
.collect(), + } }; - for attempt in 1..=MAX_RETRIES { - println!("Submission attempt {} of {}", attempt, MAX_RETRIES); - match api().submit_proof(req.clone()).await { - Ok(resp) => { - return match resp.verified { - Ok(_) => Ok(()), - Err(e) => Err(format!("Proof flagged as fraud: {}", e)), - } - } - Err(e) => { - let err_msg = format!("Failed to submit proof: {:?}", e); - if attempt < MAX_RETRIES { - println!("{}", err_msg); - println!("Retrying in 5 seconds..."); - sleep(5000).await; - } else { - return Err(err_msg); - } + match api().submit_proof(req.clone()).await { + Ok(resp) => { + return match resp.verified { + Ok(_) => Ok(()), + Err(e) => Err(format!("Proof flagged as fraud: {}", e)), } } + Err(e) => Err(format!("Failed to submit proof: {:?}", e)), } - unreachable!() } diff --git a/tig-benchmarker/src/future_utils.rs b/tig-benchmarker/src/future_utils.rs deleted file mode 100644 index 548f29b..0000000 --- a/tig-benchmarker/src/future_utils.rs +++ /dev/null @@ -1,121 +0,0 @@ -#[cfg(all(feature = "standalone", feature = "browser"))] -compile_error!("features `standalone` and `browser` are mutually exclusive"); - -use serde::{de::DeserializeOwned, Serialize}; -use std::future::Future; - -#[cfg(feature = "standalone")] -mod utils { - use super::*; - use std::time::{SystemTime, UNIX_EPOCH}; - pub use tokio::sync::Mutex; - use tokio::{join, task, time}; - - pub async fn join( - a: impl Future + 'static, - b: impl Future + 'static, - c: impl Future + 'static, - d: impl Future + 'static, - ) -> Result<(T, U, V, W), String> - where - T: Serialize + DeserializeOwned + 'static, - U: Serialize + DeserializeOwned + 'static, - V: Serialize + DeserializeOwned + 'static, - W: Serialize + DeserializeOwned + 'static, - { - Ok(join!(a, b, c, d)) - } - - pub fn spawn(f: impl Future + 'static + Send) { - tokio::spawn(f); - } - - pub async fn yield_now() { - task::yield_now().await - } - - pub async fn sleep(ms: u32) { - time::sleep(time::Duration::from_millis(ms as u64)).await; - } - - pub fn time() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64 - } -} - -#[cfg(feature = "browser")] -mod utils { - use super::*; - pub use futures::lock::Mutex; - use gloo_timers::future::TimeoutFuture; - use js_sys::{Array, Date, Promise}; - use serde_wasm_bindgen::{from_value, to_value}; - use wasm_bindgen::prelude::*; - use wasm_bindgen_futures::future_to_promise; - use wasm_bindgen_futures::JsFuture; - - fn to_string(e: T) -> String { - format!("{:?}", e) - } - pub async fn join( - a: impl Future + 'static, - b: impl Future + 'static, - c: impl Future + 'static, - d: impl Future + 'static, - ) -> Result<(T, U, V, W), String> - where - T: Serialize + DeserializeOwned + 'static, - U: Serialize + DeserializeOwned + 'static, - V: Serialize + DeserializeOwned + 'static, - W: Serialize + DeserializeOwned + 'static, - { - let a = future_to_promise(async move { Ok(to_value(&a.await)?) }); - let b = future_to_promise(async move { Ok(to_value(&b.await)?) }); - let c = future_to_promise(async move { Ok(to_value(&c.await)?) }); - let d = future_to_promise(async move { Ok(to_value(&d.await)?) 
-
-        let promises = Array::new();
-        promises.push(&a);
-        promises.push(&b);
-        promises.push(&c);
-        promises.push(&d);
-
-        let js_promise = Promise::all(&promises);
-        let js_values = JsFuture::from(js_promise).await.map_err(to_string)?;
-
-        let values = js_values.dyn_into::<Array>().map_err(to_string)?;
-        let results = (
-            from_value(values.get(0)).map_err(to_string)?,
-            from_value(values.get(1)).map_err(to_string)?,
-            from_value(values.get(2)).map_err(to_string)?,
-            from_value(values.get(3)).map_err(to_string)?,
-        );
-
-        Ok(results)
-    }
-
-    pub fn spawn(f: impl Future<Output = ()> + 'static) {
-        // Convert the Rust Future into a JavaScript Promise
-        let _ = future_to_promise(async move {
-            f.await;
-            Ok(JsValue::undefined())
-        });
-    }
-
-    pub async fn yield_now() {
-        TimeoutFuture::new(0).await;
-    }
-
-    pub async fn sleep(ms: u32) {
-        TimeoutFuture::new(ms).await;
-    }
-
-    pub fn time() -> u64 {
-        Date::now() as u64
-    }
-}
-
-pub use utils::*;
diff --git a/tig-benchmarker/src/lib.rs b/tig-benchmarker/src/lib.rs
deleted file mode 100644
index b757d04..0000000
--- a/tig-benchmarker/src/lib.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-mod benchmarker;
-mod future_utils;
-
-#[cfg(feature = "browser")]
-mod exports {
-    use super::*;
-    use wasm_bindgen::prelude::*;
-
-    #[wasm_bindgen]
-    pub async fn state() -> JsValue {
-        let state = benchmarker::state().lock().await.clone();
-        serde_wasm_bindgen::to_value(&state).unwrap()
-    }
-
-    #[wasm_bindgen]
-    pub async fn start(num_workers: u32, ms_per_benchmark: u32) {
-        benchmarker::start(num_workers, ms_per_benchmark).await;
-    }
-
-    #[wasm_bindgen]
-    pub async fn stop() {
-        benchmarker::stop().await;
-    }
-
-    #[wasm_bindgen]
-    pub async fn select_algorithm(challenge_name: String, algorithm_name: String) {
-        benchmarker::select_algorithm(challenge_name, algorithm_name).await;
-    }
-
-    #[wasm_bindgen]
-    pub async fn setup(api_url: String, api_key: String, player_id: String) {
-        benchmarker::setup(api_url, api_key, player_id.to_string()).await;
-    }
-}
diff --git a/tig-benchmarker/src/main.rs b/tig-benchmarker/src/main.rs
index 0402b24..f133029 100644
--- a/tig-benchmarker/src/main.rs
+++ b/tig-benchmarker/src/main.rs
@@ -1,14 +1,12 @@
-// #[cfg(any(not(feature = "standalone"), feature = "browser"))]
-// compile_error!("to build the binary use `--no-default-features --features standalone`");
-
 mod benchmarker;
-mod future_utils;
-use benchmarker::{Job, NonceIterator};
+mod utils;
+use benchmarker::{Job, State};
 use clap::{value_parser, Arg, Command};
-use future_utils::{sleep, Mutex};
-use std::{collections::HashMap, fs, path::PathBuf, sync::Arc};
+use std::collections::HashMap;
 use tig_structs::core::*;
 use tig_utils::{dejsonify, get, jsonify, post};
+use tokio::{spawn, task::yield_now};
+use utils::{sleep, time, Result};
 use warp::Filter;
 
 fn cli() -> Command {
@@ -28,10 +26,10 @@ fn cli() -> Command {
             .value_parser(value_parser!(String)),
         )
         .arg(
-            Arg::new("ALGORITHMS_SELECTION")
-                .help("Path to json file with your algorithm selection")
+            Arg::new("SELECTED_ALGORITHMS")
+                .help("Json string with your algorithm selection")
                 .required(true)
-                .value_parser(value_parser!(PathBuf)),
+                .value_parser(value_parser!(String)),
         )
         .arg(
             Arg::new("workers")
@@ -43,8 +41,15 @@
         .arg(
             Arg::new("duration")
                 .long("duration")
-                .help("(Optional) Set duration of a benchmark in milliseconds")
-                .default_value("7500")
+                .help("(Optional) Set duration (in milliseconds) of a benchmark")
+                .default_value("10000")
+                .value_parser(value_parser!(u32)),
+        )
+        .arg(
+            Arg::new("delay")
+                .long("delay")
+                .help("(Optional) Set delay (in milliseconds) between benchmark finishing and submitting")
+                .default_value("5000")
                 .value_parser(value_parser!(u32)),
         )
         .arg(
@@ -67,223 +72,235 @@
                 .help("(Optional) Set hostname for master node to connect to")
                 .value_parser(value_parser!(String)),
         )
-        .arg(
-            Arg::new("offset")
-                .long("offset")
-                .help("(Optional) Set nonce offset for each slave")
-                .default_value("5000000")
-                .value_parser(value_parser!(u64)),
-        )
 }
 
 #[tokio::main]
 async fn main() {
     let matches = cli().get_matches();
-    let algorithms_path = matches.get_one::<PathBuf>("ALGORITHMS_SELECTION").unwrap();
+    let selected_algorithms = matches
+        .get_one::<String>("SELECTED_ALGORITHMS")
+        .unwrap()
+        .clone();
     let num_workers = *matches.get_one::<u32>("workers").unwrap();
     let port = *matches.get_one::<u16>("port").unwrap();
     let duration = *matches.get_one::<u32>("duration").unwrap();
+    let delay = *matches.get_one::<u32>("delay").unwrap();
     let api_url = matches.get_one::<String>("api").unwrap().clone();
     let api_key = matches.get_one::<String>("API_KEY").unwrap().clone();
     let player_id = matches.get_one::<String>("PLAYER_ID").unwrap().clone();
-    let nonce_offset = matches.get_one::<u64>("offset").unwrap().clone();
     if let Some(master) = matches.get_one::<String>("master") {
-        slave_node(master, port, num_workers).await;
+        slave(master, port, num_workers, selected_algorithms).await;
     } else {
-        master_node(
+        master(
             api_url,
             api_key,
             player_id,
             num_workers,
             duration,
-            algorithms_path,
+            delay,
+            selected_algorithms,
             port,
-            nonce_offset,
         )
         .await
     }
 }
 
-async fn slave_node(master: &String, port: u16, num_workers: u32) {
-    let master_url = format!("http://{}:{}", master, port);
-    let mut job: Option<Job> = None;
-    let mut nonce_iters: Vec<Arc<Mutex<NonceIterator>>> = Vec::new();
-    let mut solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
-    let mut solutions_count = Arc::new(Mutex::new(0u32));
-    let mut num_solutions = 0;
-    loop {
-        let next_job = match get::<String>(&format!("{}/job", master_url), None).await {
-            Ok(resp) => dejsonify::<Option<Job>>(&resp).unwrap(),
+async fn slave(master: &String, port: u16, num_workers: u32, selected_algorithms: String) {
+    println!(
+        "[slave] parsing selected algorithms from: {:?}",
+        selected_algorithms
+    );
+    let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
+        .unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
+
+    async fn do_work(
+        master_url: &String,
+        num_workers: u32,
+        selected_algorithms: &HashMap<String, String>,
+    ) -> Result<()> {
+        println!("[slave] fetching jobs from master at: {}", master_url);
+        let available_jobs = match get::<String>(&format!("{}/jobs", master_url), None).await {
+            Ok(resp) => dejsonify::<HashMap<String, Job>>(&resp).unwrap(),
             Err(e) => {
-                println!("Error getting job: {:?}", e);
-                sleep(5000).await;
-                continue;
+                return Err(format!("failed to fetch jobs from master: {:?}", e));
             }
         };
-
-        if job != next_job {
-            println!("Ending job");
-
-            for nonce_iter in nonce_iters.iter() {
-                (*(*nonce_iter).lock().await).empty();
-            }
-            nonce_iters.clear();
-            solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
-            solutions_count = Arc::new(Mutex::new(0u32));
-            num_solutions = 0;
-            if next_job
-                .as_ref()
-                .is_some_and(|x| x.sampled_nonces.is_none())
-            {
-                let job = next_job.as_ref().unwrap();
-                println!("Starting new job: {:?}", job);
-                println!(
-                    "Downloading algorithm {}",
-                    job.download_url.split("/").last().unwrap()
-                );
-                let wasm = match benchmarker::download_wasm::execute(job).await {
-                    Ok(wasm) => wasm,
-                    Err(e) => {
-                        println!("Error downloading wasm: {:?}", e);
-                        sleep(5000).await;
-                        continue;
-                    }
-                };
-
-                println!("Getting nonce offset from master");
-                let offset = match get::<String>(
-                    &format!("{}/nonce_offset/{:?}", master_url, hostname::get().unwrap()),
-                    None,
-                )
-                .await
-                {
-                    Ok(resp) => dejsonify::<u64>(&resp).unwrap(),
-                    Err(e) => {
-                        println!("Error getting nonce offset: {:?}", e);
-                        sleep(5000).await;
-                        continue;
-                    }
-                };
-                println!("Got nonce offset: {}", offset);
-
-                // variables that are shared by workers
-                nonce_iters = (0..num_workers)
-                    .into_iter()
-                    .map(|x| {
-                        Arc::new(Mutex::new(NonceIterator::from_u64(
-                            offset + u64::MAX / num_workers as u64 * x as u64,
-                        )))
-                    })
-                    .collect();
-                println!("Starting benchmark");
-                benchmarker::run_benchmark::execute(
-                    nonce_iters.iter().cloned().collect(),
-                    job,
-                    &wasm,
-                    solutions_data.clone(),
-                    solutions_count.clone(),
-                )
-                .await;
-            }
-
-            job = next_job;
-        }
-        if job.as_ref().is_some_and(|x| x.sampled_nonces.is_none()) {
-            let job = job.as_ref().unwrap();
-            let mut solutions_data = solutions_data.lock().await;
-            let n = solutions_data.len();
-            if n > 0 {
-                num_solutions += n as u32;
-                let data: Vec<SolutionData> = solutions_data.drain(..).collect();
-                println!("Posting {} solutions", n);
-                if let Err(e) = post::<String>(
-                    &format!("{}/solutions_data/{}", master_url, job.benchmark_id),
-                    &jsonify(&data),
-                    Some(vec![(
-                        "Content-Type".to_string(),
-                        "application/json".to_string(),
-                    )]),
-                )
-                .await
-                {
-                    println!("Error posting solutions data: {:?}", e);
-                    sleep(5000).await;
-                    continue;
-                }
-            }
-            let mut num_attempts = 0;
-            for nonce_iter in nonce_iters.iter().cloned() {
-                let nonce_iter = (*nonce_iter).lock().await;
-                num_attempts += nonce_iter.attempts();
-            }
+        println!("[slave] fetched {} jobs", available_jobs.len());
+        for (i, job) in available_jobs.values().enumerate() {
             println!(
-                "Computed {} solutions out of {} instances",
-                num_solutions, num_attempts
+                "[slave] job {}: {:?}, weight: {}",
+                i, job.settings, job.weight
             );
-            sleep(100).await;
-        } else {
-            println!("No job, sleeping 100ms");
-            sleep(100).await;
         }
+        println!(
+            "[slave] filtering jobs that match selected_algorithms: {:?}",
+            selected_algorithms
+        );
+        let filtered_jobs: HashMap<String, Job> = available_jobs
+            .into_iter()
+            .filter(|(benchmark_id, _)| {
+                selected_algorithms
+                    .iter()
+                    .any(|(challenge_name, algorithm_name)| {
+                        benchmark_id.starts_with(&format!("{}_{}", challenge_name, algorithm_name))
+                    })
+            })
+            .collect();
+
+        if filtered_jobs.len() == 0 {
+            return Err("no jobs matching selected_algorithms".to_string());
+        }
+
+        let job = benchmarker::select_job::execute(&filtered_jobs).await?;
+        println!(
+            "[slave]: downloading algorithm {}",
+            job.download_url.split("/").last().unwrap()
+        );
+        let wasm = benchmarker::download_wasm::execute(&job).await?;
+
+        println!("[slave]: starting benchmark {:?}", job.settings);
+        let nonce_iterators = job.create_nonce_iterators(num_workers);
+        benchmarker::run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
+        while time() < job.timestamps.end {
+            {
+                let mut num_attempts = 0;
+                for nonce_iterator in &nonce_iterators {
+                    num_attempts += nonce_iterator.lock().await.num_attempts();
+                }
+                let num_solutions = job.solutions_data.lock().await.len() as u32;
+                let elapsed = time() - job.timestamps.start;
+                println!(
+                    "[slave]: #solutions: {}, #instances: {}, elapsed: {}ms",
+                    num_solutions, num_attempts, elapsed
+                );
+            }
+            sleep(500).await;
+        }
+
+        let mut num_attempts = 0;
+        for nonce_iterator in &nonce_iterators {
+            num_attempts += nonce_iterator.lock().await.num_attempts();
+        }
+        if num_attempts > 0 {
+            let solutions_data = &*job.solutions_data.lock().await;
+            println!(
+                "[slave]: posting {} solutions to master",
+                solutions_data.len()
+            );
+            post::<String>(
+                &format!("{}/solutions_data/{}", master_url, job.benchmark_id),
+                &jsonify(&solutions_data),
+                Some(vec![(
+                    "Content-Type".to_string(),
+                    "application/json".to_string(),
+                )]),
+            )
+            .await
+            .map_err(|e| format!("failed to post solutions to master: {:?}", e))?;
+        }
+
+        Ok(())
+    }
+
+    let master_url = format!("http://{}:{}", master, port);
+    loop {
+        if let Err(e) = do_work(&master_url, num_workers, &selected_algorithms).await {
+            println!("[slave]: error: {:?}", e);
+            println!("[slave]: sleeping 5s");
+            sleep(5000).await;
+        }
+        yield_now().await;
+    }
 }
 
-async fn master_node(
+async fn master(
     api_url: String,
     api_key: String,
     player_id: String,
     num_workers: u32,
     duration: u32,
-    algorithms_path: &PathBuf,
+    delay: u32,
+    selected_algorithms: String,
     port: u16,
-    nonce_offset: u64,
 ) {
+    println!(
+        "[master] parsing selected algorithms from: {:?}",
+        selected_algorithms
+    );
+    let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
+        .unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
+    println!("[master] setting up");
     benchmarker::setup(api_url, api_key, player_id).await;
-    benchmarker::start(num_workers, duration).await;
-    future_utils::spawn(async move {
-        let offsets = Arc::new(Mutex::new(HashMap::new()));
-        let get_nonce_offset = warp::path!("nonce_offset" / String)
-            .and(warp::get())
-            .and(warp::any().map(move || offsets.clone()))
-            .and_then(
-                move |slave_id: String, offsets: Arc<Mutex<HashMap<String, u64>>>| async move {
-                    let offsets = &mut (*offsets).lock().await;
-                    let len = offsets.len() as u64;
-                    let o = offsets
-                        .entry(slave_id)
-                        .or_insert_with(|| (len + 1) * nonce_offset);
-                    Ok::<_, warp::Rejection>(warp::reply::json(&o))
-                },
-            );
-        let get_job = warp::path("job").and(warp::get()).and_then(|| async {
+    println!("[master] starting data_fetcher");
+    spawn(async {
+        benchmarker::data_fetcher().await;
+    });
+    println!("[master] starting benchmark_submitter");
+    spawn(async {
+        benchmarker::benchmark_submitter().await;
+    });
+    println!("[master] starting proof_submitter");
+    spawn(async {
+        benchmarker::proof_submitter().await;
+    });
+    println!("[master] starting benchmarker");
+    spawn(async move {
+        benchmarker::benchmarker(
+            selected_algorithms,
+            num_workers,
+            duration as u64,
+            delay as u64,
+        )
+        .await;
+    });
+    println!("[master] starting webserver on port {}", port);
+    spawn(async move {
+        let get_jobs = warp::path("jobs").and(warp::get()).and_then(|| async {
+            println!("[master] slave node fetching jobs",);
             let state = (*benchmarker::state()).lock().await;
-            Ok::<_, warp::Rejection>(warp::reply::json(&state.job))
+            Ok::<_, warp::Rejection>(warp::reply::json(&state.available_jobs))
         });
         let post_solutions_data = warp::path!("solutions_data" / String)
             .and(warp::post())
             .and(warp::body::json())
             .and_then(
-                |benchmark_id: String, mut solutions_data: Vec<SolutionData>| async move {
-                    benchmarker::drain_solutions(&benchmark_id, &mut solutions_data).await;
+                |benchmark_id: String, solutions_data: HashMap<u64, SolutionData>| async move {
+                    {
+                        let num_solutions = solutions_data.len() as u32;
+                        println!("[master] received {} solutions from slave", num_solutions,);
+                        let mut state = (*benchmarker::state()).lock().await;
+                        let State {
+                            available_jobs,
+                            pending_benchmark_jobs,
+                            difficulty_samplers,
+                            ..
+                        } = &mut *state;
+                        if let Some(job) = available_jobs
+                            .get_mut(&benchmark_id)
+                            .or_else(|| pending_benchmark_jobs.get_mut(&benchmark_id))
+                        {
+                            println!("[master] adding solutions to benchmark {:?}", job.settings,);
+                            difficulty_samplers
+                                .get_mut(&job.settings.challenge_id)
+                                .unwrap()
+                                .update_with_solutions(&job.settings.difficulty, num_solutions);
+                            job.solutions_data.lock().await.extend(solutions_data);
+                        } else {
+                            println!("[master] failed to find benchmark to add solutions to",);
+                        }
+                    }
                     Ok::<_, warp::Rejection>(warp::reply::with_status(
-                        "SolutionsData received",
+                        "solutions received",
                         warp::http::StatusCode::OK,
                     ))
                 },
             );
-        warp::serve(get_nonce_offset.or(get_job).or(post_solutions_data))
+        warp::serve(get_jobs.or(post_solutions_data))
            .run(([0, 0, 0, 0], port))
            .await;
     });
     loop {
-        let selection = serde_json::from_str::<HashMap<String, String>>(
-            &fs::read_to_string(algorithms_path).unwrap(),
-        )
-        .unwrap();
-        for (challenge_id, algorithm_id) in selection {
-            benchmarker::select_algorithm(challenge_id, algorithm_id).await;
-        }
-        future_utils::sleep(10000).await;
+        sleep(30000).await;
     }
 }
diff --git a/tig-benchmarker/src/utils.rs b/tig-benchmarker/src/utils.rs
new file mode 100644
index 0000000..fbc8877
--- /dev/null
+++ b/tig-benchmarker/src/utils.rs
@@ -0,0 +1,15 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::time;
+
+pub type Result<T> = std::result::Result<T, String>;
+
+pub async fn sleep(ms: u64) {
+    time::sleep(time::Duration::from_millis(ms)).await;
+}
+
+pub fn time() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_millis() as u64
+}
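A note on the `challenge_weights` hunk near the top of this patch: the expression `4.0 * max_percent_qualifiers / 3.0 - p + 1e-10` gives a challenge more weight the smaller the player's share of qualifiers `p` is, biasing job selection toward the player's weakest challenge, while the `1e-10` term keeps every weight strictly positive so no challenge can drop out of the weighted draw. A minimal, self-contained sketch of just that formula (the `challenge_weights` helper and the example percentages are illustrative, not part of the crate):

```
use std::collections::HashMap;

// Illustrative re-implementation of the weight formula from select_job.
// Input: each challenge's percent of qualifiers held by this player.
fn challenge_weights(percent_qualifiers: HashMap<String, f64>) -> HashMap<String, f64> {
    let max_percent = percent_qualifiers
        .values()
        .cloned()
        .fold(f64::MIN, f64::max);
    percent_qualifiers
        .into_iter()
        // lower share => higher weight; 1e-10 keeps weights strictly positive
        .map(|(id, p)| (id, 4.0 * max_percent / 3.0 - p + 1e-10))
        .collect()
}

fn main() {
    // Hypothetical shares: 30% of satisfiability qualifiers, 10% of vehicle_routing
    let percents = HashMap::from([
        ("satisfiability".to_string(), 30.0),
        ("vehicle_routing".to_string(), 10.0),
    ]);
    // max = 30, so 4*30/3 = 40: satisfiability -> 40-30 = 10.0, vehicle_routing -> 40-10 = 30.0
    for (id, w) in challenge_weights(percents) {
        println!("{id}: {w:.1}");
    }
}
```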
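Similarly, the slave's `do_work` filter relies on benchmark ids being prefixed with `{challenge_name}_{algorithm_name}`, matching the keys and values of the `SELECTED_ALGORITHMS` JSON passed on the command line. A small sketch of that matching rule (the benchmark ids below are made up; only `serde_json`, already a dependency of this crate, is assumed):

```
use std::collections::HashMap;

fn main() {
    // SELECTED_ALGORITHMS as it would be passed on the CLI
    let selected: HashMap<String, String> =
        serde_json::from_str(r#"{"satisfiability":"schnoing"}"#).unwrap();

    // Hypothetical ids as served by the master's GET /jobs endpoint
    let benchmark_ids = ["satisfiability_schnoing_0001", "knapsack_dynamic_0001"];

    // Same starts_with rule as the slave's job filter
    let matching: Vec<&str> = benchmark_ids
        .into_iter()
        .filter(|id| {
            selected.iter().any(|(challenge, algorithm)| {
                id.starts_with(&format!("{}_{}", challenge, algorithm))
            })
        })
        .collect();
    assert_eq!(matching, ["satisfiability_schnoing_0001"]);
}
```

Because the check is a plain string prefix against the same JSON string the master accepts, the identical `SELECTED_ALGORITHMS` argument can be passed to both master and slaves.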