Fix rare race condition where signature threshold is different from settings.

This commit is contained in:
FiveMovesAhead 2024-04-24 21:10:09 +08:00
parent 75422ce042
commit d56356e086

View File

@ -30,7 +30,6 @@ pub struct Job {
settings: BenchmarkSettings,
duration: Duration,
solution_signature_threshold: u32,
wasm_blob: Vec<u8>,
nonce_iter: NonceIterator,
wasm_vm_config: WasmVMConfig,
}
@ -117,6 +116,7 @@ impl Iterator for NonceIterator {
}
static STATE: OnceCell<Mutex<State>> = OnceCell::new();
static BLOBS: OnceCell<Mutex<HashMap<String, Vec<u8>>>> = OnceCell::new();
static API: OnceCell<Api> = OnceCell::new();
static PLAYER_ID: OnceCell<String> = OnceCell::new();
const BLOCK_DATA_POLLER_ID: &'static str = "Block data poller";
@ -150,6 +150,7 @@ pub async fn select_algorithm(challenge_id: String, algorithm_id: String) {
}
pub async fn setup(api_url: String, api_key: String, player_id: String, num_workers: u32) {
BLOBS.get_or_init(|| Mutex::new(HashMap::new()));
STATE.get_or_init(|| Mutex::new(State::new()));
API.get_or_init(|| Api::new(api_url, api_key));
PLAYER_ID.get_or_init(|| player_id);
@ -466,7 +467,7 @@ async fn update_block_data() -> Result<()> {
Ok(())
}
async fn find_settings_to_recompute() -> Option<(String, BenchmarkSettings, NonceIterator)> {
async fn find_settings_to_recompute() -> Option<Job> {
let state = mutex().lock().await;
for (benchmark_id, benchmark) in state.benchmarks.iter() {
if !state.proofs.contains_key(benchmark_id) && benchmark.state.is_some() {
@ -475,17 +476,27 @@ async fn find_settings_to_recompute() -> Option<(String, BenchmarkSettings, Nonc
panic!("FIXME");
});
println!("Sampled nonces: {:?}", sampled_nonces);
return Some((
benchmark_id.clone(),
benchmark.settings.clone(),
NonceIterator::new(Some(sampled_nonces)),
));
return Some(Job {
benchmark_id: benchmark.id.clone(),
settings: benchmark.settings.clone(),
duration: Duration {
start: time(),
end: time() + 5000,
now: time(),
},
solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
nonce_iter: NonceIterator::new(Some(sampled_nonces)),
wasm_vm_config: WasmVMConfig {
max_memory: u64::MAX,
max_fuel: u64::MAX,
},
});
}
}
None
}
async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterator) {
async fn pick_settings_to_benchmark() -> Job {
let block_id = get_latest_block_id().await;
let state = mutex().lock().await;
let num_qualifiers_by_challenge = match &state.benchmarker_data {
@ -558,9 +569,9 @@ async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterat
&max_difficulty,
*block_data.scaling_factor(),
);
(
Alphanumeric.sample_string(&mut rng, 32),
BenchmarkSettings {
Job {
benchmark_id: Alphanumeric.sample_string(&mut rng, 32),
settings: BenchmarkSettings {
player_id: PLAYER_ID
.get()
.unwrap_or_else(|| {
@ -573,23 +584,48 @@ async fn pick_settings_to_benchmark() -> (String, BenchmarkSettings, NonceIterat
algorithm_id: selected_algorithm_id,
difficulty: random_difficulty,
},
NonceIterator::new(None),
)
duration: Duration {
start: time(),
end: time() + 30000,
now: time(),
},
solution_signature_threshold: *block_data.solution_signature_threshold(),
nonce_iter: NonceIterator::new(None),
wasm_vm_config: state
.latest_block
.as_ref()
.unwrap_or_else(|| {
web_sys::console::error_1(&"pick_settings_to_benchmark6".to_string().into());
panic!("FIXME");
})
.config()
.wasm_vm
.clone(),
}
}
async fn download_wasm_blob(algorithm_id: &String) -> Result<Vec<u8>> {
let state = mutex().lock().await;
let download_url = state
.download_urls
.get(algorithm_id.as_str())
.ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?;
let wasm = get::<Vec<u8>>(download_url.as_str(), None)
.await
.map_err(|e| format!("Failed to download wasm from {}: {:?}", download_url, e))?;
Ok(wasm)
// lock it so that only do 1 download
let mut blobs = BLOBS.get().unwrap().lock().await;
if let Some(wasm_blob) = blobs.get(algorithm_id) {
Ok(wasm_blob.clone())
} else {
let state = mutex().lock().await;
let download_url = state
.download_urls
.get(algorithm_id.as_str())
.ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?;
let wasm = get::<Vec<u8>>(download_url.as_str(), None)
.await
.map_err(|e| format!("Failed to download wasm from {}: {:?}", download_url, e))?;
(*blobs).insert(algorithm_id.clone(), wasm.clone());
Ok(wasm)
}
}
async fn do_benchmark() -> Result<()> {
let mut last_algorithm_id = "None".to_string();
let mut blob = Vec::new();
while let Some((job, Some(nonce))) = {
let mut state = mutex().lock().await;
if state.running {
@ -601,10 +637,14 @@ async fn do_benchmark() -> Result<()> {
None
}
} {
if last_algorithm_id != job.settings.algorithm_id {
blob = download_wasm_blob(&job.settings.algorithm_id).await?;
last_algorithm_id = job.settings.algorithm_id.clone();
}
if let Ok(solution_data) = compute_solution(
&job.settings,
nonce,
job.wasm_blob.as_slice(),
blob.as_slice(),
job.wasm_vm_config.max_memory,
job.wasm_vm_config.max_fuel,
)
@ -649,46 +689,32 @@ async fn do_benchmark() -> Result<()> {
async fn do_manage_benchmark() -> Result<()> {
update_status(MANAGER_ID, "Checking for any benchmarks to recompute").await;
let (benchmark_id, settings, nonce_iter) = if let Some(x) = find_settings_to_recompute().await {
let job = if let Some(x) = find_settings_to_recompute().await {
update_status(MANAGER_ID, "Found benchmark to recompute").await;
x
} else {
update_status(MANAGER_ID, "Picking new settings to benchmark").await;
pick_settings_to_benchmark().await
};
update_status(MANAGER_ID, &format!("{:?}", settings)).await;
update_status(MANAGER_ID, &format!("{:?}", job.settings)).await;
update_status(
MANAGER_ID,
&format!("Downloading algorithm: {}", settings.algorithm_id),
&format!("Downloading algorithm: {}", job.settings.algorithm_id),
)
.await;
let wasm_blob = download_wasm_blob(&settings.algorithm_id).await?;
download_wasm_blob(&job.settings.algorithm_id).await?;
update_status(MANAGER_ID, &format!("Setting up benchmark")).await;
let benchmark_id = job.benchmark_id.clone();
let mut state = mutex().lock().await;
let solution_signature_threshold = *state
.challenges
.iter()
.find(|x| x.id == settings.challenge_id)
.unwrap_or_else(|| {
web_sys::console::error_1(&"do_manage_benchmark".to_string().into());
panic!("FIXME");
})
.block_data()
.solution_signature_threshold();
let block = state.latest_block.as_ref().unwrap_or_else(|| {
web_sys::console::error_1(&"do_manage_benchmark2".to_string().into());
panic!("FIXME");
});
let block_started = block.details.height;
let wasm_vm_config = block.config().wasm_vm.clone();
if !nonce_iter.is_recompute() {
if !job.nonce_iter.is_recompute() {
let block_started = state.latest_block.as_ref().unwrap().details.height.clone();
(*state).benchmarks.insert(
benchmark_id.clone(),
Benchmark {
id: benchmark_id.clone(),
settings: settings.clone(),
settings: job.settings.clone(),
details: BenchmarkDetails {
block_started,
num_solutions: 0,
@ -707,19 +733,7 @@ async fn do_manage_benchmark() -> Result<()> {
solutions_data: Some(Vec::new()),
},
);
(*state).job = Some(Job {
benchmark_id: benchmark_id.clone(),
settings,
duration: Duration {
start: time(),
end: time() + 20000,
now: time(),
},
solution_signature_threshold,
wasm_blob,
nonce_iter,
wasm_vm_config,
});
(*state).job = Some(job);
drop(state);
loop {