From d56356e08601fed43cd57ba3e14ba4e5e8e4cd01 Mon Sep 17 00:00:00 2001 From: FiveMovesAhead Date: Wed, 24 Apr 2024 21:10:09 +0800 Subject: [PATCH] Fix rare race condition where signature threshold is different from settings. --- tig-benchmarker/src/benchmarker.rs | 130 ++++++++++++++++------------- 1 file changed, 72 insertions(+), 58 deletions(-) diff --git a/tig-benchmarker/src/benchmarker.rs b/tig-benchmarker/src/benchmarker.rs index 733722d9..0eaa1648 100644 --- a/tig-benchmarker/src/benchmarker.rs +++ b/tig-benchmarker/src/benchmarker.rs @@ -30,7 +30,6 @@ pub struct Job { settings: BenchmarkSettings, duration: Duration, solution_signature_threshold: u32, - wasm_blob: Vec, nonce_iter: NonceIterator, wasm_vm_config: WasmVMConfig, } @@ -117,6 +116,7 @@ impl Iterator for NonceIterator { } static STATE: OnceCell> = OnceCell::new(); +static BLOBS: OnceCell>>> = OnceCell::new(); static API: OnceCell = OnceCell::new(); static PLAYER_ID: OnceCell = OnceCell::new(); const BLOCK_DATA_POLLER_ID: &'static str = "Block data poller"; @@ -150,6 +150,7 @@ pub async fn select_algorithm(challenge_id: String, algorithm_id: String) { } pub async fn setup(api_url: String, api_key: String, player_id: String, num_workers: u32) { + BLOBS.get_or_init(|| Mutex::new(HashMap::new())); STATE.get_or_init(|| Mutex::new(State::new())); API.get_or_init(|| Api::new(api_url, api_key)); PLAYER_ID.get_or_init(|| player_id); @@ -466,7 +467,7 @@ async fn update_block_data() -> Result<()> { Ok(()) } -async fn find_settings_to_recompute() -> Option<(String, BenchmarkSettings, NonceIterator)> { +async fn find_settings_to_recompute() -> Option { let state = mutex().lock().await; for (benchmark_id, benchmark) in state.benchmarks.iter() { if !state.proofs.contains_key(benchmark_id) && benchmark.state.is_some() { @@ -475,17 +476,27 @@ async fn find_settings_to_recompute() -> Option<(String, BenchmarkSettings, Nonc panic!("FIXME"); }); println!("Sampled nonces: {:?}", sampled_nonces); - return Some(( - benchmark_id.clone(), - benchmark.settings.clone(), - NonceIterator::new(Some(sampled_nonces)), - )); + return Some(Job { + benchmark_id: benchmark.id.clone(), + settings: benchmark.settings.clone(), + duration: Duration { + start: time(), + end: time() + 5000, + now: time(), + }, + solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud + nonce_iter: NonceIterator::new(Some(sampled_nonces)), + wasm_vm_config: WasmVMConfig { + max_memory: u64::MAX, + max_fuel: u64::MAX, + }, + }); } } None } -async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterator) { +async fn pick_settings_to_benchmark() -> Job { let block_id = get_latest_block_id().await; let state = mutex().lock().await; let num_qualifiers_by_challenge = match &state.benchmarker_data { @@ -558,9 +569,9 @@ async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterat &max_difficulty, *block_data.scaling_factor(), ); - ( - Alphanumeric.sample_string(&mut rng, 32), - BenchmarkSettings { + Job { + benchmark_id: Alphanumeric.sample_string(&mut rng, 32), + settings: BenchmarkSettings { player_id: PLAYER_ID .get() .unwrap_or_else(|| { @@ -573,23 +584,48 @@ async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterat algorithm_id: selected_algorithm_id, difficulty: random_difficulty, }, - NonceIterator::new(None), - ) + duration: Duration { + start: time(), + end: time() + 30000, + now: time(), + }, + solution_signature_threshold: *block_data.solution_signature_threshold(), + nonce_iter: NonceIterator::new(None), + wasm_vm_config: state + .latest_block + .as_ref() + .unwrap_or_else(|| { + web_sys::console::error_1(&"pick_settings_to_benchmark6".to_string().into()); + panic!("FIXME"); + }) + .config() + .wasm_vm + .clone(), + } } async fn download_wasm_blob(algorithm_id: &String) -> Result> { - let state = mutex().lock().await; - let download_url = state - .download_urls - .get(algorithm_id.as_str()) - .ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?; - let wasm = get::>(download_url.as_str(), None) - .await - .map_err(|e| format!("Failed to download wasm from {}: {:?}", download_url, e))?; - Ok(wasm) + // lock it so that only do 1 download + let mut blobs = BLOBS.get().unwrap().lock().await; + if let Some(wasm_blob) = blobs.get(algorithm_id) { + Ok(wasm_blob.clone()) + } else { + let state = mutex().lock().await; + let download_url = state + .download_urls + .get(algorithm_id.as_str()) + .ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?; + let wasm = get::>(download_url.as_str(), None) + .await + .map_err(|e| format!("Failed to download wasm from {}: {:?}", download_url, e))?; + (*blobs).insert(algorithm_id.clone(), wasm.clone()); + Ok(wasm) + } } async fn do_benchmark() -> Result<()> { + let mut last_algorithm_id = "None".to_string(); + let mut blob = Vec::new(); while let Some((job, Some(nonce))) = { let mut state = mutex().lock().await; if state.running { @@ -601,10 +637,14 @@ async fn do_benchmark() -> Result<()> { None } } { + if last_algorithm_id != job.settings.algorithm_id { + blob = download_wasm_blob(&job.settings.algorithm_id).await?; + last_algorithm_id = job.settings.algorithm_id.clone(); + } if let Ok(solution_data) = compute_solution( &job.settings, nonce, - job.wasm_blob.as_slice(), + blob.as_slice(), job.wasm_vm_config.max_memory, job.wasm_vm_config.max_fuel, ) @@ -649,46 +689,32 @@ async fn do_benchmark() -> Result<()> { async fn do_manage_benchmark() -> Result<()> { update_status(MANAGER_ID, "Checking for any benchmarks to recompute").await; - let (benchmark_id, settings, nonce_iter) = if let Some(x) = find_settings_to_recompute().await { + let job = if let Some(x) = find_settings_to_recompute().await { update_status(MANAGER_ID, "Found benchmark to recompute").await; x } else { update_status(MANAGER_ID, "Picking new settings to benchmark").await; pick_settings_to_benchmark().await }; - update_status(MANAGER_ID, &format!("{:?}", settings)).await; + update_status(MANAGER_ID, &format!("{:?}", job.settings)).await; update_status( MANAGER_ID, - &format!("Downloading algorithm: {}", settings.algorithm_id), + &format!("Downloading algorithm: {}", job.settings.algorithm_id), ) .await; - let wasm_blob = download_wasm_blob(&settings.algorithm_id).await?; + download_wasm_blob(&job.settings.algorithm_id).await?; update_status(MANAGER_ID, &format!("Setting up benchmark")).await; + let benchmark_id = job.benchmark_id.clone(); let mut state = mutex().lock().await; - let solution_signature_threshold = *state - .challenges - .iter() - .find(|x| x.id == settings.challenge_id) - .unwrap_or_else(|| { - web_sys::console::error_1(&"do_manage_benchmark".to_string().into()); - panic!("FIXME"); - }) - .block_data() - .solution_signature_threshold(); - let block = state.latest_block.as_ref().unwrap_or_else(|| { - web_sys::console::error_1(&"do_manage_benchmark2".to_string().into()); - panic!("FIXME"); - }); - let block_started = block.details.height; - let wasm_vm_config = block.config().wasm_vm.clone(); - if !nonce_iter.is_recompute() { + if !job.nonce_iter.is_recompute() { + let block_started = state.latest_block.as_ref().unwrap().details.height.clone(); (*state).benchmarks.insert( benchmark_id.clone(), Benchmark { id: benchmark_id.clone(), - settings: settings.clone(), + settings: job.settings.clone(), details: BenchmarkDetails { block_started, num_solutions: 0, @@ -707,19 +733,7 @@ async fn do_manage_benchmark() -> Result<()> { solutions_data: Some(Vec::new()), }, ); - (*state).job = Some(Job { - benchmark_id: benchmark_id.clone(), - settings, - duration: Duration { - start: time(), - end: time() + 20000, - now: time(), - }, - solution_signature_threshold, - wasm_blob, - nonce_iter, - wasm_vm_config, - }); + (*state).job = Some(job); drop(state); loop {