Benchmarker revamp.

This commit is contained in:
FiveMovesAhead 2024-08-28 19:19:02 +08:00
parent 27d845f1ed
commit 73c83ba5d7
22 changed files with 910 additions and 1172 deletions

View File

@ -9,14 +9,12 @@ edition.workspace = true
[dependencies]
anyhow = "1.0.81"
clap = { version = "4.5.4", optional = true }
clap = { version = "4.5.4" }
cudarc = { version = "0.12.0", features = [
"cuda-version-from-build-system",
], optional = true }
futures = { version = "0.3.30" }
gloo-timers = { version = "0.3.0", optional = true, features = ["futures"] }
hostname = { version = "0.4", optional = true }
js-sys = { version = "0.3.68", optional = true }
hostname = { version = "0.4" }
once_cell = "1.19.0"
rand = { version = "0.8.5", default-features = false, features = ["std_rng"] }
rand_distr = { version = "0.4.3", default-features = false, features = [
@ -24,43 +22,19 @@ rand_distr = { version = "0.4.3", default-features = false, features = [
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.113" }
serde-wasm-bindgen = { version = "0.6.5", optional = true }
tig-algorithms = { path = "../tig-algorithms" }
tig-api = { path = "../tig-api" }
tig-api = { path = "../tig-api", features = ["request"] }
tig-challenges = { path = "../tig-challenges" }
tig-structs = { path = "../tig-structs" }
tig-utils = { path = "../tig-utils" }
tig-worker = { path = "../tig-worker" }
tokio = { version = "1.37.0", features = ["full"], optional = true }
wasm-bindgen = { version = "0.2.91", features = [
"serde-serialize",
], optional = true }
wasm-bindgen-futures = { version = "0.4.41", optional = true }
warp = { version = "0.3.7", optional = true }
web-sys = { version = "0.3.68", features = ['console'], optional = true }
[lib]
crate-type = ["cdylib", "rlib"]
tokio = { version = "1.37.0", features = ["full"] }
warp = { version = "0.3.7" }
[features]
default = ["browser"]
default = ["standalone"]
cuda = ["cudarc", "tig-algorithms/cuda"]
standalone = [
"dep:clap",
"dep:tokio",
"tig-api/request",
"dep:warp",
"dep:hostname",
]
browser = [
"dep:gloo-timers",
"dep:wasm-bindgen",
"dep:js-sys",
"dep:serde-wasm-bindgen",
"dep:wasm-bindgen-futures",
"dep:web-sys",
"tig-api/request-js",
]
standalone = []
# c001_a001 = []

View File

@ -2,23 +2,7 @@
A Rust crate that implements a Benchmarker for TIG.
## Browser Benchmarker
`tig-benchmarker` can be compiled to WASM with bindings for browsers.
The browser version is deployed to https://play.tig.foundation/benchmarker
To build & run it locally, run the following commands before visiting localhost in your browser:
```
# uncomment below to install wasm-pack
# curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
cd tig-benchmarker
wasm-pack build --release --target web
python3 -m http.server 80
```
## Standalone Benchmarker
## Compiling Your Benchmarker
`tig-benchmarker` can be compiled into an executable for running standalone, or in slave mode (see notes)
@ -28,10 +12,10 @@ There are two ways to start the master benchmarker:
```
ALGOS_TO_COMPILE="" # See notes
# USE_CUDA="cuda" # See notes
cargo build -p tig-benchmarker --release --no-default-features --features "standalone ${ALGOS_TO_COMPILE} ${USE_CUDA}"
cargo build -p tig-benchmarker --release --no-default-features --features "${ALGOS_TO_COMPILE} ${USE_CUDA}"
# edit below line for your own algorithm selection
echo '{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}' > algo_selection.json
./target/release/tig-benchmarker <address> <api_key> algo_selection.json
SELECTED_ALGORITHMS='{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"basic"}'
./target/release/tig-benchmarker <address> <api_key> $SELECTED_ALGORITHMS
```
2. Compile executable in a docker, and run the docker:
@ -40,8 +24,8 @@ There are two ways to start the master benchmarker:
# USE_CUDA="cuda" # See notes
docker build -f tig-benchmarker/Dockerfile --build-arg features="${ALGOS_TO_COMPILE} ${USE_CUDA}" -t tig-benchmarker .
# edit below line for your own algorithm selection
echo '{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}' > algo_selection.json
docker run -it -v $(pwd):/app tig-benchmarker <address> <api_key> algo_selection.json
SELECTED_ALGORITHMS='{"satisfiability":"schnoing","vehicle_routing":"clarke_wright","knapsack":"dynamic","vector_search":"optimal_ann"}'
docker run -it -v $(pwd):/app tig-benchmarker <address> <api_key> $SELECTED_ALGORITHMS
```
**Notes:**
@ -56,11 +40,12 @@ There are two ways to start the master benchmarker:
ALGOS_TO_COMPILE="satisfiability_schnoing vehicle_routing_clarke_wright knapsack_dynamic vector_search_optimal_ann"
```
* Every 10 seconds, the benchmarker reads your json file path and uses the contents to update its algorithm selection.
* You can see available algorithms in the dropdowns of the [Benchmarker UI](https://play.tig.foundation/benchmarker)
* Alternatively, you can use [`script\list_algorithms.sh`](../scripts/list_algorithms.sh)
* `tig-benchmarker` starts a master node by default. The port can be set with `--port <port>` (default 5115)
* `tig-benchmarker` that are started with the option `--master <hostname>` are ran as slaves and will poll the master for jobs
* slaves will ignore any job that doesn't match their algorithm selection
* one possible setup is to run the master with `--workers 0`, and then run a separate slave for each challenge with different number of workers
* `tig-benchmarker` can be executed with `--help` to see all options including setting the number of workers, and setting the duration of a benchmark
* Uncomment `# USE_CUDA="cuda"` to compile `tig-benchmarker` to use CUDA optimisations where they are available.
* You must have a CUDA compatible GPU with CUDA toolkit installed

View File

@ -1,26 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<title>TIG Benchmarker Test</title>
</head>
<body>
<h1>TIG Browser Benchmarker</h1>
<h2>Instructions:</h2>
<ol>
<li>Open the browser console</li>
<li>Setup: `await benchmarker.setup(&lt;api_url&gt;, &lt;api_key&gt;, &lt;player_id&gt;)`</li>
<li>Select Algorithm: `await benchmarker.select_algorithm(&lt;challenge_name&gt;, &lt;algorithm_name&gt;)`</li>
<li>Start: `await benchmarker.start(&lt;num_workers&gt;, &lt;ms_per_benchmark&gt;)`</li>
<li>View State: `await benchmarker.state()`</li>
<li>Stop: `await benchmarker.stop()`</li>
</ol>
<h2>Notes:</h2>
<ul>
<li>challenge_name should be one of satisfiability, vehicle_routing, knapsack</li>
<li>algorithm_name can be found in the dropdowns of <a href="https://play.tig.foundation/benchmarker">Benchmarker UI</a></li>
<li>recommended ms_per_benchmark is 15000 (15s)</li>
</ul>
<script type="module" src="./index.js"></script>
</body>
</html>

View File

@ -1,12 +0,0 @@
import init, { setup, state, start, stop, select_algorithm } from './pkg/tig_benchmarker.js';
async function loadWasm() {
console.log("Loading Benchmarker WASM");
await init("./pkg/tig_benchmarker_bg.wasm");
window.benchmarker = {
setup, state, start, stop, select_algorithm
};
console.log("Benchmarker WASM initialized and functions are available globally");
}
loadWasm();

View File

@ -1,14 +1,14 @@
use super::{Job, NonceIterator};
use crate::future_utils;
use crate::utils::time;
use cudarc::driver::*;
use cudarc::nvrtc::{compile_ptx, Ptx};
use future_utils::{spawn, time, yield_now, Mutex};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use tig_algorithms::{c001, c002, c003, c004, CudaKernel};
use tig_challenges::ChallengeTrait;
use tig_worker::{compute_solution, verify_solution, SolutionData};
use tig_worker::{compute_solution, verify_solution};
use tokio::{spawn, sync::Mutex, task::yield_now};
static PTX_CACHE: OnceCell<Mutex<HashMap<String, Ptx>>> = OnceCell::new();
@ -18,7 +18,6 @@ pub async fn get_or_compile_cuda(
dev: &Arc<CudaDevice>,
) -> HashMap<&'static str, CudaFunction> {
if kernel.is_none() {
println!("No CUDA optimisations available for '{}'", key);
return HashMap::new();
}
let kernel = kernel.as_ref().unwrap();
@ -55,31 +54,23 @@ pub async fn get_or_compile_cuda(
.collect()
}
pub async fn execute(
nonce_iters: Vec<Arc<Mutex<NonceIterator>>>,
job: &Job,
wasm: &Vec<u8>,
solutions_data: Arc<Mutex<Vec<SolutionData>>>,
solutions_count: Arc<Mutex<u32>>,
) {
for nonce_iter in nonce_iters {
pub async fn execute(nonce_iterators: Vec<Arc<Mutex<NonceIterator>>>, job: &Job, wasm: &Vec<u8>) {
for nonce_iterator in nonce_iterators {
let job = job.clone();
let wasm = wasm.clone();
let solutions_data = solutions_data.clone();
let solutions_count = solutions_count.clone();
spawn(async move {
let mut last_yield = time();
let dev = CudaDevice::new(0).expect("Failed to create CudaDevice");
let mut challenge_cuda_funcs: Option<HashMap<&'static str, CudaFunction>> = None;
let mut algorithm_cuda_funcs: Option<HashMap<&'static str, CudaFunction>> = None;
loop {
match {
let mut nonce_iter = (*nonce_iter).lock().await;
(*nonce_iter).next()
} {
let now = time();
if now >= job.timestamps.end {
break;
}
match { nonce_iterator.lock().await.next() } {
None => break,
Some(nonce) => {
let now = time();
if now - last_yield > 25 {
yield_now().await;
last_yield = now;
@ -12321,15 +12312,11 @@ pub async fn execute(
if verify_solution(&job.settings, nonce, &solution_data.solution)
.is_ok()
{
{
let mut solutions_count = (*solutions_count).lock().await;
*solutions_count += 1;
}
if solution_data.calc_solution_signature()
<= job.solution_signature_threshold
{
let mut solutions_data = (*solutions_data).lock().await;
(*solutions_data).push(solution_data);
let mut solutions_data = job.solutions_data.lock().await;
(*solutions_data).insert(nonce, solution_data);
}
}
}

View File

@ -96,6 +96,15 @@ impl DifficultySampler {
}
pub fn update_with_solutions(&mut self, difficulty: &Vec<i32>, num_solutions: u32) {
if difficulty[0] < self.min_difficulty[0]
|| difficulty[1] < self.min_difficulty[1]
|| difficulty[0]
>= self.min_difficulty[0] + self.dimensions[0] as i32 + self.padding[0] as i32
|| difficulty[1]
>= self.min_difficulty[1] + self.dimensions[1] as i32 + self.padding[1] as i32
{
return;
}
let (x, y) = (
(difficulty[0] - self.min_difficulty[0]) as usize,
(difficulty[1] - self.min_difficulty[1]) as usize,
@ -110,13 +119,15 @@ impl DifficultySampler {
}
let decay = dist * (1.0 - DECAY) + DECAY;
let delta = (1.0 - decay) * num_solutions as f32 * SOLUTIONS_MULTIPLIER;
self.weights[x + x_offset][y + y_offset].solutions *= decay;
self.weights[x + x_offset][y + y_offset].solutions += delta;
if x_offset != 0 && x >= x_offset {
if x + x_offset < self.weights.len() && y + y_offset < self.weights[x].len() {
self.weights[x + x_offset][y + y_offset].solutions *= decay;
self.weights[x + x_offset][y + y_offset].solutions += delta;
}
if x_offset != 0 && x >= x_offset && y + y_offset < self.weights[x].len() {
self.weights[x - x_offset][y + y_offset].solutions *= decay;
self.weights[x - x_offset][y + y_offset].solutions += delta;
}
if y_offset != 0 && y >= y_offset {
if x + x_offset < self.weights.len() && y_offset != 0 && y >= y_offset {
self.weights[x + x_offset][y - y_offset].solutions *= decay;
self.weights[x + x_offset][y - y_offset].solutions += delta;
}

View File

@ -1,8 +1,8 @@
use super::{Job, Result};
use crate::future_utils::Mutex;
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use tig_utils::get;
use tokio::sync::Mutex;
static CACHE: OnceCell<Mutex<HashMap<String, Vec<u8>>>> = OnceCell::new();

View File

@ -0,0 +1,36 @@
use super::{state, Job, QueryData, Result, State};
use crate::utils::time;
pub async fn execute() -> Result<Option<Job>> {
let mut state = state().lock().await;
let State {
query_data,
pending_benchmark_jobs,
..
} = &mut *state;
let QueryData {
proofs,
benchmarks,
frauds,
..
} = &query_data;
let now = time();
let mut pending_benchmark_ids = pending_benchmark_jobs
.iter()
.filter(|(_, job)| now >= job.timestamps.submit)
.map(|(id, _)| id.clone())
.collect::<Vec<String>>();
pending_benchmark_ids.sort_by_key(|id| pending_benchmark_jobs[id].timestamps.submit);
for benchmark_id in pending_benchmark_ids {
let job = pending_benchmark_jobs.remove(&benchmark_id).unwrap();
if benchmarks.contains_key(&benchmark_id)
|| proofs.contains_key(&benchmark_id)
|| frauds.contains_key(&benchmark_id)
|| job.solutions_data.lock().await.len() == 0
{
continue;
}
return Ok(Some(job));
}
Ok(None)
}

View File

@ -0,0 +1,60 @@
use crate::utils::time;
use super::{state, Job, QueryData, Result, State, Timestamps};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
pub async fn execute(benchmark_duration_ms: u64) -> Result<Option<Job>> {
let State {
query_data,
submitted_proof_ids,
pending_proof_jobs,
..
} = &*state().lock().await;
let QueryData {
latest_block,
benchmarks,
proofs,
frauds,
download_urls,
..
} = query_data;
for (benchmark_id, benchmark) in benchmarks.iter() {
if !pending_proof_jobs.contains_key(benchmark_id)
&& !submitted_proof_ids.contains(benchmark_id)
&& !frauds.contains_key(benchmark_id)
&& !proofs.contains_key(benchmark_id)
&& benchmark.state.is_some()
{
let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| {
format!(
"Expecting benchmark '{}' to have sampled_nonces",
benchmark_id
)
})?;
let start = time();
let end = start + benchmark_duration_ms;
let submit = end;
return Ok(Some(Job {
benchmark_id: benchmark.id.clone(),
download_url: download_urls
.get(&benchmark.settings.algorithm_id)
.cloned()
.ok_or_else(|| {
format!(
"Expecting download_url for algorithm '{}'",
benchmark.settings.algorithm_id
)
})?,
settings: benchmark.settings.clone(),
solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
sampled_nonces: Some(sampled_nonces),
wasm_vm_config: latest_block.config().wasm_vm.clone(),
weight: 0.0,
timestamps: Timestamps { start, end, submit },
solutions_data: Arc::new(Mutex::new(HashMap::new())),
}));
}
}
Ok(None)
}

View File

@ -1,33 +1,38 @@
use super::{state, QueryData, Result};
use std::collections::HashSet;
use tig_worker::SolutionData;
use super::{state, Job, QueryData, Result, State};
use crate::utils::time;
pub async fn execute() -> Result<Option<(String, Vec<SolutionData>)>> {
pub async fn execute() -> Result<Option<Job>> {
let mut state = state().lock().await;
let State {
query_data,
pending_proof_jobs,
..
} = &mut *state;
let QueryData {
proofs,
benchmarks,
frauds,
..
} = &mut state().lock().await.query_data;
for (benchmark_id, proof) in proofs.iter_mut() {
if proof.solutions_data.is_none() || frauds.contains_key(benchmark_id) {
continue;
}
if let Some(state) = &benchmarks[benchmark_id].state {
let sampled_nonces: HashSet<u64> =
state.sampled_nonces.clone().unwrap().into_iter().collect();
let mut solutions_data = proof.solutions_data.take().unwrap();
solutions_data.retain(|x| sampled_nonces.contains(&x.nonce));
let extracted_nonces: HashSet<u64> = solutions_data.iter().map(|x| x.nonce).collect();
if extracted_nonces != sampled_nonces {
return Err(format!(
"No solutions for sampled nonces: '{:?}'",
sampled_nonces
.difference(&extracted_nonces)
.collect::<Vec<_>>()
));
} = &query_data;
let now = time();
let mut pending_proof_ids = pending_proof_jobs
.iter()
.filter(|(_, job)| now >= job.timestamps.submit)
.map(|(id, _)| id.clone())
.collect::<Vec<String>>();
pending_proof_ids.sort_by_key(|id| pending_proof_jobs[id].timestamps.submit);
for benchmark_id in pending_proof_ids {
if let Some(sampled_nonces) = benchmarks
.get(&benchmark_id)
.and_then(|b| b.state.as_ref())
.and_then(|s| s.sampled_nonces.as_ref())
{
let mut job = pending_proof_jobs.remove(&benchmark_id).unwrap();
if proofs.contains_key(&benchmark_id) || frauds.contains_key(&benchmark_id) {
continue;
}
return Ok(Some((benchmark_id.clone(), solutions_data)));
job.sampled_nonces = Some(sampled_nonces.clone());
return Ok(Some(job));
}
}
Ok(None)

View File

@ -1,8 +1,11 @@
mod difficulty_sampler;
pub mod download_wasm;
mod find_benchmark_to_submit;
mod find_proof_to_recompute;
mod find_proof_to_submit;
mod query_data;
mod setup_job;
pub mod select_job;
mod setup_jobs;
mod submit_benchmark;
mod submit_proof;
@ -12,18 +15,21 @@ pub mod run_benchmark;
#[path = "cuda_run_benchmark.rs"]
pub mod run_benchmark;
use crate::future_utils::{sleep, spawn, time, Mutex};
use crate::utils::{sleep, time, Result};
use difficulty_sampler::DifficultySampler;
use once_cell::sync::OnceCell;
use rand::{rngs::StdRng, Rng, SeedableRng};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tig_api::Api;
use tig_structs::{
config::{MinMaxDifficulty, WasmVMConfig},
core::*,
};
pub type Result<T> = std::result::Result<T, String>;
use tokio::{sync::Mutex, task::yield_now};
#[derive(Serialize, Clone, Debug)]
pub struct QueryData {
@ -37,31 +43,48 @@ pub struct QueryData {
pub frauds: HashMap<String, Fraud>,
}
#[derive(Serialize, Clone, Debug)]
pub struct Timer {
pub start: u64,
pub end: u64,
pub now: u64,
pub struct NonceIterator {
start: u64,
current: u64,
end: u64,
}
impl Timer {
fn new(ms: u64) -> Self {
let now = time();
Timer {
start: now,
end: now + ms,
now,
impl NonceIterator {
pub fn new(start: u64, end: u64) -> Self {
NonceIterator {
start,
current: start,
end,
}
}
fn update(&mut self) -> &Self {
self.now = time();
self
}
fn finished(&self) -> bool {
self.now >= self.end
pub fn num_attempts(&self) -> u64 {
self.current - self.start
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
impl Iterator for NonceIterator {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.current < self.end {
let result = Some(self.current);
self.current += 1;
result
} else {
None
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Timestamps {
pub start: u64,
pub end: u64,
pub submit: u64,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
pub download_url: String,
pub benchmark_id: String,
@ -69,78 +92,42 @@ pub struct Job {
pub solution_signature_threshold: u32,
pub sampled_nonces: Option<Vec<u64>>,
pub wasm_vm_config: WasmVMConfig,
pub weight: f64,
pub timestamps: Timestamps,
#[serde(skip)]
pub solutions_data: Arc<Mutex<HashMap<u64, SolutionData>>>,
}
#[derive(Serialize, Debug, Clone)]
pub struct NonceIterator {
nonces: Option<Vec<u64>>,
current: u64,
attempts: u64,
}
impl NonceIterator {
pub fn from_vec(nonces: Vec<u64>) -> Self {
Self {
nonces: Some(nonces),
current: 0,
attempts: 0,
}
}
pub fn from_u64(start: u64) -> Self {
Self {
nonces: None,
current: start,
attempts: 0,
}
}
pub fn attempts(&self) -> u64 {
self.attempts
}
pub fn is_empty(&self) -> bool {
self.nonces.as_ref().is_some_and(|x| x.is_empty()) || self.current == u64::MAX
}
pub fn empty(&mut self) {
if let Some(nonces) = self.nonces.as_mut() {
nonces.clear();
}
self.current = u64::MAX;
}
}
impl Iterator for NonceIterator {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if let Some(nonces) = self.nonces.as_mut() {
let value = nonces.pop();
self.attempts += value.is_some() as u64;
value
} else if self.current < u64::MAX {
let value = Some(self.current);
self.attempts += 1;
self.current += 1;
value
} else {
None
impl Job {
pub fn create_nonce_iterators(&self, num_workers: u32) -> Vec<Arc<Mutex<NonceIterator>>> {
match &self.sampled_nonces {
Some(sampled_nonces) => sampled_nonces
.iter()
.map(|&n| Arc::new(Mutex::new(NonceIterator::new(n, n + 1))))
.collect(),
None => {
let mut rng = StdRng::seed_from_u64(time());
let offset = u64::MAX / (num_workers as u64 + 1);
let random_offset = rng.gen_range(0..offset);
(0..num_workers)
.map(|i| {
let start = random_offset + offset * i as u64;
let end = start + offset;
Arc::new(Mutex::new(NonceIterator::new(start, end)))
})
.collect()
}
}
}
}
#[derive(Serialize, Debug, Clone, PartialEq)]
pub enum Status {
Starting,
Running(String),
Stopping,
Stopped,
}
#[derive(Serialize, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct State {
pub status: Status,
pub timer: Option<Timer>,
pub query_data: QueryData,
pub selected_algorithms: HashMap<String, String>,
pub job: Option<Job>,
pub submission_errors: HashMap<String, String>,
#[serde(skip_serializing)]
pub available_jobs: HashMap<String, Job>,
pub pending_benchmark_jobs: HashMap<String, Job>,
pub pending_proof_jobs: HashMap<String, Job>,
pub submitted_proof_ids: HashSet<String>,
pub difficulty_samplers: HashMap<String, DifficultySampler>,
}
@ -160,305 +147,285 @@ pub fn state() -> &'static Mutex<State> {
STATE.get().expect("STATE should be initialised")
}
async fn update_status(status: &str) {
let mut state = state().lock().await;
if let Status::Running(_) = state.status {
state.status = Status::Running(status.to_string());
println!("{}", status);
#[cfg(feature = "browser")]
web_sys::console::log_1(&status.to_string().into());
}
}
async fn run_once(num_workers: u32, ms_per_benchmark: u32) -> Result<()> {
{
let mut state = (*state()).lock().await;
state.job = None;
state.timer = None;
}
update_status("Querying latest data").await;
// retain only benchmarks that are within the lifespan period
// preserves solution_meta_data and solution_data
let mut new_query_data = query_data::execute().await?;
if {
let state = (*state()).lock().await;
state.query_data.latest_block.id != new_query_data.latest_block.id
} {
{
let mut state = (*state()).lock().await;
let block_started_cutoff = new_query_data.latest_block.details.height.saturating_sub(
new_query_data
.latest_block
.config()
.benchmark_submissions
.lifespan_period,
);
let mut latest_benchmarks = state.query_data.benchmarks.clone();
latest_benchmarks.retain(|_, x| x.details.block_started >= block_started_cutoff);
latest_benchmarks.extend(new_query_data.benchmarks.drain());
let mut latest_proofs = state.query_data.proofs.clone();
latest_proofs.retain(|id, _| latest_benchmarks.contains_key(id));
latest_proofs.extend(new_query_data.proofs.drain());
let mut latest_frauds = state.query_data.frauds.clone();
latest_frauds.retain(|id, _| latest_benchmarks.contains_key(id));
latest_frauds.extend(new_query_data.frauds.drain());
(*state)
.submission_errors
.retain(|id, _| latest_benchmarks.contains_key(id));
new_query_data.benchmarks = latest_benchmarks;
new_query_data.proofs = latest_proofs;
new_query_data.frauds = latest_frauds;
(*state).query_data = new_query_data;
}
update_status("Updating difficulty sampler with query data").await;
{
let mut state = state().lock().await;
let State {
query_data,
difficulty_samplers,
..
} = &mut (*state);
for challenge in query_data.challenges.iter() {
let difficulty_sampler = difficulty_samplers
.entry(challenge.id.clone())
.or_insert_with(|| DifficultySampler::new());
let min_difficulty = query_data.latest_block.config().difficulty.parameters
[&challenge.id]
.min_difficulty();
difficulty_sampler.update_with_block_data(&min_difficulty, challenge.block_data());
}
}
}
update_status("Finding proof to submit").await;
match find_proof_to_submit::execute().await? {
Some((benchmark_id, solutions_data)) => {
update_status(&format!("Submitting proof for {}", benchmark_id)).await;
if let Err(e) = submit_proof::execute(benchmark_id.clone(), solutions_data).await {
let mut state = state().lock().await;
state.submission_errors.insert(benchmark_id, e.clone());
return Err(e);
}
update_status(&format!("Success. Proof {} submitted", benchmark_id)).await;
}
None => {
update_status("No proof to submit").await;
}
}
// creates a benchmark & proof with job.benchmark_id
update_status("Selecting settings to benchmark").await;
setup_job::execute().await?;
let job = {
let state = state().lock().await;
state.job.clone().unwrap()
};
update_status(&format!("{:?}", job.settings)).await;
update_status(&format!(
"Downloading algorithm {}",
job.download_url.split("/").last().unwrap()
))
.await;
let wasm = download_wasm::execute(&job).await?;
// variables that are shared by workers
let nonce_iters = match &job.sampled_nonces {
Some(nonces) => vec![Arc::new(Mutex::new(NonceIterator::from_vec(
nonces.clone(),
)))],
None => (0..num_workers)
.into_iter()
.map(|x| {
Arc::new(Mutex::new(NonceIterator::from_u64(
u64::MAX / num_workers as u64 * x as u64,
)))
})
.collect(),
};
let solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
let solutions_count = Arc::new(Mutex::new(0u32));
update_status("Starting benchmark").await;
run_benchmark::execute(
nonce_iters.iter().cloned().collect(),
&job,
&wasm,
solutions_data.clone(),
solutions_count.clone(),
)
.await;
{
let mut state = state().lock().await;
(*state).timer = Some(Timer::new(ms_per_benchmark as u64));
}
loop {
{
// transfers solutions computed by workers to benchmark state
let num_solutions =
drain_solutions(&job.benchmark_id, &mut *(*solutions_data).lock().await).await;
let mut finished = true;
let mut num_attempts = 0;
for nonce_iter in nonce_iters.iter().cloned() {
let nonce_iter = (*nonce_iter).lock().await;
num_attempts += nonce_iter.attempts();
finished &= nonce_iter.is_empty();
}
update_status(&format!(
"Computed {} solutions out of {} instances",
num_solutions, num_attempts
))
.await;
let State {
status,
timer: time_left,
..
} = &mut (*state().lock().await);
if time_left.as_mut().unwrap().update().finished()
|| (finished && num_solutions == (num_attempts as u32)) // nonce_iter is only empty if recomputing
|| *status == Status::Stopping
{
break;
}
}
sleep(200).await;
}
for nonce_iter in nonce_iters {
(*(*nonce_iter).lock().await).empty();
}
// transfers solutions computed by workers to benchmark state
let num_solutions =
drain_solutions(&job.benchmark_id, &mut *(*solutions_data).lock().await).await;
if let Some(sampled_nonces) = job.sampled_nonces.as_ref() {
if num_solutions != sampled_nonces.len() as u32 {
let mut state = (*state()).lock().await;
(*state)
.query_data
.proofs
.get_mut(&job.benchmark_id)
.unwrap()
.solutions_data
.take();
return Err(format!(
"Failed to recompute solutions for {}",
pub async fn proof_submitter() {
async fn do_work() -> Result<()> {
println!("[proof_submitter]: checking for any proofs to submit");
if let Some(mut job) = find_proof_to_submit::execute().await? {
println!(
"[proof_submitter]: submitting proof for benchmark {}",
job.benchmark_id
));
} else {
update_status(&format!(
"Finished. Recompute solutions for {}",
job.benchmark_id
))
.await;
sleep(5000).await;
}
} else {
update_status("Updating difficulty sampler with solutions").await;
{
let num_solutions = *solutions_count.lock().await;
let mut state = state().lock().await;
state
.difficulty_samplers
.get_mut(&job.settings.challenge_id)
.unwrap()
.update_with_solutions(&job.settings.difficulty, num_solutions);
}
if num_solutions == 0 {
update_status("Finished. No solutions to submit").await;
} else {
update_status(&format!("Finished. Submitting {} solutions", num_solutions,)).await;
let benchmark_id = match submit_benchmark::execute(&job).await {
Ok(benchmark_id) => benchmark_id,
Err(e) => {
let mut state = (*state()).lock().await;
state
.submission_errors
.insert(job.benchmark_id.clone(), e.clone());
return Err(e);
}
};
update_status(&format!("Success. Benchmark {} submitted", benchmark_id)).await;
let mut state = (*state()).lock().await;
let QueryData {
benchmarks, proofs, ..
} = &mut (*state).query_data;
let mut benchmark = benchmarks.remove(&job.benchmark_id).unwrap();
let mut proof = proofs.remove(&job.benchmark_id).unwrap();
benchmark.id = benchmark_id.clone();
proof.benchmark_id = benchmark_id.clone();
benchmarks.insert(benchmark_id.clone(), benchmark);
proofs.insert(benchmark_id.clone(), proof);
}
}
Ok(())
}
pub async fn drain_solutions(benchmark_id: &String, solutions_data: &mut Vec<SolutionData>) -> u32 {
let mut state = (*state()).lock().await;
let QueryData {
benchmarks, proofs, ..
} = &mut (*state).query_data;
if let Some(benchmark) = benchmarks.get_mut(benchmark_id) {
let proof = proofs.get_mut(benchmark_id).unwrap();
if let Some(x) = benchmark.solutions_meta_data.as_mut() {
x.extend(
solutions_data
.iter()
.map(|x| SolutionMetaData::from(x.clone())),
);
benchmark.details.num_solutions = x.len() as u32;
}
let to_update = proof.solutions_data.as_mut().unwrap();
to_update.extend(solutions_data.drain(..));
to_update.len() as u32
} else {
0
}
}
pub async fn start(num_workers: u32, ms_per_benchmark: u32) {
{
let mut state = (*state()).lock().await;
if state.status != Status::Stopped {
return;
}
state.status = Status::Starting;
}
spawn(async move {
{
let mut state = (*state()).lock().await;
state.status = Status::Running("Starting".to_string());
}
loop {
{
let mut state = (*state()).lock().await;
if state.status == Status::Stopping {
state.status = Status::Stopped;
state.submitted_proof_ids.insert(job.benchmark_id.clone());
}
match submit_proof::execute(&job).await {
Ok(_) => {
println!(
"[proof_submitter]: successfully submitted proof for benchmark {}",
job.benchmark_id
);
}
Err(e) => {
println!("[proof_submitter]: failed to submit proof: {:?}", e);
// FIXME hacky way to check for 4xx status
if !e.contains("status: 4") {
println!("[proof_submitter]: re-queueing proof for another submit attempt 10s later");
job.timestamps.submit = time() + 10000;
let mut state = (*state()).lock().await;
state.submitted_proof_ids.remove(&job.benchmark_id);
state
.pending_proof_jobs
.insert(job.benchmark_id.clone(), job);
}
}
}
if let Err(e) = run_once(num_workers, ms_per_benchmark).await {
update_status(&format!("Error: {:?}", e)).await;
sleep(5000).await;
}
} else {
println!("[proof_submitter]: no proofs to submit");
}
});
}
pub async fn stop() {
let mut state = (*state()).lock().await;
match state.status {
Status::Running(_) => {
state.status = Status::Stopping;
Ok(())
}
loop {
if let Err(e) = do_work().await {
println!("[proof_submitter]: error: {:?}", e);
}
_ => {}
println!("[proof_submitter]: sleeping 5s");
sleep(5000).await;
}
}
pub async fn select_algorithm(challenge_name: String, algorithm_name: String) {
let mut state = (*state()).lock().await;
state
.selected_algorithms
.insert(challenge_name, algorithm_name);
pub async fn benchmark_submitter() {
async fn do_work() -> Result<()> {
println!("[benchmark_submitter]: checking for any benchmarks to submit");
if let Some(mut job) = find_benchmark_to_submit::execute().await? {
let num_solutions = {
let solutions_data = job.solutions_data.lock().await;
solutions_data.len()
};
println!(
"[benchmark_submitter]: submitting benchmark {:?} with {} solutions",
job.settings, num_solutions
);
match submit_benchmark::execute(&job).await {
Ok(benchmark_id) => {
job.benchmark_id = benchmark_id.clone();
job.timestamps.submit = time();
println!(
"[benchmark_submitter]: successfully submitted benchmark {}",
benchmark_id
);
{
let mut state = (*state()).lock().await;
state.pending_proof_jobs.insert(benchmark_id, job);
}
}
Err(e) => {
println!("[benchmark_submitter]: failed to submit benchmark: {:?}", e);
// FIXME hacky way to check for 4xx status
if !e.contains("status: 4") {
println!("[benchmark_submitter]: re-queueing benchmark for another submit attempt 10s later");
job.timestamps.submit = time() + 10000;
let mut state = (*state()).lock().await;
state
.pending_benchmark_jobs
.insert(job.benchmark_id.clone(), job);
}
}
}
} else {
println!("[benchmark_submitter]: no benchmarks to submit");
}
Ok(())
}
loop {
if let Err(e) = do_work().await {
println!("[benchmark_submitter]: error: {:?}", e);
}
println!("[benchmark_submitter]: sleeping 5s");
sleep(5000).await;
}
}
pub async fn data_fetcher() {
async fn do_work() -> Result<()> {
println!("[data_fetcher]: fetching latest data");
let new_query_data = query_data::execute().await?;
if {
let state = (*state()).lock().await;
state.query_data.latest_block.id != new_query_data.latest_block.id
} {
println!(
"[data_fetcher]: got new block {} @ {}",
new_query_data.latest_block.id, new_query_data.latest_block.details.height
);
{
let mut state = (*state()).lock().await;
(*state).query_data = new_query_data;
}
println!("[data_fetcher]: updating difficulty samplers");
{
let mut state = state().lock().await;
let State {
query_data,
difficulty_samplers,
..
} = &mut (*state);
for challenge in query_data.challenges.iter() {
let difficulty_sampler = difficulty_samplers
.entry(challenge.id.clone())
.or_insert_with(|| DifficultySampler::new());
let min_difficulty = query_data.latest_block.config().difficulty.parameters
[&challenge.id]
.min_difficulty();
difficulty_sampler
.update_with_block_data(&min_difficulty, challenge.block_data());
}
}
} else {
println!("[data_fetcher]: no new data");
}
Ok(())
}
loop {
if let Err(e) = do_work().await {
println!("[data_fetcher]: error: {:?}", e);
}
println!("[data_fetcher]: sleeping 10s");
sleep(10000).await;
}
}
pub async fn benchmarker(
selected_algorithms: HashMap<String, String>,
num_workers: u32,
benchmark_duration_ms: u64,
submit_delay_ms: u64,
) {
async fn do_work(
selected_algorithms: &HashMap<String, String>,
num_workers: u32,
benchmark_duration_ms: u64,
submit_delay_ms: u64,
) -> Result<()> {
println!("[benchmarker]: setting up jobs");
let jobs = setup_jobs::execute(selected_algorithms, benchmark_duration_ms, submit_delay_ms)
.await?;
for (i, job) in jobs.values().enumerate() {
println!(
"[benchmarker]: job {}: {:?}, weight: {}",
i, job.settings, job.weight
);
}
{
let mut state = state().lock().await;
state.available_jobs = jobs.clone();
}
println!("[benchmarker]: finding proofs to re-compute");
let job = {
if let Some(job) =
find_proof_to_recompute::execute(benchmark_duration_ms + 5000).await?
{
println!(
"[benchmarker]: found proof to recompute: {}",
job.benchmark_id
);
let mut state = state().lock().await;
state.query_data.proofs.insert(
job.benchmark_id.clone(),
Proof {
benchmark_id: job.benchmark_id.clone(),
state: None,
solutions_data: Some(Vec::new()),
},
);
job
} else {
println!("[benchmarker]: no proofs to recompute");
println!("[benchmarker]: weighted sampling one of the available jobs");
select_job::execute(&jobs).await?
}
};
println!(
"[benchmarker]: downloading algorithm {}",
job.download_url.split("/").last().unwrap()
);
let wasm = download_wasm::execute(&job).await?;
println!("[benchmarker]: starting benchmark {:?}", job.settings);
let nonce_iterators = job.create_nonce_iterators(num_workers);
run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
loop {
{
let mut num_attempts = 0;
for nonce_iterator in &nonce_iterators {
num_attempts += nonce_iterator.lock().await.num_attempts();
}
let num_solutions = job.solutions_data.lock().await.len();
let elapsed = time() - job.timestamps.start;
if num_workers > 0 || job.sampled_nonces.is_some() {
println!(
"[benchmarker]: #solutions: {}, #instances: {}, elapsed: {}ms",
num_solutions, num_attempts, elapsed
);
}
if time() >= job.timestamps.end
|| job
.sampled_nonces
.as_ref()
.is_some_and(|sampled_nonces| num_solutions == sampled_nonces.len())
{
break;
}
}
sleep(500).await;
}
{
let mut num_attempts = 0;
for nonce_iterator in &nonce_iterators {
num_attempts += nonce_iterator.lock().await.num_attempts();
}
let mut state = state().lock().await;
let num_solutions = job.solutions_data.lock().await.len() as u32;
if job.sampled_nonces.is_some() {
state
.pending_proof_jobs
.insert(job.benchmark_id.clone(), job);
} else if num_attempts > 0 {
state
.difficulty_samplers
.get_mut(&job.settings.challenge_id)
.unwrap()
.update_with_solutions(&job.settings.difficulty, num_solutions);
}
let jobs = state.available_jobs.drain().collect::<HashMap<_, _>>();
state.pending_benchmark_jobs.extend(jobs);
}
Ok(())
}
loop {
if let Err(e) = do_work(
&selected_algorithms,
num_workers,
benchmark_duration_ms,
submit_delay_ms,
)
.await
{
println!("[benchmarker]: error: {:?}", e);
println!("[benchmarker]: sleeping 5s");
sleep(5000).await;
} else {
yield_now().await;
}
}
}
pub async fn setup(api_url: String, api_key: String, player_id: String) {
@ -476,13 +443,12 @@ pub async fn setup(api_url: String, api_key: String, player_id: String) {
}
STATE.get_or_init(|| {
Mutex::new(State {
status: Status::Stopped,
timer: None,
query_data,
difficulty_samplers,
selected_algorithms: HashMap::new(),
job: None,
submission_errors: HashMap::new(),
available_jobs: HashMap::new(),
pending_benchmark_jobs: HashMap::new(),
pending_proof_jobs: HashMap::new(),
submitted_proof_ids: HashSet::new(),
})
});
}

View File

@ -1,9 +1,9 @@
use super::{api, player_id, QueryData, Result};
use crate::future_utils::{join, Mutex};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use tig_api::*;
use tig_structs::core::*;
use tokio::{join, sync::Mutex};
static CACHE: OnceCell<Mutex<HashMap<String, QueryData>>> = OnceCell::new();
@ -14,13 +14,12 @@ pub async fn execute() -> Result<QueryData> {
let mut cache = cache.lock().await;
if !cache.contains_key(&latest_block.id) {
cache.clear();
let results = join(
let results = join!(
query_algorithms(latest_block.id.clone()),
query_player_data(latest_block.id.clone()),
query_benchmarks(latest_block.id.clone()),
query_challenges(latest_block.id.clone()),
)
.await?;
);
let (algorithms_by_challenge, download_urls) = results.0?;
let player_data = results.1?;
let (benchmarks, proofs, frauds) = results.2?;

View File

@ -1,33 +1,26 @@
use super::{Job, NonceIterator};
use crate::future_utils;
use future_utils::{spawn, time, yield_now, Mutex};
use crate::utils;
use std::sync::Arc;
use tig_algorithms::{c001, c002, c003, c004};
use tig_challenges::ChallengeTrait;
use tig_worker::{compute_solution, verify_solution, SolutionData};
use tig_worker::{compute_solution, verify_solution};
use tokio::{spawn, sync::Mutex, task::yield_now};
use utils::time;
pub async fn execute(
nonce_iters: Vec<Arc<Mutex<NonceIterator>>>,
job: &Job,
wasm: &Vec<u8>,
solutions_data: Arc<Mutex<Vec<SolutionData>>>,
solutions_count: Arc<Mutex<u32>>,
) {
for nonce_iter in nonce_iters {
pub async fn execute(nonce_iterators: Vec<Arc<Mutex<NonceIterator>>>, job: &Job, wasm: &Vec<u8>) {
for nonce_iterator in nonce_iterators {
let job = job.clone();
let wasm = wasm.clone();
let solutions_data = solutions_data.clone();
let solutions_count = solutions_count.clone();
spawn(async move {
let mut last_yield = time();
loop {
match {
let mut nonce_iter = (*nonce_iter).lock().await;
(*nonce_iter).next()
} {
let now = time();
if now >= job.timestamps.end {
break;
}
match { nonce_iterator.lock().await.next() } {
None => break,
Some(nonce) => {
let now = time();
if now - last_yield > 25 {
yield_now().await;
last_yield = now;
@ -12137,15 +12130,11 @@ pub async fn execute(
if verify_solution(&job.settings, nonce, &solution_data.solution)
.is_ok()
{
{
let mut solutions_count = (*solutions_count).lock().await;
*solutions_count += 1;
}
if solution_data.calc_solution_signature()
<= job.solution_signature_threshold
{
let mut solutions_data = (*solutions_data).lock().await;
(*solutions_data).push(solution_data);
let mut solutions_data = job.solutions_data.lock().await;
(*solutions_data).insert(nonce, solution_data);
}
}
}

View File

@ -0,0 +1,22 @@
use std::collections::HashMap;
use super::{Job, Result};
use crate::utils::time;
use rand::{distributions::WeightedIndex, rngs::StdRng, SeedableRng};
use rand_distr::Distribution;
pub async fn execute(available_jobs: &HashMap<String, Job>) -> Result<Job> {
let benchmark_ids = available_jobs.keys().cloned().collect::<Vec<String>>();
let weights = benchmark_ids
.iter()
.map(|benchmark_id| available_jobs[benchmark_id].weight.clone())
.collect::<Vec<f64>>();
if weights.len() == 0 {
return Err("No jobs available".to_string());
}
let dist = WeightedIndex::new(&weights)
.map_err(|e| format!("Failed to create WeightedIndex: {}", e))?;
let mut rng = StdRng::seed_from_u64(time());
let index = dist.sample(&mut rng);
Ok(available_jobs[&benchmark_ids[index]].clone())
}

View File

@ -1,221 +0,0 @@
use super::{player_id, state, Job, QueryData, Result, State};
use crate::future_utils::time;
use rand::{
distributions::{Alphanumeric, DistString, WeightedIndex},
rngs::StdRng,
SeedableRng,
};
use rand_distr::Distribution;
use std::collections::HashMap;
use tig_structs::core::*;
pub async fn execute() -> Result<()> {
let job = if let Some(x) = find_settings_to_recompute().await? {
x
} else {
pick_settings_to_benchmark().await?
};
let mut state = state().lock().await;
(*state).job.replace(job.clone());
let QueryData {
latest_block,
benchmarks,
proofs,
..
} = &mut state.query_data;
if job.sampled_nonces.is_none() {
benchmarks.insert(
job.benchmark_id.clone(),
Benchmark {
id: job.benchmark_id.clone(),
settings: job.settings.clone(),
details: BenchmarkDetails {
block_started: latest_block.details.height.clone(),
num_solutions: 0,
},
state: None,
solutions_meta_data: Some(Vec::new()),
solution_data: None,
},
);
}
proofs.insert(
job.benchmark_id.clone(),
Proof {
benchmark_id: job.benchmark_id.clone(),
state: None,
solutions_data: Some(Vec::new()),
},
);
Ok(())
}
async fn find_settings_to_recompute() -> Result<Option<Job>> {
let QueryData {
latest_block,
benchmarks,
proofs,
frauds,
download_urls,
..
} = &state().lock().await.query_data;
for (benchmark_id, benchmark) in benchmarks.iter() {
if !frauds.contains_key(benchmark_id)
&& !proofs.contains_key(benchmark_id)
&& benchmark.state.is_some()
{
let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| {
format!(
"Expecting benchmark '{}' to have sampled_nonces",
benchmark_id
)
})?;
return Ok(Some(Job {
benchmark_id: benchmark.id.clone(),
download_url: get_download_url(&benchmark.settings.algorithm_id, download_urls)?,
settings: benchmark.settings.clone(),
solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
sampled_nonces: Some(sampled_nonces),
wasm_vm_config: latest_block.config().wasm_vm.clone(),
}));
}
}
Ok(None)
}
async fn pick_settings_to_benchmark() -> Result<Job> {
let State {
query_data,
selected_algorithms,
difficulty_samplers,
..
} = &(*state().lock().await);
let QueryData {
latest_block,
player_data,
challenges,
download_urls,
algorithms_by_challenge,
..
} = query_data;
let mut rng = StdRng::seed_from_u64(time() as u64);
let challenge = pick_challenge(&mut rng, player_data, challenges, selected_algorithms)?;
let selected_algorithm_id = get_algorithm_id(
algorithms_by_challenge,
challenge,
download_urls,
&selected_algorithms[&challenge.details.name],
)?;
let difficulty = difficulty_samplers[&challenge.id].sample(&mut rng);
Ok(Job {
benchmark_id: Alphanumeric.sample_string(&mut rng, 32),
download_url: get_download_url(&selected_algorithm_id, download_urls)?,
settings: BenchmarkSettings {
player_id: player_id().clone(),
block_id: latest_block.id.clone(),
challenge_id: challenge.id.clone(),
algorithm_id: selected_algorithm_id,
difficulty,
},
solution_signature_threshold: *challenge.block_data().solution_signature_threshold(),
sampled_nonces: None,
wasm_vm_config: latest_block.config().wasm_vm.clone(),
})
}
fn pick_challenge<'a>(
rng: &mut StdRng,
player_data: &'a Option<PlayerBlockData>,
challenges: &'a Vec<Challenge>,
selected_algorithms: &HashMap<String, String>,
) -> Result<&'a Challenge> {
let num_qualifiers_by_challenge = match player_data
.as_ref()
.map(|x| x.num_qualifiers_by_challenge.as_ref())
{
Some(Some(num_qualifiers_by_challenge)) => num_qualifiers_by_challenge.clone(),
_ => HashMap::new(),
};
let challenge_name_2_id: HashMap<String, String> = challenges
.iter()
.map(|c| (c.details.name.clone(), c.id.clone()))
.collect();
let percent_qualifiers_by_challenge: HashMap<String, f64> = challenges
.iter()
.map(|c| {
let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0);
let challenge_num_qualifiers = *c.block_data().num_qualifiers();
let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 {
0f64
} else {
(player_num_qualifiers as f64) / (challenge_num_qualifiers as f64)
};
(c.id.clone(), percent)
})
.collect();
if selected_algorithms.len() == 0 {
return Err("Your <algorithm_selection>.json is empty".to_string());
};
let mut challenge_weights = Vec::<(String, f64)>::new();
for challenge_name in selected_algorithms.keys() {
let challenge_id = challenge_name_2_id.get(challenge_name).ok_or_else(|| {
format!(
"Your <algorithm_selection>.json contains a non-existent challenge '{}'",
challenge_name
)
})?;
let max_percent_qualifiers = *percent_qualifiers_by_challenge
.values()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
challenge_weights.push((
challenge_id.clone(),
4.0 * max_percent_qualifiers / 3.0 - percent_qualifiers_by_challenge[challenge_id]
+ 1e-10f64,
));
}
let dist = WeightedIndex::new(
&challenge_weights
.iter()
.map(|w| w.1.clone())
.collect::<Vec<f64>>(),
)
.map_err(|e| format!("Failed to create WeightedIndex: {}", e))?;
let index = dist.sample(rng);
let random_challenge_id = challenge_weights[index].0.clone();
let challenge = challenges
.iter()
.find(|c| c.id == *random_challenge_id)
.ok_or_else(|| "Selected challenge should exist")?;
Ok(challenge)
}
fn get_algorithm_id(
algorithms_by_challenge: &HashMap<String, Vec<Algorithm>>,
challenge: &Challenge,
download_urls: &HashMap<String, String>,
selected_algorithm_name: &String,
) -> Result<String> {
let selected_algorithm_id = algorithms_by_challenge[&challenge.id]
.iter()
.find(|a| download_urls.contains_key(&a.id) && a.details.name == *selected_algorithm_name)
.ok_or_else(|| {
format!(
"Your <algorithm_selection>.json contains a non-existent algorithm '{}'",
selected_algorithm_name
)
})?
.id
.clone();
Ok(selected_algorithm_id)
}
fn get_download_url(
algorithm_id: &String,
download_urls: &HashMap<String, String>,
) -> Result<String> {
Ok(download_urls
.get(algorithm_id)
.ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?
.clone())
}

View File

@ -0,0 +1,117 @@
use super::{player_id, state, Job, QueryData, Result, State, Timestamps};
use crate::utils::time;
use rand::{rngs::StdRng, SeedableRng};
use std::{collections::HashMap, sync::Arc};
use tig_structs::core::*;
use tokio::sync::Mutex;
pub async fn execute(
selected_algorithms: &HashMap<String, String>,
benchmark_duration_ms: u64,
submit_delay_ms: u64,
) -> Result<HashMap<String, Job>> {
let State {
query_data,
difficulty_samplers,
..
} = &(*state().lock().await);
let QueryData {
latest_block,
player_data,
challenges,
download_urls,
algorithms_by_challenge,
..
} = query_data;
let challenge_weights = get_challenge_weights(player_data, challenges);
let start = time();
let end = start + benchmark_duration_ms;
let submit = end + submit_delay_ms;
let mut jobs = HashMap::new();
for (challenge_name, algorithm_name) in selected_algorithms.iter() {
let challenge = challenges
.iter()
.find(|c| c.details.name == *challenge_name)
.ok_or_else(|| {
format!(
"Your selected_algorithms contains a non-existent challenge '{}'",
challenge_name
)
})?;
let algorithm = algorithms_by_challenge[&challenge.id]
.iter()
.find(|a| download_urls.contains_key(&a.id) && a.details.name == *algorithm_name)
.ok_or_else(|| {
format!(
"Your selected_algorithms contains a non-existent algorithm '{}'",
algorithm_name
)
})?;
let mut rng = StdRng::seed_from_u64(time() as u64);
let difficulty = difficulty_samplers[&challenge.id].sample(&mut rng);
let job = Job {
benchmark_id: format!("{}_{}_{}", challenge_name, algorithm_name, time()),
download_url: download_urls.get(&algorithm.id).cloned().ok_or_else(|| {
format!("Expecting download_url for algorithm '{}'", algorithm.id)
})?,
settings: BenchmarkSettings {
player_id: player_id().clone(),
block_id: latest_block.id.clone(),
challenge_id: challenge.id.clone(),
algorithm_id: algorithm.id.clone(),
difficulty,
},
solution_signature_threshold: *challenge.block_data().solution_signature_threshold(),
sampled_nonces: None,
wasm_vm_config: latest_block.config().wasm_vm.clone(),
weight: challenge_weights[&challenge.id],
timestamps: Timestamps { start, end, submit },
solutions_data: Arc::new(Mutex::new(HashMap::new())),
};
jobs.insert(job.benchmark_id.clone(), job);
}
Ok(jobs)
}
fn get_challenge_weights(
player_data: &Option<PlayerBlockData>,
challenges: &Vec<Challenge>,
) -> HashMap<String, f64> {
let num_qualifiers_by_challenge = match player_data
.as_ref()
.map(|x| x.num_qualifiers_by_challenge.as_ref())
{
Some(Some(num_qualifiers_by_challenge)) => num_qualifiers_by_challenge.clone(),
_ => HashMap::new(),
};
let percent_qualifiers_by_challenge: HashMap<String, f64> = challenges
.iter()
.map(|c| {
let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0);
let challenge_num_qualifiers = *c.block_data().num_qualifiers();
let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 {
0f64
} else {
(player_num_qualifiers as f64) / (challenge_num_qualifiers as f64)
};
(c.id.clone(), percent)
})
.collect();
let max_percent_qualifiers = *percent_qualifiers_by_challenge
.values()
.max_by(|a, b| a.partial_cmp(b).unwrap())
.unwrap();
let challenge_weights = percent_qualifiers_by_challenge
.into_iter()
.map(|(challenge_id, p)| {
(
challenge_id,
4.0 * max_percent_qualifiers / 3.0 - p + 1e-10f64,
)
})
.collect();
challenge_weights
}

View File

@ -1,49 +1,26 @@
use super::{api, state, Job, QueryData, Result};
use crate::future_utils::sleep;
use super::{api, Job, Result};
use tig_api::SubmitBenchmarkReq;
const MAX_RETRIES: u32 = 3;
use tig_structs::core::SolutionMetaData;
pub async fn execute(job: &Job) -> Result<String> {
let req = {
let QueryData {
proofs, benchmarks, ..
} = &mut state().lock().await.query_data;
let benchmark = benchmarks
.get_mut(&job.benchmark_id)
.ok_or_else(|| format!("Job benchmark should exist"))?;
let proof = proofs
.get(&job.benchmark_id)
.ok_or_else(|| format!("Job proof should exist"))?;
let settings = benchmark.settings.clone();
let solutions_meta_data = benchmark.solutions_meta_data.take().unwrap();
let solution_data = proof.solutions_data().first().unwrap().clone();
let solutions_data = job.solutions_data.lock().await;
SubmitBenchmarkReq {
settings,
solutions_meta_data,
solution_data,
settings: job.settings.clone(),
solutions_meta_data: solutions_data
.values()
.map(|x| SolutionMetaData::from(x.clone()))
.collect(),
solution_data: solutions_data.values().next().cloned().unwrap(),
}
};
for attempt in 1..=MAX_RETRIES {
println!("Submission attempt {} of {}", attempt, MAX_RETRIES);
match api().submit_benchmark(req.clone()).await {
Ok(resp) => {
return match resp.verified {
Ok(_) => Ok(resp.benchmark_id),
Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)),
}
}
Err(e) => {
let err_msg = format!("Failed to submit benchmark: {:?}", e);
if attempt < MAX_RETRIES {
println!("{}", err_msg);
println!("Retrying in 5 seconds...");
sleep(5000).await;
} else {
return Err(err_msg);
}
match api().submit_benchmark(req.clone()).await {
Ok(resp) => {
return match resp.verified {
Ok(_) => Ok(resp.benchmark_id),
Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)),
}
}
Err(e) => Err(format!("Failed to submit benchmark: {:?}", e)),
}
unreachable!()
}

View File

@ -1,35 +1,27 @@
use super::{api, Result};
use crate::future_utils::sleep;
use super::{api, Job, Result};
use tig_api::SubmitProofReq;
use tig_worker::SolutionData;
const MAX_RETRIES: u32 = 3;
pub async fn execute(benchmark_id: String, solutions_data: Vec<SolutionData>) -> Result<()> {
let req = SubmitProofReq {
benchmark_id,
solutions_data,
pub async fn execute(job: &Job) -> Result<()> {
let req = {
let solutions_data = job.solutions_data.lock().await;
SubmitProofReq {
benchmark_id: job.benchmark_id.clone(),
solutions_data: job
.sampled_nonces
.as_ref()
.expect("Expected sampled nonces")
.iter()
.map(|n| solutions_data[n].clone())
.collect(),
}
};
for attempt in 1..=MAX_RETRIES {
println!("Submission attempt {} of {}", attempt, MAX_RETRIES);
match api().submit_proof(req.clone()).await {
Ok(resp) => {
return match resp.verified {
Ok(_) => Ok(()),
Err(e) => Err(format!("Proof flagged as fraud: {}", e)),
}
}
Err(e) => {
let err_msg = format!("Failed to submit proof: {:?}", e);
if attempt < MAX_RETRIES {
println!("{}", err_msg);
println!("Retrying in 5 seconds...");
sleep(5000).await;
} else {
return Err(err_msg);
}
match api().submit_proof(req.clone()).await {
Ok(resp) => {
return match resp.verified {
Ok(_) => Ok(()),
Err(e) => Err(format!("Proof flagged as fraud: {}", e)),
}
}
Err(e) => Err(format!("Failed to submit proof: {:?}", e)),
}
unreachable!()
}

View File

@ -1,121 +0,0 @@
#[cfg(all(feature = "standalone", feature = "browser"))]
compile_error!("features `standalone` and `browser` are mutually exclusive");
use serde::{de::DeserializeOwned, Serialize};
use std::future::Future;
#[cfg(feature = "standalone")]
mod utils {
use super::*;
use std::time::{SystemTime, UNIX_EPOCH};
pub use tokio::sync::Mutex;
use tokio::{join, task, time};
pub async fn join<T, U, V, W>(
a: impl Future<Output = T> + 'static,
b: impl Future<Output = U> + 'static,
c: impl Future<Output = V> + 'static,
d: impl Future<Output = W> + 'static,
) -> Result<(T, U, V, W), String>
where
T: Serialize + DeserializeOwned + 'static,
U: Serialize + DeserializeOwned + 'static,
V: Serialize + DeserializeOwned + 'static,
W: Serialize + DeserializeOwned + 'static,
{
Ok(join!(a, b, c, d))
}
pub fn spawn(f: impl Future<Output = ()> + 'static + Send) {
tokio::spawn(f);
}
pub async fn yield_now() {
task::yield_now().await
}
pub async fn sleep(ms: u32) {
time::sleep(time::Duration::from_millis(ms as u64)).await;
}
pub fn time() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
}
#[cfg(feature = "browser")]
mod utils {
use super::*;
pub use futures::lock::Mutex;
use gloo_timers::future::TimeoutFuture;
use js_sys::{Array, Date, Promise};
use serde_wasm_bindgen::{from_value, to_value};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::future_to_promise;
use wasm_bindgen_futures::JsFuture;
fn to_string<T: std::fmt::Debug>(e: T) -> String {
format!("{:?}", e)
}
pub async fn join<T, U, V, W>(
a: impl Future<Output = T> + 'static,
b: impl Future<Output = U> + 'static,
c: impl Future<Output = V> + 'static,
d: impl Future<Output = W> + 'static,
) -> Result<(T, U, V, W), String>
where
T: Serialize + DeserializeOwned + 'static,
U: Serialize + DeserializeOwned + 'static,
V: Serialize + DeserializeOwned + 'static,
W: Serialize + DeserializeOwned + 'static,
{
let a = future_to_promise(async move { Ok(to_value(&a.await)?) });
let b = future_to_promise(async move { Ok(to_value(&b.await)?) });
let c = future_to_promise(async move { Ok(to_value(&c.await)?) });
let d = future_to_promise(async move { Ok(to_value(&d.await)?) });
let promises = Array::new();
promises.push(&a);
promises.push(&b);
promises.push(&c);
promises.push(&d);
let js_promise = Promise::all(&promises);
let js_values = JsFuture::from(js_promise).await.map_err(to_string)?;
let values = js_values.dyn_into::<Array>().map_err(to_string)?;
let results = (
from_value(values.get(0)).map_err(to_string)?,
from_value(values.get(1)).map_err(to_string)?,
from_value(values.get(2)).map_err(to_string)?,
from_value(values.get(3)).map_err(to_string)?,
);
Ok(results)
}
pub fn spawn(f: impl Future<Output = ()> + 'static) {
// Convert the Rust Future into a JavaScript Promise
let _ = future_to_promise(async move {
f.await;
Ok(JsValue::undefined())
});
}
pub async fn yield_now() {
TimeoutFuture::new(0).await;
}
pub async fn sleep(ms: u32) {
TimeoutFuture::new(ms).await;
}
pub fn time() -> u64 {
Date::now() as u64
}
}
pub use utils::*;

View File

@ -1,34 +0,0 @@
mod benchmarker;
mod future_utils;
#[cfg(feature = "browser")]
mod exports {
use super::*;
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
pub async fn state() -> JsValue {
let state = benchmarker::state().lock().await.clone();
serde_wasm_bindgen::to_value(&state).unwrap()
}
#[wasm_bindgen]
pub async fn start(num_workers: u32, ms_per_benchmark: u32) {
benchmarker::start(num_workers, ms_per_benchmark).await;
}
#[wasm_bindgen]
pub async fn stop() {
benchmarker::stop().await;
}
#[wasm_bindgen]
pub async fn select_algorithm(challenge_name: String, algorithm_name: String) {
benchmarker::select_algorithm(challenge_name, algorithm_name).await;
}
#[wasm_bindgen]
pub async fn setup(api_url: String, api_key: String, player_id: String) {
benchmarker::setup(api_url, api_key, player_id.to_string()).await;
}
}

View File

@ -1,14 +1,12 @@
// #[cfg(any(not(feature = "standalone"), feature = "browser"))]
// compile_error!("to build the binary use `--no-default-features --features standalone`");
mod benchmarker;
mod future_utils;
use benchmarker::{Job, NonceIterator};
mod utils;
use benchmarker::{Job, State};
use clap::{value_parser, Arg, Command};
use future_utils::{sleep, Mutex};
use std::{collections::HashMap, fs, path::PathBuf, sync::Arc};
use std::collections::HashMap;
use tig_structs::core::*;
use tig_utils::{dejsonify, get, jsonify, post};
use tokio::{spawn, task::yield_now};
use utils::{sleep, time, Result};
use warp::Filter;
fn cli() -> Command {
@ -28,10 +26,10 @@ fn cli() -> Command {
.value_parser(value_parser!(String)),
)
.arg(
Arg::new("ALGORITHMS_SELECTION")
.help("Path to json file with your algorithm selection")
Arg::new("SELECTED_ALGORITHMS")
.help("Json string with your algorithm selection")
.required(true)
.value_parser(value_parser!(PathBuf)),
.value_parser(value_parser!(String)),
)
.arg(
Arg::new("workers")
@ -43,8 +41,15 @@ fn cli() -> Command {
.arg(
Arg::new("duration")
.long("duration")
.help("(Optional) Set duration of a benchmark in milliseconds")
.default_value("7500")
.help("(Optional) Set duration (in milliseconds) of a benchmark")
.default_value("10000")
.value_parser(value_parser!(u32)),
)
.arg(
Arg::new("delay")
.long("delay")
.help("(Optional) Set delay (in milliseconds) between benchmark finishing and submitting")
.default_value("5000")
.value_parser(value_parser!(u32)),
)
.arg(
@ -67,223 +72,235 @@ fn cli() -> Command {
.help("(Optional) Set hostname for master node to connect to")
.value_parser(value_parser!(String)),
)
.arg(
Arg::new("offset")
.long("offset")
.help("(Optional) Set nonce offset for each slave")
.default_value("5000000")
.value_parser(value_parser!(u64)),
)
}
#[tokio::main]
async fn main() {
let matches = cli().get_matches();
let algorithms_path = matches.get_one::<PathBuf>("ALGORITHMS_SELECTION").unwrap();
let selected_algorithms = matches
.get_one::<String>("SELECTED_ALGORITHMS")
.unwrap()
.clone();
let num_workers = *matches.get_one::<u32>("workers").unwrap();
let port = *matches.get_one::<u16>("port").unwrap();
let duration = *matches.get_one::<u32>("duration").unwrap();
let delay = *matches.get_one::<u32>("delay").unwrap();
let api_url = matches.get_one::<String>("api").unwrap().clone();
let api_key = matches.get_one::<String>("API_KEY").unwrap().clone();
let player_id = matches.get_one::<String>("PLAYER_ID").unwrap().clone();
let nonce_offset = matches.get_one::<u64>("offset").unwrap().clone();
if let Some(master) = matches.get_one::<String>("master") {
slave_node(master, port, num_workers).await;
slave(master, port, num_workers, selected_algorithms).await;
} else {
master_node(
master(
api_url,
api_key,
player_id,
num_workers,
duration,
algorithms_path,
delay,
selected_algorithms,
port,
nonce_offset,
)
.await
}
}
async fn slave_node(master: &String, port: u16, num_workers: u32) {
let master_url = format!("http://{}:{}", master, port);
let mut job: Option<Job> = None;
let mut nonce_iters: Vec<Arc<Mutex<NonceIterator>>> = Vec::new();
let mut solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
let mut solutions_count = Arc::new(Mutex::new(0u32));
let mut num_solutions = 0;
loop {
let next_job = match get::<String>(&format!("{}/job", master_url), None).await {
Ok(resp) => dejsonify::<Option<Job>>(&resp).unwrap(),
async fn slave(master: &String, port: u16, num_workers: u32, selected_algorithms: String) {
println!(
"[slave] parsing selected algorithms from: {:?}",
selected_algorithms
);
let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
.unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
async fn do_work(
master_url: &String,
num_workers: u32,
selected_algorithms: &HashMap<String, String>,
) -> Result<()> {
println!("[slave] fetching jobs from master at: {}", master_url);
let available_jobs = match get::<String>(&format!("{}/jobs", master_url), None).await {
Ok(resp) => dejsonify::<HashMap<String, Job>>(&resp).unwrap(),
Err(e) => {
println!("Error getting job: {:?}", e);
sleep(5000).await;
continue;
return Err(format!("failed to fetch jobs from master: {:?}", e));
}
};
if job != next_job {
println!("Ending job");
for nonce_iter in nonce_iters.iter() {
(*(*nonce_iter).lock().await).empty();
}
nonce_iters.clear();
solutions_data = Arc::new(Mutex::new(Vec::<SolutionData>::new()));
solutions_count = Arc::new(Mutex::new(0u32));
num_solutions = 0;
if next_job
.as_ref()
.is_some_and(|x| x.sampled_nonces.is_none())
{
let job = next_job.as_ref().unwrap();
println!("Starting new job: {:?}", job);
println!(
"Downloading algorithm {}",
job.download_url.split("/").last().unwrap()
);
let wasm = match benchmarker::download_wasm::execute(job).await {
Ok(wasm) => wasm,
Err(e) => {
println!("Error downloading wasm: {:?}", e);
sleep(5000).await;
continue;
}
};
println!("Getting nonce offset from master");
let offset = match get::<String>(
&format!("{}/nonce_offset/{:?}", master_url, hostname::get().unwrap()),
None,
)
.await
{
Ok(resp) => dejsonify::<u64>(&resp).unwrap(),
Err(e) => {
println!("Error getting nonce offset: {:?}", e);
sleep(5000).await;
continue;
}
};
println!("Got nonce offset: {}", offset);
// variables that are shared by workers
nonce_iters = (0..num_workers)
.into_iter()
.map(|x| {
Arc::new(Mutex::new(NonceIterator::from_u64(
offset + u64::MAX / num_workers as u64 * x as u64,
)))
})
.collect();
println!("Starting benchmark");
benchmarker::run_benchmark::execute(
nonce_iters.iter().cloned().collect(),
job,
&wasm,
solutions_data.clone(),
solutions_count.clone(),
)
.await;
}
job = next_job;
}
if job.as_ref().is_some_and(|x| x.sampled_nonces.is_none()) {
let job = job.as_ref().unwrap();
let mut solutions_data = solutions_data.lock().await;
let n = solutions_data.len();
if n > 0 {
num_solutions += n as u32;
let data: Vec<SolutionData> = solutions_data.drain(..).collect();
println!("Posting {} solutions", n);
if let Err(e) = post::<String>(
&format!("{}/solutions_data/{}", master_url, job.benchmark_id),
&jsonify(&data),
Some(vec![(
"Content-Type".to_string(),
"application/json".to_string(),
)]),
)
.await
{
println!("Error posting solutions data: {:?}", e);
sleep(5000).await;
continue;
}
}
let mut num_attempts = 0;
for nonce_iter in nonce_iters.iter().cloned() {
let nonce_iter = (*nonce_iter).lock().await;
num_attempts += nonce_iter.attempts();
}
println!("[slave] fetched {} jobs", available_jobs.len());
for (i, job) in available_jobs.values().enumerate() {
println!(
"Computed {} solutions out of {} instances",
num_solutions, num_attempts
"[slave] job {}: {:?}, weight: {}",
i, job.settings, job.weight
);
sleep(100).await;
} else {
println!("No job, sleeping 100ms");
sleep(100).await;
}
println!(
"[slave] filtering jobs that match selected_algorithms: {:?}",
selected_algorithms
);
let filtered_jobs: HashMap<String, Job> = available_jobs
.into_iter()
.filter(|(benchmark_id, _)| {
selected_algorithms
.iter()
.any(|(challenge_name, algorithm_name)| {
benchmark_id.starts_with(&format!("{}_{}", challenge_name, algorithm_name))
})
})
.collect();
if filtered_jobs.len() == 0 {
return Err("no jobs matching selected_algorithms".to_string());
}
let job = benchmarker::select_job::execute(&filtered_jobs).await?;
println!(
"[slave]: downloading algorithm {}",
job.download_url.split("/").last().unwrap()
);
let wasm = benchmarker::download_wasm::execute(&job).await?;
println!("[slave]: starting benchmark {:?}", job.settings);
let nonce_iterators = job.create_nonce_iterators(num_workers);
benchmarker::run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
while time() < job.timestamps.end {
{
let mut num_attempts = 0;
for nonce_iterator in &nonce_iterators {
num_attempts += nonce_iterator.lock().await.num_attempts();
}
let num_solutions = job.solutions_data.lock().await.len() as u32;
let elapsed = time() - job.timestamps.start;
println!(
"[slave]: #solutions: {}, #instances: {}, elapsed: {}ms",
num_solutions, num_attempts, elapsed
);
}
sleep(500).await;
}
let mut num_attempts = 0;
for nonce_iterator in &nonce_iterators {
num_attempts += nonce_iterator.lock().await.num_attempts();
}
if num_attempts > 0 {
let solutions_data = &*job.solutions_data.lock().await;
println!(
"[slave]: posting {} solutions to master",
solutions_data.len()
);
post::<String>(
&format!("{}/solutions_data/{}", master_url, job.benchmark_id),
&jsonify(&solutions_data),
Some(vec![(
"Content-Type".to_string(),
"application/json".to_string(),
)]),
)
.await
.map_err(|e| format!("failed to post solutions to master: {:?}", e))?;
}
Ok(())
}
let master_url = format!("http://{}:{}", master, port);
loop {
if let Err(e) = do_work(&master_url, num_workers, &selected_algorithms).await {
println!("[slave]: error: {:?}", e);
println!("[slave]: sleeping 5s");
sleep(5000).await;
}
yield_now().await;
}
}
async fn master_node(
async fn master(
api_url: String,
api_key: String,
player_id: String,
num_workers: u32,
duration: u32,
algorithms_path: &PathBuf,
delay: u32,
selected_algorithms: String,
port: u16,
nonce_offset: u64,
) {
println!(
"[master] parsing selected algorithms from: {:?}",
selected_algorithms
);
let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
.unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
println!("[master] setting up");
benchmarker::setup(api_url, api_key, player_id).await;
benchmarker::start(num_workers, duration).await;
future_utils::spawn(async move {
let offsets = Arc::new(Mutex::new(HashMap::new()));
let get_nonce_offset = warp::path!("nonce_offset" / String)
.and(warp::get())
.and(warp::any().map(move || offsets.clone()))
.and_then(
move |slave_id: String, offsets: Arc<Mutex<HashMap<String, u64>>>| async move {
let offsets = &mut (*offsets).lock().await;
let len = offsets.len() as u64;
let o = offsets
.entry(slave_id)
.or_insert_with(|| (len + 1) * nonce_offset);
Ok::<_, warp::Rejection>(warp::reply::json(&o))
},
);
let get_job = warp::path("job").and(warp::get()).and_then(|| async {
println!("[master] starting data_fetcher");
spawn(async {
benchmarker::data_fetcher().await;
});
println!("[master] starting benchmark_submitter");
spawn(async {
benchmarker::benchmark_submitter().await;
});
println!("[master] starting proof_submitter");
spawn(async {
benchmarker::proof_submitter().await;
});
println!("[master] starting benchmarker");
spawn(async move {
benchmarker::benchmarker(
selected_algorithms,
num_workers,
duration as u64,
delay as u64,
)
.await;
});
println!("[master] starting webserver on port {}", port);
spawn(async move {
let get_jobs = warp::path("jobs").and(warp::get()).and_then(|| async {
println!("[master] slave node fetching jobs",);
let state = (*benchmarker::state()).lock().await;
Ok::<_, warp::Rejection>(warp::reply::json(&state.job))
Ok::<_, warp::Rejection>(warp::reply::json(&state.available_jobs))
});
let post_solutions_data = warp::path!("solutions_data" / String)
.and(warp::post())
.and(warp::body::json())
.and_then(
|benchmark_id: String, mut solutions_data: Vec<SolutionData>| async move {
benchmarker::drain_solutions(&benchmark_id, &mut solutions_data).await;
|benchmark_id: String, solutions_data: HashMap<u64, SolutionData>| async move {
{
let num_solutions = solutions_data.len() as u32;
println!("[master] received {} solutions from slave", num_solutions,);
let mut state = (*benchmarker::state()).lock().await;
let State {
available_jobs,
pending_benchmark_jobs,
difficulty_samplers,
..
} = &mut *state;
if let Some(job) = available_jobs
.get_mut(&benchmark_id)
.or_else(|| pending_benchmark_jobs.get_mut(&benchmark_id))
{
println!("[master] adding solutions to benchmark {:?}", job.settings,);
difficulty_samplers
.get_mut(&job.settings.challenge_id)
.unwrap()
.update_with_solutions(&job.settings.difficulty, num_solutions);
job.solutions_data.lock().await.extend(solutions_data);
} else {
println!("[master] failed to find benchmark to add solutions to",);
}
}
Ok::<_, warp::Rejection>(warp::reply::with_status(
"SolutionsData received",
"solutions received",
warp::http::StatusCode::OK,
))
},
);
warp::serve(get_nonce_offset.or(get_job).or(post_solutions_data))
warp::serve(get_jobs.or(post_solutions_data))
.run(([0, 0, 0, 0], port))
.await;
});
loop {
let selection = serde_json::from_str::<HashMap<String, String>>(
&fs::read_to_string(algorithms_path).unwrap(),
)
.unwrap();
for (challenge_id, algorithm_id) in selection {
benchmarker::select_algorithm(challenge_id, algorithm_id).await;
}
future_utils::sleep(10000).await;
sleep(30000).await;
}
}

View File

@ -0,0 +1,15 @@
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time;
pub type Result<T> = std::result::Result<T, String>;
pub async fn sleep(ms: u64) {
time::sleep(time::Duration::from_millis(ms)).await;
}
pub fn time() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}