Deprecate rust crates tig-api and tig-benchmarker.

This commit is contained in:
FiveMovesAhead 2024-09-22 15:28:02 +08:00
parent 0f375d1780
commit 3cb372f9d5
22 changed files with 0 additions and 34506 deletions

View File

@ -1,8 +1,6 @@
[workspace]
members = [
"tig-algorithms",
"tig-api",
"tig-benchmarker",
"tig-challenges",
"tig-protocol",
"tig-structs",

View File

@ -1,23 +0,0 @@
# Crate manifest for tig-api: a client library for TIG's REST API.
[package]
name = "tig-api"
version = "0.1.0"
readme = "README.md"
# NOTE(review): `license` conventionally holds an SPDX expression; a URL is
# non-standard — consider `license-file` pointing at the EULA instead. Verify
# before changing, since the workspace may rely on this exact value.
license = "https://github.com/tig-foundation/tig-monorepo/tree/main/docs/agreements/end_user_license_agreement.pdf"
authors.workspace = true
repository.workspace = true
edition.workspace = true

[lib]
# Built as a regular Rust library (rlib) and as a cdylib — presumably for
# non-Rust / WASM consumers (TODO confirm).
crate-type = ["cdylib", "rlib"]

[dependencies]
anyhow = "1.0.81"
query_map = { version = "0.7.0", features = ["url-query"] }
serde = { version = "1.0.196", features = ["derive"] }
serde_json = { version = "1.0.113" }
tig-utils = { path = "../tig-utils" }
tig-structs = { path = "../tig-structs" }

[features]
# Exactly one HTTP backend must be enabled; src/lib.rs enforces this with
# compile_error! guards.
request = ["tig-utils/request"]
request-js = ["tig-utils/request-js"]

View File

@ -1,17 +0,0 @@
# tig-api
A Rust crate for making requests to TIG's API.
Developers must enable exactly one of the features `request` (uses `reqwest`) or `request-js` (uses `web-sys`).
## API Url
* Mainnet https://mainnet-api.tig.foundation
* Testnet https://testnet-api.tig.foundation
## API Documentation
Our API's Swagger documentation can be found at [https://swagger.tig.foundation/](https://swagger.tig.foundation/).
# License
[End User License Agreement](../docs/agreements/end_user_license_agreement.pdf)

View File

@ -1,137 +0,0 @@
// Exactly one HTTP backend must be selected at build time:
// `request` (native, via tig-utils' reqwest-backed client) or
// `request-js` (browser, via tig-utils' web-sys-backed client).
#[cfg(not(any(feature = "request", feature = "request-js")))]
compile_error!("Either feature `request` or `request-js` must be enabled");
#[cfg(all(feature = "request", feature = "request-js"))]
compile_error!("features `request` and `request-js` are mutually exclusive");
use anyhow::{anyhow, Result};
use query_map::QueryMap;
use serde::de::DeserializeOwned;
use std::{collections::HashMap, vec};
pub use tig_structs::api::*;
use tig_utils::{dejsonify, get, jsonify, post};
/// Thin client for TIG's REST API.
pub struct Api {
    // Base URL; paths are appended as "{api_url}/{path}", so no trailing
    // slash is expected.
    pub api_url: String,
    // Sent as the `x-api-key` header on every request.
    pub api_key: String,
}
impl Api {
    /// Create a client for `api_url`, authenticating with `api_key`.
    pub fn new(api_url: String, api_key: String) -> Self {
        Self { api_url, api_key }
    }

    /// GET `{api_url}/{path}` with auth headers, then deserialize the JSON
    /// response body into `T`.
    async fn get<T>(&self, path: String) -> Result<T>
    where
        T: DeserializeOwned,
    {
        let resp = get::<String>(
            format!("{}/{}", self.api_url, path).as_str(),
            Some(
                vec![
                    ("x-api-key".to_string(), self.api_key.clone()),
                    ("user-agent".to_string(), "TIG API".to_string()),
                ]
                .into_iter()
                .collect(),
            ),
        )
        .await?;
        dejsonify::<T>(&resp).map_err(|e| anyhow!("Failed to dejsonify: {}", e))
    }

    /// POST `body` (pre-serialized JSON) to `{api_url}/{path}` with auth and
    /// JSON content headers, then deserialize the response into `T`.
    async fn post<T>(&self, path: String, body: String) -> Result<T>
    where
        T: DeserializeOwned,
    {
        let resp = post::<String>(
            format!("{}/{}", self.api_url, path).as_str(),
            body.as_str(),
            Some(
                vec![
                    ("x-api-key".to_string(), self.api_key.clone()),
                    ("user-agent".to_string(), "TIG API".to_string()),
                    ("accept".to_string(), "application/json".to_string()),
                    ("content-type".to_string(), "application/json".to_string()),
                ]
                .into_iter()
                .collect(),
            ),
        )
        .await?;
        dejsonify::<T>(&resp).map_err(|e| anyhow!("Failed to dejsonify: {}", e))
    }

    /// GET the challenges as of `req.block_id`.
    pub async fn get_challenges(&self, req: GetChallengesReq) -> Result<GetChallengesResp> {
        let mut query = HashMap::<String, String>::new();
        query.insert("block_id".to_string(), req.block_id);
        let query = QueryMap::from(query);
        self.get(format!("get-challenges?{}", query.to_query_string()))
            .await
    }

    /// GET the algorithms as of `req.block_id`.
    pub async fn get_algorithms(&self, req: GetAlgorithmsReq) -> Result<GetAlgorithmsResp> {
        let mut query = HashMap::<String, String>::new();
        query.insert("block_id".to_string(), req.block_id);
        let query = QueryMap::from(query);
        self.get(format!("get-algorithms?{}", query.to_query_string()))
            .await
    }

    /// GET players of `req.player_type` as of `req.block_id`.
    pub async fn get_players(&self, req: GetPlayersReq) -> Result<GetPlayersResp> {
        let mut query = HashMap::<String, String>::new();
        query.insert("block_id".to_string(), req.block_id);
        query.insert("player_type".to_string(), req.player_type.to_string());
        let query = QueryMap::from(query);
        self.get(format!("get-players?{}", query.to_query_string()))
            .await
    }

    /// GET a player's benchmarks as of `req.block_id`.
    pub async fn get_benchmarks(&self, req: GetBenchmarksReq) -> Result<GetBenchmarksResp> {
        let mut query = HashMap::<String, String>::new();
        query.insert("block_id".to_string(), req.block_id);
        query.insert("player_id".to_string(), req.player_id);
        let query = QueryMap::from(query);
        self.get(format!("get-benchmarks?{}", query.to_query_string()))
            .await
    }

    /// GET the full data for a single benchmark.
    pub async fn get_benchmark_data(
        &self,
        req: GetBenchmarkDataReq,
    ) -> Result<GetBenchmarkDataResp> {
        let mut query = HashMap::<String, String>::new();
        query.insert("benchmark_id".to_string(), req.benchmark_id);
        let query = QueryMap::from(query);
        self.get(format!("get-benchmark-data?{}", query.to_query_string()))
            .await
    }

    /// GET a block by id, height, or round; all-`None` selects the server's
    /// default (used elsewhere in this repo to fetch the latest block).
    pub async fn get_block(&self, req: GetBlockReq) -> Result<GetBlockResp> {
        let mut query = HashMap::<String, String>::new();
        if let Some(id) = req.id {
            query.insert("id".to_string(), id);
        }
        if let Some(height) = req.height {
            query.insert("height".to_string(), height.to_string());
        }
        if let Some(round) = req.round {
            query.insert("round".to_string(), round.to_string());
        }
        let query = QueryMap::from(query);
        self.get(format!("get-block?{}", query.to_query_string()))
            .await
    }

    /// POST a new algorithm submission.
    pub async fn submit_algorithm(&self, req: SubmitAlgorithmReq) -> Result<SubmitAlgorithmResp> {
        self.post("submit-algorithm".to_string(), jsonify(&req))
            .await
    }

    /// POST a new benchmark submission.
    pub async fn submit_benchmark(&self, req: SubmitBenchmarkReq) -> Result<SubmitBenchmarkResp> {
        self.post("submit-benchmark".to_string(), jsonify(&req))
            .await
    }

    /// POST a proof for a previously submitted benchmark.
    pub async fn submit_proof(&self, req: SubmitProofReq) -> Result<SubmitProofResp> {
        self.post("submit-proof".to_string(), jsonify(&req)).await
    }
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,310 +0,0 @@
use rand::{
distributions::{Distribution, WeightedIndex},
rngs::StdRng,
};
use tig_structs::core::*;
// Fraction of extra grid headroom beyond the hardest known difficulty.
const PADDING_FACTOR: f32 = 0.2;
// Base decay applied to a cell's solutions weight when new solutions land on it.
const DECAY: f32 = 0.7;
// Solutions weight every cell starts with.
const INITIAL_SOLUTIONS_WEIGHT: f32 = 500.0;
// Scales the weight added per reported solution.
const SOLUTIONS_MULTIPLIER: f32 = 10.0;

/// Sampling weight of a single difficulty cell in the sampler's grid.
#[derive(Debug, Clone)]
pub struct Weights {
    // Moves towards 1.0 while the cell is past the cutoff frontier, 0.0 otherwise.
    pub qualifier: f32,
    // Decaying accumulator of solutions reported near this cell.
    pub solutions: f32,
    // Whether the cell lies between the base and scaled frontiers.
    pub within_range: bool,
}

impl Weights {
    /// Fresh cell: qualifier 1.0, initial solutions weight, out of range.
    pub fn new() -> Self {
        Self {
            qualifier: 1.0,
            solutions: INITIAL_SOLUTIONS_WEIGHT,
            within_range: false,
        }
    }
}

// Fix for clippy::new_without_default: a `new()` with no arguments should be
// reachable through `Default` as well (e.g. for `resize`, `take`, derives).
impl Default for Weights {
    fn default() -> Self {
        Self::new()
    }
}
/// Weighted sampler over a 2-D grid of difficulty values.
///
/// The grid covers `[min_difficulty, min_difficulty + dimensions + padding)`
/// per axis; `sample` draws a cell from `distribution`, which is rebuilt from
/// `weights` on every block update.
#[derive(Debug, Clone)]
pub struct DifficultySampler {
    // Difficulty value of grid cell (0, 0), one entry per axis.
    pub min_difficulty: Vec<i32>,
    // Extra headroom cells beyond `dimensions`, per axis.
    pub padding: Vec<usize>,
    // Grid extent up to the hardest known difficulty, per axis.
    pub dimensions: Vec<usize>,
    // Row-major per-cell weights: weights[x][y].
    pub weights: Vec<Vec<Weights>>,
    // Flattened sampling distribution; None until the first block update.
    pub distribution: Option<WeightedIndex<f32>>,
}
impl DifficultySampler {
    /// Empty sampler; `update_with_block_data` must run before `sample`
    /// (the distribution starts as `None`).
    pub fn new() -> Self {
        Self {
            min_difficulty: Vec::new(),
            padding: Vec::new(),
            dimensions: Vec::new(),
            weights: Vec::new(),
            distribution: None,
        }
    }

    /// Draw a 2-parameter difficulty from the weighted distribution.
    ///
    /// Panics if the sampler has never been updated with block data.
    pub fn sample(&self, rng: &mut StdRng) -> Vec<i32> {
        // samples an index from the distribution
        let idx = self
            .distribution
            .clone()
            .expect("You must update sampler first")
            .sample(rng);
        // convert index into difficulty
        // The flat index enumerates the grid row-major with
        // cols = dimensions[1] + padding[1].
        let num_cols = self.dimensions[1] + self.padding[1];
        let x = idx / num_cols;
        let y = idx % num_cols;
        vec![
            x as i32 + self.min_difficulty[0],
            y as i32 + self.min_difficulty[1],
        ]
    }

    /// Re-align the grid to the latest block: shift for the new minimum
    /// difficulty, resize, refresh qualifier weights and the valid range,
    /// then rebuild the sampling distribution.
    pub fn update_with_block_data(
        &mut self,
        min_difficulty: &Vec<i32>,
        block_data: &ChallengeBlockData,
    ) {
        assert_eq!(
            min_difficulty.len(),
            2,
            "Only difficulty with 2 parameters are supported"
        );
        // Offset of the old origin relative to the new one (0 on first call,
        // when self.min_difficulty is still empty).
        let left_pad = (0..2)
            .into_iter()
            .map(|i| match self.min_difficulty.get(i) {
                Some(x) => x - min_difficulty[i],
                None => 0,
            })
            .collect();
        self.min_difficulty = min_difficulty.clone();
        self.update_dimensions_and_padding(block_data);
        let size = (0..2)
            .into_iter()
            .map(|i| self.dimensions[i] + self.padding[i])
            .collect();
        self.resize_weights(&left_pad, &size);
        self.update_qualifier_weights(block_data);
        self.update_valid_range(block_data);
        self.update_distributions();
    }

    /// Record `num_solutions` found at `difficulty`, bleeding weight into
    /// neighbouring cells with radial decay (stronger effect nearer the hit).
    pub fn update_with_solutions(&mut self, difficulty: &Vec<i32>, num_solutions: u32) {
        // Ignore difficulties outside the current grid.
        if difficulty[0] < self.min_difficulty[0]
            || difficulty[1] < self.min_difficulty[1]
            || difficulty[0]
                >= self.min_difficulty[0] + self.dimensions[0] as i32 + self.padding[0] as i32
            || difficulty[1]
                >= self.min_difficulty[1] + self.dimensions[1] as i32 + self.padding[1] as i32
        {
            return;
        }
        // Grid coordinates of the hit.
        let (x, y) = (
            (difficulty[0] - self.min_difficulty[0]) as usize,
            (difficulty[1] - self.min_difficulty[1]) as usize,
        );
        for x_offset in 0..self.padding[0] {
            for y_offset in 0..self.padding[1] {
                // Distance normalised by the padding radii; beyond the unit
                // ellipse the neighbourhood ends.
                let dist = ((x_offset as f32 / self.padding[0] as f32).powf(2.0)
                    + (y_offset as f32 / self.padding[1] as f32).powf(2.0))
                .sqrt();
                if dist > 1.0 {
                    break;
                }
                // decay interpolates from DECAY at the centre to 1.0 at dist 1.
                let decay = dist * (1.0 - DECAY) + DECAY;
                let delta = (1.0 - decay) * num_solutions as f32 * SOLUTIONS_MULTIPLIER;
                // Apply symmetrically in all four quadrants around (x, y),
                // each guarded against leaving the grid.
                if x + x_offset < self.weights.len() && y + y_offset < self.weights[x].len() {
                    self.weights[x + x_offset][y + y_offset].solutions *= decay;
                    self.weights[x + x_offset][y + y_offset].solutions += delta;
                }
                if x_offset != 0 && x >= x_offset && y + y_offset < self.weights[x].len() {
                    self.weights[x - x_offset][y + y_offset].solutions *= decay;
                    self.weights[x - x_offset][y + y_offset].solutions += delta;
                }
                if x + x_offset < self.weights.len() && y_offset != 0 && y >= y_offset {
                    self.weights[x + x_offset][y - y_offset].solutions *= decay;
                    self.weights[x + x_offset][y - y_offset].solutions += delta;
                }
                if x_offset != 0 && y_offset != 0 && x >= x_offset && y >= y_offset {
                    self.weights[x - x_offset][y - y_offset].solutions *= decay;
                    self.weights[x - x_offset][y - y_offset].solutions += delta;
                }
            }
        }
    }

    /// Mark each cell as inside/outside the band between the base frontier
    /// and the scaled frontier for the current block.
    fn update_valid_range(&mut self, block_data: &ChallengeBlockData) {
        // Frontier points translated into grid coordinates.
        let mut lower_cutoff_points: Vec<Vec<usize>> = block_data
            .base_frontier()
            .iter()
            .map(|x| {
                vec![
                    (x[0] - self.min_difficulty[0]) as usize,
                    (x[1] - self.min_difficulty[1]) as usize,
                ]
            })
            .collect();
        let mut upper_cutoff_points: Vec<Vec<usize>> = block_data
            .scaled_frontier()
            .iter()
            .map(|x| {
                vec![
                    (x[0] - self.min_difficulty[0]) as usize,
                    (x[1] - self.min_difficulty[1]) as usize,
                ]
            })
            .collect();
        lower_cutoff_points.sort_by(|a, b| a[0].cmp(&b[0]));
        upper_cutoff_points.sort_by(|a, b| a[0].cmp(&b[0]));
        // With a scaling factor below 1 the scaled frontier sits below the
        // base frontier, so the roles swap.
        if *block_data.scaling_factor() < 1.0 {
            (lower_cutoff_points, upper_cutoff_points) = (upper_cutoff_points, lower_cutoff_points);
        }
        // Row sweep, advancing through the sorted frontier points as their
        // first coordinate is reached.
        let mut lower_cutoff_idx = 0;
        let mut lower_cutoff = lower_cutoff_points.get(0).unwrap().clone();
        let mut upper_cutoff_idx = 0;
        let mut upper_cutoff1 = upper_cutoff_points.get(0).unwrap().clone();
        let mut upper_cutoff2 = upper_cutoff_points.get(1).unwrap_or(&upper_cutoff1).clone();
        for (i, row) in self.weights.iter_mut().enumerate() {
            if lower_cutoff_idx + 1 < lower_cutoff_points.len()
                && i == lower_cutoff_points[lower_cutoff_idx + 1][0]
            {
                lower_cutoff = lower_cutoff_points[lower_cutoff_idx + 1].clone();
                lower_cutoff_idx += 1;
            }
            if upper_cutoff_idx + 1 < upper_cutoff_points.len()
                && i == upper_cutoff_points[upper_cutoff_idx + 1][0]
            {
                upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx + 1].clone();
                upper_cutoff2 = upper_cutoff_points
                    .get(upper_cutoff_idx + 2)
                    .unwrap_or(&upper_cutoff1)
                    .clone();
                upper_cutoff_idx += 1;
            }
            for (j, w) in row.iter_mut().enumerate() {
                let within_lower =
                    j > lower_cutoff[1] || (j == lower_cutoff[1] && i >= lower_cutoff[0]);
                let within_upper = (j <= upper_cutoff2[1] && i <= upper_cutoff2[0])
                    || (j < upper_cutoff1[1] && i < upper_cutoff2[0])
                    || (j == upper_cutoff1[1] && i == upper_cutoff1[0]);
                w.within_range = within_lower && within_upper;
            }
        }
    }

    /// Rebuild the flat distribution: weight = qualifier * solutions for
    /// in-range cells, 0 otherwise.
    ///
    /// NOTE(review): `WeightedIndex::new` errors (so this unwrap panics) if
    /// all weights are zero or the list is empty — assumed unreachable once
    /// block data has been applied; verify.
    fn update_distributions(&mut self) {
        let mut distribution = Vec::<f32>::new();
        for row in self.weights.iter() {
            for w in row.iter() {
                distribution.push(if w.within_range {
                    w.qualifier * w.solutions
                } else {
                    0.0
                });
            }
        }
        self.distribution = Some(WeightedIndex::new(&distribution).unwrap());
    }

    /// Exponentially move each cell's qualifier weight towards 1.0 when it is
    /// on or past the cutoff frontier, and towards 0.0 otherwise.
    fn update_qualifier_weights(&mut self, block_data: &ChallengeBlockData) {
        let mut cutoff_points: Vec<Vec<usize>> = block_data
            .cutoff_frontier()
            .iter()
            .map(|x| {
                vec![
                    (x[0] - self.min_difficulty[0]) as usize,
                    (x[1] - self.min_difficulty[1]) as usize,
                ]
            })
            .collect();
        cutoff_points.sort_by(|a, b| a[0].cmp(&b[0]));
        let mut cutoff_idx = 0;
        let mut cutoff = cutoff_points.get(0).unwrap_or(&vec![0, 0]).clone(); // every point is a qualifier if there is no cutoff
        for (i, row) in self.weights.iter_mut().enumerate() {
            if cutoff_idx + 1 < cutoff_points.len() && i == cutoff_points[cutoff_idx + 1][0] {
                cutoff = cutoff_points[cutoff_idx + 1].clone();
                cutoff_idx += 1;
            }
            for (j, w) in row.iter_mut().enumerate() {
                // Exponential moving average with factor 0.9 / increment 0.1.
                w.qualifier *= 0.9;
                if j > cutoff[1] || (j == cutoff[1] && i >= cutoff[0]) {
                    w.qualifier += 0.1;
                }
            }
        }
    }

    /// Shift the grid by `left_pad` per axis (positive = prepend fresh cells,
    /// negative = drop leading cells) and resize it to `size`.
    fn resize_weights(&mut self, left_pad: &Vec<i32>, size: &Vec<usize>) {
        if left_pad[0] > 0 {
            self.weights
                .splice(0..0, vec![Vec::new(); left_pad[0] as usize]);
        } else if left_pad[0] < 0 {
            self.weights.drain(0..(left_pad[0].abs() as usize));
        }
        if left_pad[1] > 0 {
            let padding_vec = vec![Weights::new(); left_pad[1] as usize];
            for row in self.weights.iter_mut() {
                row.splice(0..0, padding_vec.clone());
            }
        } else if left_pad[1] < 0 {
            for row in self.weights.iter_mut() {
                row.drain(0..(left_pad[1].abs() as usize));
            }
        }
        // Grow/shrink to the target size; new cells start as Weights::new().
        if self.weights.len() != size[0] {
            self.weights.resize_with(size[0], || Vec::new());
        }
        for row in self.weights.iter_mut() {
            if row.len() != size[1] {
                row.resize(size[1], Weights::new());
            }
        }
    }

    /// Recompute grid dimensions from the hardest difficulty across the
    /// qualifier difficulties and both frontiers, then derive padding as
    /// PADDING_FACTOR of each dimension (rounded up).
    fn update_dimensions_and_padding(&mut self, block_data: &ChallengeBlockData) {
        let hardest_difficulty: Vec<i32> = (0..2)
            .into_iter()
            .map(|i| {
                let v2 = block_data
                    .scaled_frontier()
                    .iter()
                    .map(|x| x[i])
                    .max()
                    .unwrap();
                let v3 = block_data
                    .base_frontier()
                    .iter()
                    .map(|x| x[i])
                    .max()
                    .unwrap();
                // Qualifier difficulties may be empty; the frontiers may not.
                match block_data
                    .qualifier_difficulties()
                    .iter()
                    .map(|x| x[i])
                    .max()
                {
                    Some(v1) => v1.max(v2).max(v3),
                    None => v2.max(v3),
                }
            })
            .collect();
        self.dimensions = (0..2)
            .into_iter()
            .map(|i| (hardest_difficulty[i] - self.min_difficulty[i] + 1) as usize)
            .collect();
        self.padding = self
            .dimensions
            .iter()
            .map(|x| (*x as f32 * PADDING_FACTOR).ceil() as usize)
            .collect();
    }
}

View File

@ -1,37 +0,0 @@
use super::{api, Job, Result};
use moka::future::{Cache, CacheBuilder};
use once_cell::sync::OnceCell;
use tig_utils::get;
use tokio::sync::Mutex;
// Cache of downloaded WASM blobs keyed by algorithm id (max 100 entries,
// 2-hour TTL) so repeated jobs for the same algorithm skip the download.
static CACHE: OnceCell<Mutex<Cache<String, Vec<u8>>>> = OnceCell::new();

/// Return the WASM blob for `job`'s algorithm, downloading it from
/// `job.download_url` (with API auth headers) on a cache miss.
pub async fn execute(job: &Job) -> Result<Vec<u8>> {
    let cache = CACHE
        .get_or_init(|| {
            Mutex::new(
                CacheBuilder::new(100)
                    .time_to_live(std::time::Duration::from_secs(7200))
                    .build(),
            )
        })
        .lock()
        .await;
    if let Some(wasm_blob) = cache.get(&job.settings.algorithm_id).await {
        Ok(wasm_blob)
    } else {
        let wasm = get::<Vec<u8>>(
            &job.download_url,
            Some(vec![
                ("user-agent".to_string(), "TIG Benchmarker v0.2".to_string()),
                ("x-api-key".to_string(), api().api_key.clone()),
            ]),
        )
        .await
        .map_err(|e| format!("Failed to download wasm from {}: {:?}", job.download_url, e))?;
        (*cache)
            .insert(job.settings.algorithm_id.clone(), wasm.clone())
            .await;
        Ok(wasm)
    }
}

View File

@ -1,36 +0,0 @@
use super::{state, Job, QueryData, Result, State};
use crate::utils::time;
/// Pop the oldest pending benchmark job whose submit time has arrived.
///
/// Jobs the chain already knows about (a benchmark, proof, or fraud exists
/// for their id) or that found no solutions are discarded; the first
/// remaining candidate is returned for submission.
pub async fn execute() -> Result<Option<Job>> {
    let mut guard = state().lock().await;
    let State {
        query_data,
        pending_benchmark_jobs,
        ..
    } = &mut *guard;
    let QueryData {
        proofs,
        benchmarks,
        frauds,
        ..
    } = &query_data;
    let now = time();
    // Collect ids whose submit time has passed, then order them oldest first.
    let mut due_ids: Vec<String> = Vec::new();
    for (id, job) in pending_benchmark_jobs.iter() {
        if now >= job.timestamps.submit {
            due_ids.push(id.clone());
        }
    }
    due_ids.sort_by_key(|id| pending_benchmark_jobs[id].timestamps.submit);
    for id in due_ids {
        let job = pending_benchmark_jobs.remove(&id).unwrap();
        let already_known = benchmarks.contains_key(&id)
            || proofs.contains_key(&id)
            || frauds.contains_key(&id);
        if already_known || job.solutions_data.lock().await.is_empty() {
            // Nothing to submit for this job; it is dropped.
            continue;
        }
        return Ok(Some(job));
    }
    Ok(None)
}

View File

@ -1,60 +0,0 @@
use crate::utils::time;
use super::{state, Job, QueryData, Result, State, Timestamps};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
/// Find a benchmark whose proof the chain has requested (sampled_nonces set)
/// but which is not yet queued, submitted, proven, or flagged, and build a
/// recompute Job for it.
///
/// `benchmark_duration_ms` bounds the recompute run; submission is scheduled
/// immediately at the end of that window.
pub async fn execute(benchmark_duration_ms: u64) -> Result<Option<Job>> {
    let State {
        query_data,
        submitted_proof_ids,
        pending_proof_jobs,
        ..
    } = &*state().lock().await;
    let QueryData {
        latest_block,
        benchmarks,
        proofs,
        frauds,
        download_urls,
        ..
    } = query_data;
    for (benchmark_id, benchmark) in benchmarks.iter() {
        if !pending_proof_jobs.contains_key(benchmark_id)
            && !submitted_proof_ids.contains(benchmark_id)
            && !frauds.contains_key(benchmark_id)
            && !proofs.contains_key(benchmark_id)
            && benchmark.state.is_some()
        {
            let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| {
                format!(
                    "Expecting benchmark '{}' to have sampled_nonces",
                    benchmark_id
                )
            })?;
            let start = time();
            let end = start + benchmark_duration_ms;
            let submit = end;
            return Ok(Some(Job {
                benchmark_id: benchmark.id.clone(),
                download_url: download_urls
                    .get(&benchmark.settings.algorithm_id)
                    .cloned()
                    .ok_or_else(|| {
                        format!(
                            "Expecting download_url for algorithm '{}'",
                            benchmark.settings.algorithm_id
                        )
                    })?,
                settings: benchmark.settings.clone(),
                solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud
                sampled_nonces: Some(sampled_nonces),
                wasm_vm_config: latest_block.config().wasm_vm.clone(),
                // Recompute jobs are never weighted-sampled.
                weight: 0.0,
                timestamps: Timestamps { start, end, submit },
                solutions_data: Arc::new(Mutex::new(HashMap::new())),
            }));
        }
    }
    Ok(None)
}

View File

@ -1,39 +0,0 @@
use super::{state, Job, QueryData, Result, State};
use crate::utils::time;
/// Pop the oldest pending proof job whose submit time has arrived and whose
/// benchmark now has sampled_nonces available on-chain.
pub async fn execute() -> Result<Option<Job>> {
    let mut state = state().lock().await;
    let State {
        query_data,
        pending_proof_jobs,
        ..
    } = &mut *state;
    let QueryData {
        proofs,
        benchmarks,
        frauds,
        ..
    } = &query_data;
    let now = time();
    // Ids whose submit time has passed, processed oldest first.
    let mut pending_proof_ids = pending_proof_jobs
        .iter()
        .filter(|(_, job)| now >= job.timestamps.submit)
        .map(|(id, _)| id.clone())
        .collect::<Vec<String>>();
    pending_proof_ids.sort_by_key(|id| pending_proof_jobs[id].timestamps.submit);
    for benchmark_id in pending_proof_ids {
        // Jobs whose benchmark has no sampled_nonces yet stay queued for a
        // later pass.
        if let Some(sampled_nonces) = benchmarks
            .get(&benchmark_id)
            .and_then(|b| b.state.as_ref())
            .and_then(|s| s.sampled_nonces.as_ref())
        {
            let mut job = pending_proof_jobs.remove(&benchmark_id).unwrap();
            // Already proven or flagged as fraud: drop the job.
            if proofs.contains_key(&benchmark_id) || frauds.contains_key(&benchmark_id) {
                continue;
            }
            job.sampled_nonces = Some(sampled_nonces.clone());
            return Ok(Some(job));
        }
    }
    Ok(None)
}

View File

@ -1,466 +0,0 @@
mod difficulty_sampler;
pub mod download_wasm;
mod find_benchmark_to_submit;
mod find_proof_to_recompute;
mod find_proof_to_submit;
mod query_data;
pub mod select_job;
mod setup_jobs;
mod submit_benchmark;
mod submit_proof;
#[cfg(not(feature = "cuda"))]
pub mod run_benchmark;
#[cfg(feature = "cuda")]
#[path = "cuda_run_benchmark.rs"]
pub mod run_benchmark;
use crate::utils::{sleep, time, Result};
use difficulty_sampler::DifficultySampler;
use once_cell::sync::OnceCell;
use rand::{rngs::StdRng, Rng, SeedableRng};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tig_api::Api;
use tig_structs::{
config::{MinMaxDifficulty, WasmVMConfig},
core::*,
};
use tokio::{sync::Mutex, task::yield_now};
/// Snapshot of on-chain data fetched for a single block.
#[derive(Serialize, Clone, Debug)]
pub struct QueryData {
    pub latest_block: Block,
    // This player's per-block data, if the player appears in the block.
    pub player_data: Option<PlayerBlockData>,
    // Challenges whose round has started as of the latest block.
    pub challenges: Vec<Challenge>,
    // algorithm id -> WASM download URL.
    pub download_urls: HashMap<String, String>,
    // challenge id -> algorithms belonging to that challenge.
    pub algorithms_by_challenge: HashMap<String, Vec<Algorithm>>,
    // The three maps below are keyed by benchmark id.
    pub benchmarks: HashMap<String, Benchmark>,
    pub proofs: HashMap<String, Proof>,
    pub frauds: HashMap<String, Fraud>,
}
/// Iterator over the half-open nonce range `[start, end)` handed to one
/// worker; also tracks how many nonces have been consumed so far.
pub struct NonceIterator {
    start: u64,
    current: u64,
    end: u64,
}

impl NonceIterator {
    /// Create an iterator over the half-open range `[start, end)`.
    pub fn new(start: u64, end: u64) -> Self {
        NonceIterator {
            start,
            current: start,
            end,
        }
    }

    /// Number of nonces yielded so far.
    pub fn num_attempts(&self) -> u64 {
        self.current - self.start
    }
}

impl Iterator for NonceIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current < self.end {
            let result = Some(self.current);
            self.current += 1;
            result
        } else {
            None
        }
    }

    /// Report the exact remaining count (the original impl left the default
    /// `(0, None)` hint), letting `collect`/`count` and adaptors size
    /// themselves correctly.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.end.saturating_sub(self.current);
        match usize::try_from(remaining) {
            Ok(n) => (n, Some(n)),
            // Range longer than usize::MAX (only possible on narrow targets):
            // lower bound saturates, upper bound unknown.
            Err(_) => (usize::MAX, None),
        }
    }
}
/// Millisecond timestamps controlling a job's lifecycle.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Timestamps {
    // When benchmarking began.
    pub start: u64,
    // Deadline after which benchmarking stops.
    pub end: u64,
    // Earliest time the result may be submitted.
    pub submit: u64,
}
/// A unit of benchmarking work: either compute fresh solutions
/// (`sampled_nonces` is `None`) or recompute specific nonces for a proof
/// (`sampled_nonces` is `Some`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
    pub download_url: String,
    pub benchmark_id: String,
    pub settings: BenchmarkSettings,
    pub solution_signature_threshold: u32,
    // Nonces the chain asked to prove; None while benchmarking fresh.
    pub sampled_nonces: Option<Vec<u64>>,
    pub wasm_vm_config: WasmVMConfig,
    // Sampling weight used by select_job; 0.0 for recompute jobs.
    pub weight: f64,
    pub timestamps: Timestamps,
    // Solutions found so far, keyed by nonce; shared with worker tasks and
    // excluded from (de)serialization.
    #[serde(skip)]
    pub solutions_data: Arc<Mutex<HashMap<u64, SolutionData>>>,
}
impl Job {
    /// Build nonce iterators for the workers.
    ///
    /// For a recompute job, each sampled nonce becomes its own single-item
    /// iterator. Otherwise the nonce space is split into `num_workers`
    /// disjoint ranges of `u64::MAX / (num_workers + 1)` nonces each,
    /// starting at a random offset below one range width (so `end` of the
    /// last range cannot overflow).
    pub fn create_nonce_iterators(&self, num_workers: u32) -> Vec<Arc<Mutex<NonceIterator>>> {
        match &self.sampled_nonces {
            Some(sampled_nonces) => sampled_nonces
                .iter()
                .map(|&n| Arc::new(Mutex::new(NonceIterator::new(n, n + 1))))
                .collect(),
            None => {
                // Seeded from the clock: a fresh random offset per call.
                let mut rng = StdRng::seed_from_u64(time());
                let offset = u64::MAX / (num_workers as u64 + 1);
                let random_offset = rng.gen_range(0..offset);
                (0..num_workers)
                    .map(|i| {
                        let start = random_offset + offset * i as u64;
                        let end = start + offset;
                        Arc::new(Mutex::new(NonceIterator::new(start, end)))
                    })
                    .collect()
            }
        }
    }
}
/// Shared mutable state for all benchmarker tasks (guarded by the STATE mutex).
#[derive(Debug, Clone)]
pub struct State {
    pub query_data: QueryData,
    // Jobs created for the current pass that a worker may pick up.
    pub available_jobs: HashMap<String, Job>,
    // Finished benchmarks awaiting submission, keyed by benchmark id.
    pub pending_benchmark_jobs: HashMap<String, Job>,
    // Submitted benchmarks awaiting proof submission, keyed by benchmark id.
    pub pending_proof_jobs: HashMap<String, Job>,
    // Benchmark ids whose proof was submitted (guards against double submits).
    pub submitted_proof_ids: HashSet<String>,
    // Per-challenge difficulty samplers, keyed by challenge id.
    pub difficulty_samplers: HashMap<String, DifficultySampler>,
}
// Process-wide singletons, initialised exactly once in `setup`.
static STATE: OnceCell<Mutex<State>> = OnceCell::new();
static API: OnceCell<Api> = OnceCell::new();
static PLAYER_ID: OnceCell<String> = OnceCell::new();

/// API client singleton; panics if `setup` has not run.
pub fn api() -> &'static Api {
    API.get().expect("API should be initialised")
}

/// Player id singleton; panics if `setup` has not run.
pub fn player_id() -> &'static String {
    PLAYER_ID.get().expect("PLAYER_ID should be initialised")
}

/// Global state singleton; panics if `setup` has not run.
pub fn state() -> &'static Mutex<State> {
    STATE.get().expect("STATE should be initialised")
}
/// Background task: every 5s, try to submit one due proof.
///
/// On a non-4xx failure the job is re-queued for another attempt 10s later;
/// a 4xx response is treated as permanent and the job is dropped.
pub async fn proof_submitter() {
    async fn do_work() -> Result<()> {
        println!("[proof_submitter]: checking for any proofs to submit");
        if let Some(mut job) = find_proof_to_submit::execute().await? {
            println!(
                "[proof_submitter]: submitting proof for benchmark {}",
                job.benchmark_id
            );
            // Mark as submitted up-front so concurrent passes skip this job.
            {
                let mut state = (*state()).lock().await;
                state.submitted_proof_ids.insert(job.benchmark_id.clone());
            }
            // Every sampled nonce must have a solution before submitting.
            {
                let solutions_data = job.solutions_data.lock().await;
                if !job
                    .sampled_nonces
                    .as_ref()
                    .unwrap()
                    .iter()
                    .all(|n| solutions_data.contains_key(n))
                {
                    return Err(format!("failed to find solutions for every sampled nonces"));
                }
            }
            match submit_proof::execute(&job).await {
                Ok(_) => {
                    println!(
                        "[proof_submitter]: successfully submitted proof for benchmark {}",
                        job.benchmark_id
                    );
                }
                Err(e) => {
                    println!("[proof_submitter]: failed to submit proof: {:?}", e);
                    // FIXME hacky way to check for 4xx status
                    if !e.contains("status: 4") {
                        println!("[proof_submitter]: re-queueing proof for another submit attempt 10s later");
                        job.timestamps.submit = time() + 10000;
                        let mut state = (*state()).lock().await;
                        // Undo the up-front "submitted" mark and re-queue.
                        state.submitted_proof_ids.remove(&job.benchmark_id);
                        state
                            .pending_proof_jobs
                            .insert(job.benchmark_id.clone(), job);
                    }
                }
            }
        } else {
            println!("[proof_submitter]: no proofs to submit");
        }
        Ok(())
    }
    loop {
        if let Err(e) = do_work().await {
            println!("[proof_submitter]: error: {:?}", e);
        }
        println!("[proof_submitter]: sleeping 5s");
        sleep(5000).await;
    }
}
/// Background task: every 5s, try to submit one due benchmark.
///
/// On success the job moves to the pending-proof queue under the chain's
/// newly assigned benchmark id; on a non-4xx failure it is re-queued 10s
/// later; a 4xx response drops the job.
pub async fn benchmark_submitter() {
    async fn do_work() -> Result<()> {
        println!("[benchmark_submitter]: checking for any benchmarks to submit");
        if let Some(mut job) = find_benchmark_to_submit::execute().await? {
            let num_solutions = {
                let solutions_data = job.solutions_data.lock().await;
                solutions_data.len()
            };
            println!(
                "[benchmark_submitter]: submitting benchmark {:?} with {} solutions",
                job.settings, num_solutions
            );
            match submit_benchmark::execute(&job).await {
                Ok(benchmark_id) => {
                    // The chain assigns the real benchmark id on submission.
                    job.benchmark_id = benchmark_id.clone();
                    job.timestamps.submit = time();
                    println!(
                        "[benchmark_submitter]: successfully submitted benchmark {}",
                        benchmark_id
                    );
                    {
                        let mut state = (*state()).lock().await;
                        state.pending_proof_jobs.insert(benchmark_id, job);
                    }
                }
                Err(e) => {
                    println!("[benchmark_submitter]: failed to submit benchmark: {:?}", e);
                    // FIXME hacky way to check for 4xx status
                    if !e.contains("status: 4") {
                        println!("[benchmark_submitter]: re-queueing benchmark for another submit attempt 10s later");
                        job.timestamps.submit = time() + 10000;
                        let mut state = (*state()).lock().await;
                        state
                            .pending_benchmark_jobs
                            .insert(job.benchmark_id.clone(), job);
                    }
                }
            }
        } else {
            println!("[benchmark_submitter]: no benchmarks to submit");
        }
        Ok(())
    }
    loop {
        if let Err(e) = do_work().await {
            println!("[benchmark_submitter]: error: {:?}", e);
        }
        println!("[benchmark_submitter]: sleeping 5s");
        sleep(5000).await;
    }
}
/// Background task: every 10s, fetch the latest on-chain data; when a new
/// block appears, swap in the new query data and refresh each challenge's
/// difficulty sampler with the block's data.
pub async fn data_fetcher() {
    async fn do_work() -> Result<()> {
        println!("[data_fetcher]: fetching latest data");
        let new_query_data = query_data::execute().await?;
        // Compare block ids under a short-lived lock.
        if {
            let state = (*state()).lock().await;
            state.query_data.latest_block.id != new_query_data.latest_block.id
        } {
            println!(
                "[data_fetcher]: got new block {} @ {}",
                new_query_data.latest_block.id, new_query_data.latest_block.details.height
            );
            {
                let mut state = (*state()).lock().await;
                (*state).query_data = new_query_data;
            }
            println!("[data_fetcher]: updating difficulty samplers");
            {
                let mut state = state().lock().await;
                let State {
                    query_data,
                    difficulty_samplers,
                    ..
                } = &mut (*state);
                for challenge in query_data.challenges.iter() {
                    // New challenges get a fresh sampler on first sight.
                    let difficulty_sampler = difficulty_samplers
                        .entry(challenge.id.clone())
                        .or_insert_with(|| DifficultySampler::new());
                    let min_difficulty = query_data.latest_block.config().difficulty.parameters
                        [&challenge.id]
                        .min_difficulty();
                    difficulty_sampler
                        .update_with_block_data(&min_difficulty, challenge.block_data());
                }
            }
        } else {
            println!("[data_fetcher]: no new data");
        }
        Ok(())
    }
    loop {
        if let Err(e) = do_work().await {
            println!("[data_fetcher]: error: {:?}", e);
        }
        println!("[data_fetcher]: sleeping 10s");
        sleep(10000).await;
    }
}
/// Main benchmarking loop: set up candidate jobs, prefer recomputing a
/// requested proof, otherwise weighted-sample one of the available jobs;
/// download its WASM, run workers until the deadline (or, for a recompute,
/// until all sampled nonces are solved), then hand results to the submitter
/// queues and the difficulty sampler.
pub async fn benchmarker(
    selected_algorithms: HashMap<String, String>,
    num_workers: u32,
    benchmark_duration_ms: u64,
    submit_delay_ms: u64,
) {
    async fn do_work(
        selected_algorithms: &HashMap<String, String>,
        num_workers: u32,
        benchmark_duration_ms: u64,
        submit_delay_ms: u64,
    ) -> Result<()> {
        println!("[benchmarker]: setting up jobs");
        let jobs = setup_jobs::execute(selected_algorithms, benchmark_duration_ms, submit_delay_ms)
            .await?;
        for (i, job) in jobs.values().enumerate() {
            println!(
                "[benchmarker]: job {}: {:?}, weight: {}",
                i, job.settings, job.weight
            );
        }
        {
            let mut state = state().lock().await;
            state.available_jobs = jobs.clone();
        }
        println!("[benchmarker]: finding proofs to re-compute");
        let job = {
            // Recompute jobs get 5s of slack beyond the normal duration.
            if let Some(job) =
                find_proof_to_recompute::execute(benchmark_duration_ms + 5000).await?
            {
                println!(
                    "[benchmarker]: found proof to recompute: {}",
                    job.benchmark_id
                );
                // Insert a placeholder proof so find_proof_to_recompute does
                // not pick the same benchmark again while this run is active.
                let mut state = state().lock().await;
                state.query_data.proofs.insert(
                    job.benchmark_id.clone(),
                    Proof {
                        benchmark_id: job.benchmark_id.clone(),
                        state: None,
                        solutions_data: Some(Vec::new()),
                    },
                );
                job
            } else {
                println!("[benchmarker]: no proofs to recompute");
                println!("[benchmarker]: weighted sampling one of the available jobs");
                select_job::execute(&jobs).await?
            }
        };
        println!(
            "[benchmarker]: downloading algorithm {}",
            job.download_url.split("/").last().unwrap()
        );
        let wasm = download_wasm::execute(&job).await?;
        println!("[benchmarker]: starting benchmark {:?}", job.settings);
        let nonce_iterators = job.create_nonce_iterators(num_workers);
        run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
        // Poll progress every 500ms until the deadline passes or (for a
        // recompute) every sampled nonce has a solution.
        loop {
            {
                let mut num_attempts = 0;
                for nonce_iterator in &nonce_iterators {
                    num_attempts += nonce_iterator.lock().await.num_attempts();
                }
                let num_solutions = job.solutions_data.lock().await.len();
                let elapsed = time() - job.timestamps.start;
                if num_workers > 0 || job.sampled_nonces.is_some() {
                    println!(
                        "[benchmarker]: #solutions: {}, #instances: {}, elapsed: {}ms",
                        num_solutions, num_attempts, elapsed
                    );
                }
                if time() >= job.timestamps.end
                    || job
                        .sampled_nonces
                        .as_ref()
                        .is_some_and(|sampled_nonces| num_solutions == sampled_nonces.len())
                {
                    break;
                }
            }
            sleep(500).await;
        }
        // Hand-off: recompute jobs go to the proof queue; fresh benchmarks
        // feed the difficulty sampler; all available jobs move to the
        // pending-benchmark queue.
        {
            let mut num_attempts = 0;
            for nonce_iterator in &nonce_iterators {
                num_attempts += nonce_iterator.lock().await.num_attempts();
            }
            let mut state = state().lock().await;
            let num_solutions = job.solutions_data.lock().await.len() as u32;
            if job.sampled_nonces.is_some() {
                state
                    .pending_proof_jobs
                    .insert(job.benchmark_id.clone(), job);
            } else if num_attempts > 0 {
                state
                    .difficulty_samplers
                    .get_mut(&job.settings.challenge_id)
                    .unwrap()
                    .update_with_solutions(&job.settings.difficulty, num_solutions);
            }
            let jobs = state.available_jobs.drain().collect::<HashMap<_, _>>();
            state.pending_benchmark_jobs.extend(jobs);
        }
        Ok(())
    }
    loop {
        if let Err(e) = do_work(
            &selected_algorithms,
            num_workers,
            benchmark_duration_ms,
            submit_delay_ms,
        )
        .await
        {
            println!("[benchmarker]: error: {:?}", e);
            println!("[benchmarker]: sleeping 5s");
            sleep(5000).await;
        } else {
            yield_now().await;
        }
    }
}
/// One-time initialisation: store the API client and player id, perform the
/// initial data query, seed per-challenge difficulty samplers, and build the
/// global state. Must complete before spawning the background tasks, which
/// panic if the singletons are uninitialised.
pub async fn setup(api_url: String, api_key: String, player_id: String) {
    API.get_or_init(|| Api::new(api_url, api_key));
    PLAYER_ID.get_or_init(|| player_id);
    let query_data = query_data::execute().await.expect("Failed to query data");
    let mut difficulty_samplers = HashMap::new();
    for challenge in query_data.challenges.iter() {
        let difficulty_sampler = difficulty_samplers
            .entry(challenge.id.clone())
            .or_insert_with(|| DifficultySampler::new());
        let min_difficulty =
            query_data.latest_block.config().difficulty.parameters[&challenge.id].min_difficulty();
        difficulty_sampler.update_with_block_data(&min_difficulty, challenge.block_data());
    }
    STATE.get_or_init(|| {
        Mutex::new(State {
            query_data,
            difficulty_samplers,
            available_jobs: HashMap::new(),
            pending_benchmark_jobs: HashMap::new(),
            pending_proof_jobs: HashMap::new(),
            submitted_proof_ids: HashSet::new(),
        })
    });
}

View File

@ -1,151 +0,0 @@
use super::{api, player_id, QueryData, Result};
use once_cell::sync::OnceCell;
use std::collections::HashMap;
use tig_api::*;
use tig_structs::core::*;
use tokio::{join, sync::Mutex};
// Single-snapshot cache keyed by block id: the map is cleared before each new
// snapshot is inserted, so re-queries happen only when a new block appears.
static CACHE: OnceCell<Mutex<HashMap<String, QueryData>>> = OnceCell::new();

/// Fetch (or reuse the cached) QueryData snapshot for the latest block.
pub async fn execute() -> Result<QueryData> {
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
    let latest_block = query_latest_block().await?;
    let latest_block_id = latest_block.id.clone();
    let mut cache = cache.lock().await;
    if !cache.contains_key(&latest_block.id) {
        cache.clear();
        // Fetch the remaining data concurrently.
        let results = join!(
            query_algorithms(latest_block.id.clone()),
            query_player_data(latest_block.id.clone()),
            query_benchmarks(latest_block.id.clone()),
            query_challenges(latest_block.id.clone()),
        );
        let (algorithms_by_challenge, download_urls) = results.0?;
        let player_data = results.1?;
        let (benchmarks, proofs, frauds) = results.2?;
        // Keep only challenges whose round has started.
        let challenges = results
            .3?
            .into_iter()
            .filter(|c| {
                c.state()
                    .round_active
                    .as_ref()
                    .is_some_and(|r| *r <= latest_block.details.round)
            })
            .collect();
        cache.insert(
            latest_block.id.clone(),
            QueryData {
                latest_block,
                algorithms_by_challenge,
                player_data,
                download_urls,
                benchmarks,
                proofs,
                frauds,
                challenges,
            },
        );
    }
    Ok(cache.get(&latest_block_id).unwrap().clone())
}
/// Fetch the current latest block (no optional block data included).
async fn query_latest_block() -> Result<Block> {
    let req = GetBlockReq {
        id: None,
        round: None,
        height: None,
        include_data: false,
    };
    let resp = api()
        .get_block(req)
        .await
        .map_err(|e| format!("Failed to query latest block: {:?}", e))?;
    resp.block
        .ok_or_else(|| format!("Expecting latest block to exist"))
}
/// Fetch this player's benchmarks, proofs, and frauds for `block_id`, each
/// returned as a map keyed by benchmark id.
async fn query_benchmarks(
    block_id: String,
) -> Result<(
    HashMap<String, Benchmark>,
    HashMap<String, Proof>,
    HashMap<String, Fraud>,
)> {
    let resp = api()
        .get_benchmarks(GetBenchmarksReq {
            block_id: block_id.clone(),
            player_id: player_id().clone(),
        })
        .await
        .map_err(|e| format!("Failed to get benchmarks: {:?}", e))?;
    let benchmarks: HashMap<_, _> = resp
        .benchmarks
        .into_iter()
        .map(|b| (b.id.clone(), b))
        .collect();
    let proofs: HashMap<_, _> = resp
        .proofs
        .into_iter()
        .map(|p| (p.benchmark_id.clone(), p))
        .collect();
    let frauds: HashMap<_, _> = resp
        .frauds
        .into_iter()
        .map(|f| (f.benchmark_id.clone(), f))
        .collect();
    Ok((benchmarks, proofs, frauds))
}
/// Fetch this player's per-block data, or None if the player is not among
/// the block's benchmarkers.
async fn query_player_data(block_id: String) -> Result<Option<PlayerBlockData>> {
    let GetPlayersResp { players, .. } = api()
        .get_players(GetPlayersReq {
            block_id: block_id.clone(),
            player_type: PlayerType::Benchmarker,
        })
        .await
        .map_err(|e| format!("Failed to query players: {:?}", e))?;
    let me = player_id().clone();
    players
        .into_iter()
        .find(|p| p.id == me)
        .map(|p| {
            p.block_data
                .ok_or_else(|| format!("Expecting player to have block_data"))
        })
        .transpose()
}
/// Fetches every challenge for `block_id`.
async fn query_challenges(block_id: String) -> Result<Vec<Challenge>> {
    let resp = api()
        .get_challenges(GetChallengesReq {
            block_id: block_id.clone(),
        })
        .await;
    match resp {
        Ok(GetChallengesResp { challenges, .. }) => Ok(challenges),
        Err(e) => Err(format!("Failed to query challenges: {:?}", e)),
    }
}
/// Fetches all algorithms and wasms for `block_id`.
///
/// Returns:
/// - a map from challenge id to the algorithms belonging to that challenge,
/// - a map from algorithm id to its wasm download URL (only algorithms whose
///   wasm actually exposes a download URL are included).
async fn query_algorithms(
    block_id: String,
) -> Result<(HashMap<String, Vec<Algorithm>>, HashMap<String, String>)> {
    let GetAlgorithmsResp {
        algorithms, wasms, ..
    } = api()
        .get_algorithms(GetAlgorithmsReq {
            block_id: block_id.clone(),
        })
        .await
        .map_err(|e| format!("Failed to query algorithms: {:?}", e))?;
    // Group algorithms by challenge. Only the key is cloned; each algorithm
    // is moved into the map (the original fold cloned every element).
    let mut algorithms_by_challenge: HashMap<String, Vec<Algorithm>> = HashMap::new();
    for algorithm in algorithms {
        algorithms_by_challenge
            .entry(algorithm.details.challenge_id.clone())
            .or_default()
            .push(algorithm);
    }
    // filter + map + unwrap collapsed into a single filter_map-style pass:
    // no panic path, and each wasm's fields are moved, not cloned.
    let download_urls = wasms
        .into_iter()
        .filter_map(|x| match x.details.download_url {
            Some(url) => Some((x.algorithm_id, url)),
            None => None,
        })
        .collect();
    Ok((algorithms_by_challenge, download_urls))
}

File diff suppressed because it is too large Load Diff

View File

@ -1,22 +0,0 @@
use std::collections::HashMap;
use super::{Job, Result};
use crate::utils::time;
use rand::{distributions::WeightedIndex, rngs::StdRng, SeedableRng};
use rand_distr::Distribution;
/// Randomly selects one job from `available_jobs`, with probability
/// proportional to each job's `weight`.
///
/// Errors when no jobs are available or when the weights cannot form a
/// valid distribution (e.g. all zero or negative).
pub async fn execute(available_jobs: &HashMap<String, Job>) -> Result<Job> {
    // Check emptiness up front instead of after building the weight vector.
    if available_jobs.is_empty() {
        return Err("No jobs available".to_string());
    }
    let benchmark_ids = available_jobs.keys().cloned().collect::<Vec<String>>();
    // f64 is Copy — no `.clone()` needed per weight (clippy::clone_on_copy).
    let weights = benchmark_ids
        .iter()
        .map(|benchmark_id| available_jobs[benchmark_id].weight)
        .collect::<Vec<f64>>();
    let dist = WeightedIndex::new(&weights)
        .map_err(|e| format!("Failed to create WeightedIndex: {}", e))?;
    // Seeded from wall-clock ms so successive calls draw different jobs.
    let mut rng = StdRng::seed_from_u64(time());
    let index = dist.sample(&mut rng);
    Ok(available_jobs[&benchmark_ids[index]].clone())
}

View File

@ -1,117 +0,0 @@
use super::{player_id, state, Job, QueryData, Result, State, Timestamps};
use crate::utils::time;
use rand::{rngs::StdRng, SeedableRng};
use std::{collections::HashMap, sync::Arc};
use tig_structs::core::*;
use tokio::sync::Mutex;
/// Creates one benchmark `Job` per (challenge name, algorithm name) pair in
/// `selected_algorithms`.
///
/// Each job gets a freshly sampled difficulty, a weight derived from the
/// player's current qualifier distribution, and a start/end/submit schedule
/// based on `benchmark_duration_ms` and `submit_delay_ms`.
///
/// Errors if a selected challenge or algorithm does not exist, or the
/// algorithm has no downloadable wasm.
pub async fn execute(
    selected_algorithms: &HashMap<String, String>,
    benchmark_duration_ms: u64,
    submit_delay_ms: u64,
) -> Result<HashMap<String, Job>> {
    // Snapshot of the latest fetched chain data. NOTE(review): the global
    // state lock is held for this whole function.
    let State {
        query_data,
        difficulty_samplers,
        ..
    } = &(*state().lock().await);
    let QueryData {
        latest_block,
        player_data,
        challenges,
        download_urls,
        algorithms_by_challenge,
        ..
    } = query_data;
    let challenge_weights = get_challenge_weights(player_data, challenges);
    // Schedule: benchmark runs in [start, end); submission happens at `submit`.
    let start = time();
    let end = start + benchmark_duration_ms;
    let submit = end + submit_delay_ms;
    let mut jobs = HashMap::new();
    for (challenge_name, algorithm_name) in selected_algorithms.iter() {
        // Resolve the challenge by its human-readable name.
        let challenge = challenges
            .iter()
            .find(|c| c.details.name == *challenge_name)
            .ok_or_else(|| {
                format!(
                    "Your selected_algorithms contains a non-existent challenge '{}'",
                    challenge_name
                )
            })?;
        // Resolve the algorithm by name, restricted to ones with a wasm URL.
        let algorithm = algorithms_by_challenge[&challenge.id]
            .iter()
            .find(|a| download_urls.contains_key(&a.id) && a.details.name == *algorithm_name)
            .ok_or_else(|| {
                format!(
                    "Your selected_algorithms contains a non-existent algorithm '{}'",
                    algorithm_name
                )
            })?;
        // Time-seeded RNG: each job samples an independent difficulty.
        let mut rng = StdRng::seed_from_u64(time() as u64);
        let difficulty = difficulty_samplers[&challenge.id].sample(&mut rng);
        let job = Job {
            // Locally generated id; slaves later match jobs against the
            // "<challenge>_<algorithm>" prefix of this id.
            benchmark_id: format!("{}_{}_{}", challenge_name, algorithm_name, time()),
            download_url: download_urls.get(&algorithm.id).cloned().ok_or_else(|| {
                format!("Expecting download_url for algorithm '{}'", algorithm.id)
            })?,
            settings: BenchmarkSettings {
                player_id: player_id().clone(),
                block_id: latest_block.id.clone(),
                challenge_id: challenge.id.clone(),
                algorithm_id: algorithm.id.clone(),
                difficulty,
            },
            solution_signature_threshold: *challenge.block_data().solution_signature_threshold(),
            // Filled in later, once the API samples nonces for the proof.
            sampled_nonces: None,
            wasm_vm_config: latest_block.config().wasm_vm.clone(),
            weight: challenge_weights[&challenge.id],
            timestamps: Timestamps { start, end, submit },
            // Shared map that worker tasks push solutions into.
            solutions_data: Arc::new(Mutex::new(HashMap::new())),
        };
        jobs.insert(job.benchmark_id.clone(), job);
    }
    Ok(jobs)
}
/// Computes a selection weight per challenge, favoring challenges where the
/// player currently holds a smaller share of the qualifiers.
///
/// weight = (4/3) * max_percent - percent + epsilon, so the least-covered
/// challenge gets the highest weight and every weight stays strictly
/// positive (required by the WeightedIndex sampler downstream).
fn get_challenge_weights(
    player_data: &Option<PlayerBlockData>,
    challenges: &Vec<Challenge>,
) -> HashMap<String, f64> {
    // Player may be absent, or present without qualifier data; both count
    // as "no qualifiers anywhere".
    let num_qualifiers_by_challenge = match player_data
        .as_ref()
        .map(|x| x.num_qualifiers_by_challenge.as_ref())
    {
        Some(Some(num_qualifiers_by_challenge)) => num_qualifiers_by_challenge.clone(),
        _ => HashMap::new(),
    };
    let percent_qualifiers_by_challenge: HashMap<String, f64> = challenges
        .iter()
        .map(|c| {
            let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0);
            let challenge_num_qualifiers = *c.block_data().num_qualifiers();
            // Guard against 0/0: no qualifiers on either side counts as 0%.
            let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 {
                0f64
            } else {
                (player_num_qualifiers as f64) / (challenge_num_qualifiers as f64)
            };
            (c.id.clone(), percent)
        })
        .collect();
    // Percentages are always >= 0, so folding from 0.0 yields the true max
    // for non-empty input, while empty `challenges` no longer panics (the
    // original `.max_by(..).unwrap()` did, and also unwrapped partial_cmp).
    let max_percent_qualifiers = percent_qualifiers_by_challenge
        .values()
        .copied()
        .fold(0f64, f64::max);
    percent_qualifiers_by_challenge
        .into_iter()
        .map(|(challenge_id, p)| {
            (
                challenge_id,
                // Epsilon keeps even the best-covered challenge selectable.
                4.0 * max_percent_qualifiers / 3.0 - p + 1e-10f64,
            )
        })
        .collect()
}

View File

@ -1,26 +0,0 @@
use super::{api, Job, Result};
use tig_api::SubmitBenchmarkReq;
use tig_structs::core::SolutionMetaData;
/// Submits a finished benchmark to the API and returns the assigned
/// benchmark id, or an error string on failure / fraud-flagging.
pub async fn execute(job: &Job) -> Result<String> {
    let req = {
        // Hold the solutions lock only long enough to build the request.
        let solutions_data = job.solutions_data.lock().await;
        SubmitBenchmarkReq {
            settings: job.settings.clone(),
            solutions_meta_data: solutions_data
                .values()
                .map(|x| SolutionMetaData::from(x.clone()))
                .collect(),
            // One full solution accompanies the metadata. NOTE(review):
            // panics if there are no solutions — presumably callers only
            // submit jobs that produced at least one solution; confirm.
            solution_data: solutions_data.values().next().cloned().unwrap(),
        }
    };
    // `req` was needlessly cloned before submission; it is not used again.
    match api().submit_benchmark(req).await {
        Ok(resp) => match resp.verified {
            Ok(_) => Ok(resp.benchmark_id),
            Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)),
        },
        Err(e) => Err(format!("Failed to submit benchmark: {:?}", e)),
    }
}

View File

@ -1,34 +0,0 @@
use std::collections::HashSet;
use super::{api, Job, Result};
use tig_api::SubmitProofReq;
/// Submits the solutions matching the API-sampled nonces as a proof for
/// `job`'s benchmark.
///
/// Panics if `job.sampled_nonces` is `None` — callers must only invoke
/// this after the API has provided sampled nonces for the benchmark.
pub async fn execute(job: &Job) -> Result<()> {
    let req = {
        let solutions_data = job.solutions_data.lock().await;
        // HashSet makes the per-solution membership check below O(1).
        let sampled_nonces: HashSet<u64> = job
            .sampled_nonces
            .as_ref()
            .expect("Expected sampled nonces")
            .iter()
            .cloned()
            .collect();
        SubmitProofReq {
            benchmark_id: job.benchmark_id.clone(),
            solutions_data: solutions_data
                .iter()
                .filter(|(n, _)| sampled_nonces.contains(n))
                .map(|(_, s)| s.clone())
                .collect(),
        }
    };
    // `req` was needlessly cloned before submission; it is not used again.
    match api().submit_proof(req).await {
        Ok(resp) => match resp.verified {
            Ok(_) => Ok(()),
            Err(e) => Err(format!("Proof flagged as fraud: {}", e)),
        },
        Err(e) => Err(format!("Failed to submit proof: {:?}", e)),
    }
}

View File

@ -1,334 +0,0 @@
mod benchmarker;
mod utils;
use benchmarker::{Job, State};
use clap::{value_parser, Arg, Command};
use std::collections::HashMap;
use tig_structs::core::*;
use tig_utils::{dejsonify, get, jsonify, post};
use tokio::{spawn, task::yield_now};
use utils::{sleep, time, Result};
use warp::Filter;
/// Builds the command-line definition for the standalone benchmarker.
fn cli() -> Command {
    // Required positional arguments. Order matters: clap assigns positional
    // indices in the order the args are added to the Command.
    let player_id = Arg::new("PLAYER_ID")
        .help("Your wallet address")
        .required(true)
        .value_parser(value_parser!(String));
    let api_key = Arg::new("API_KEY")
        .help("Your API Key")
        .required(true)
        .value_parser(value_parser!(String));
    let selected_algorithms = Arg::new("SELECTED_ALGORITHMS")
        .help("Json string with your algorithm selection")
        .required(true)
        .value_parser(value_parser!(String));
    // Optional flags, each with a default.
    let workers = Arg::new("workers")
        .long("workers")
        .help("(Optional) Set number of workers")
        .default_value("4")
        .value_parser(value_parser!(u32));
    let duration = Arg::new("duration")
        .long("duration")
        .help("(Optional) Set duration (in milliseconds) of a benchmark")
        .default_value("10000")
        .value_parser(value_parser!(u32));
    let delay = Arg::new("delay")
        .long("delay")
        .help("(Optional) Set delay (in milliseconds) between benchmark finishing and submitting")
        .default_value("5000")
        .value_parser(value_parser!(u32));
    let api = Arg::new("api")
        .long("api")
        .help("(Optional) Set api_url")
        .default_value("https://mainnet-api.tig.foundation")
        .value_parser(value_parser!(String));
    let port = Arg::new("port")
        .long("port")
        .help("(Optional) Set port for cluster communication")
        .default_value("5115")
        .value_parser(value_parser!(u16));
    // No default: presence of --master switches the process into slave mode.
    let master = Arg::new("master")
        .long("master")
        .help("(Optional) Set hostname for master node to connect to")
        .value_parser(value_parser!(String));
    Command::new("TIG Benchmarker")
        .about("Standalone benchmarker")
        .arg_required_else_help(true)
        .arg(player_id)
        .arg(api_key)
        .arg(selected_algorithms)
        .arg(workers)
        .arg(duration)
        .arg(delay)
        .arg(api)
        .arg(port)
        .arg(master)
}
#[tokio::main]
async fn main() {
    let matches = cli().get_matches();
    // Required positional arguments.
    let player_id = matches.get_one::<String>("PLAYER_ID").unwrap().clone();
    let api_key = matches.get_one::<String>("API_KEY").unwrap().clone();
    let selected_algorithms = matches
        .get_one::<String>("SELECTED_ALGORITHMS")
        .unwrap()
        .clone();
    // Optional flags — all have defaults, so unwrap cannot fail.
    let api_url = matches.get_one::<String>("api").unwrap().clone();
    let num_workers = *matches.get_one::<u32>("workers").unwrap();
    let duration = *matches.get_one::<u32>("duration").unwrap();
    let delay = *matches.get_one::<u32>("delay").unwrap();
    let port = *matches.get_one::<u16>("port").unwrap();
    // A `--master` hostname means this process runs as a slave node that
    // fetches jobs from the master; otherwise it runs as the master itself.
    match matches.get_one::<String>("master") {
        Some(master_host) => {
            slave(
                api_url,
                api_key,
                player_id,
                master_host,
                port,
                num_workers,
                selected_algorithms,
            )
            .await
        }
        None => {
            master(
                api_url,
                api_key,
                player_id,
                num_workers,
                duration,
                delay,
                selected_algorithms,
                port,
            )
            .await
        }
    }
}
/// Runs this process as a slave node: repeatedly fetches jobs from the
/// master at `http://<master>:<port>`, benchmarks one weighted-random job
/// matching `selected_algorithms`, and posts the solutions back.
///
/// Never returns; on any error it logs, sleeps 5s, and retries.
async fn slave(
    api_url: String,
    api_key: String,
    player_id: String,
    master: &String,
    port: u16,
    num_workers: u32,
    selected_algorithms: String,
) {
    println!("[slave] setting up");
    benchmarker::setup(api_url, api_key, player_id).await;
    println!(
        "[slave] parsing selected algorithms from: {:?}",
        selected_algorithms
    );
    // Expected shape: {"<challenge_name>": "<algorithm_name>", ...};
    // panics on malformed input (fail fast at startup).
    let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
        .unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
    // One full fetch -> benchmark -> post cycle. Returns Err to trigger the
    // outer retry loop's backoff.
    async fn do_work(
        master_url: &String,
        num_workers: u32,
        selected_algorithms: &HashMap<String, String>,
    ) -> Result<()> {
        println!("[slave] fetching jobs from master at: {}", master_url);
        let available_jobs = match get::<String>(&format!("{}/jobs", master_url), None).await {
            Ok(resp) => dejsonify::<HashMap<String, Job>>(&resp).unwrap(),
            Err(e) => {
                return Err(format!("failed to fetch jobs from master: {:?}", e));
            }
        };
        println!("[slave] fetched {} jobs", available_jobs.len());
        for (i, job) in available_jobs.values().enumerate() {
            println!(
                "[slave] job {}: {:?}, weight: {}",
                i, job.settings, job.weight
            );
        }
        println!(
            "[slave] filtering jobs that match selected_algorithms: {:?}",
            selected_algorithms
        );
        let now = time();
        // Keep only jobs that have not expired and whose benchmark id carries
        // a selected "<challenge>_<algorithm>" prefix (ids are generated by
        // the master in that format).
        let filtered_jobs: HashMap<String, Job> = available_jobs
            .into_iter()
            .filter(|(benchmark_id, job)| {
                now < job.timestamps.end
                    && selected_algorithms
                        .iter()
                        .any(|(challenge_name, algorithm_name)| {
                            benchmark_id
                                .starts_with(&format!("{}_{}", challenge_name, algorithm_name))
                        })
            })
            .collect();
        if filtered_jobs.len() == 0 {
            return Err("no jobs matching selected_algorithms".to_string());
        }
        // Weighted-random pick among the remaining jobs.
        let job = benchmarker::select_job::execute(&filtered_jobs).await?;
        println!(
            "[slave]: downloading algorithm {}",
            job.download_url.split("/").last().unwrap()
        );
        let wasm = benchmarker::download_wasm::execute(&job).await?;
        println!("[slave]: starting benchmark {:?}", job.settings);
        let nonce_iterators = job.create_nonce_iterators(num_workers);
        benchmarker::run_benchmark::execute(nonce_iterators.clone(), &job, &wasm).await;
        // Progress loop: print stats every 500ms until the job's end time.
        let start = time();
        while time() < job.timestamps.end {
            {
                let mut num_attempts = 0;
                for nonce_iterator in &nonce_iterators {
                    num_attempts += nonce_iterator.lock().await.num_attempts();
                }
                let num_solutions = job.solutions_data.lock().await.len() as u32;
                let elapsed = time() - job.timestamps.start;
                println!(
                    "[slave]: #solutions: {}, #instances: {}, elapsed: {}ms",
                    num_solutions, num_attempts, elapsed
                );
            }
            sleep(500).await;
        }
        // Ensure a minimum cycle length of ~2.5s so the slave does not
        // hammer the master when jobs finish almost immediately.
        let elapsed = time() - start;
        if elapsed < 2500 {
            println!("[slave]: sleeping for {}ms", 2500 - elapsed);
            sleep(2500 - elapsed).await;
        }
        let mut num_attempts = 0;
        for nonce_iterator in &nonce_iterators {
            num_attempts += nonce_iterator.lock().await.num_attempts();
        }
        // Only post if any work was actually attempted.
        if num_attempts > 0 {
            let solutions_data = &*job.solutions_data.lock().await;
            println!(
                "[slave]: posting {} solutions to master",
                solutions_data.len()
            );
            post::<String>(
                &format!("{}/solutions_data/{}", master_url, job.benchmark_id),
                &jsonify(&solutions_data),
                Some(vec![(
                    "Content-Type".to_string(),
                    "application/json".to_string(),
                )]),
            )
            .await
            .map_err(|e| format!("failed to post solutions to master: {:?}", e))?;
        }
        Ok(())
    }
    let master_url = format!("http://{}:{}", master, port);
    // Retry forever; errors back off 5s, successes loop immediately
    // (yield_now lets other tasks on the runtime make progress).
    loop {
        if let Err(e) = do_work(&master_url, num_workers, &selected_algorithms).await {
            println!("[slave]: error: {:?}", e);
            println!("[slave]: sleeping 5s");
            sleep(5000).await;
        }
        yield_now().await;
    }
}
/// Runs this process as the master node: spawns the data fetcher, job
/// creator and submitters as background tasks, and serves an HTTP API on
/// `port` for slaves to fetch jobs and post back solutions.
///
/// Never returns.
async fn master(
    api_url: String,
    api_key: String,
    player_id: String,
    num_workers: u32,
    duration: u32,
    delay: u32,
    selected_algorithms: String,
    port: u16,
) {
    println!(
        "[master] parsing selected algorithms from: {:?}",
        selected_algorithms
    );
    // Expected shape: {"<challenge_name>": "<algorithm_name>", ...};
    // panics on malformed input (fail fast at startup).
    let selected_algorithms = serde_json::from_str::<HashMap<String, String>>(&selected_algorithms)
        .unwrap_or_else(|err| panic!("Failed to parse {:?}: {}", selected_algorithms, err));
    println!("[master] setting up");
    benchmarker::setup(api_url, api_key, player_id).await;
    // Each background task loops forever on its own schedule.
    println!("[master] starting data_fetcher");
    spawn(async {
        benchmarker::data_fetcher().await;
    });
    println!("[master] starting benchmark_submitter");
    spawn(async {
        benchmarker::benchmark_submitter().await;
    });
    println!("[master] starting proof_submitter");
    spawn(async {
        benchmarker::proof_submitter().await;
    });
    println!("[master] starting benchmarker");
    spawn(async move {
        benchmarker::benchmarker(
            selected_algorithms,
            num_workers,
            duration as u64,
            delay as u64,
        )
        .await;
    });
    println!("[master] starting webserver on port {}", port);
    spawn(async move {
        // GET /jobs — slaves poll this for the currently available jobs.
        let get_jobs = warp::path("jobs").and(warp::get()).and_then(|| async {
            println!("[master] slave node fetching jobs",);
            let state = (*benchmarker::state()).lock().await;
            Ok::<_, warp::Rejection>(warp::reply::json(&state.available_jobs))
        });
        // POST /solutions_data/<benchmark_id> — slaves push solutions here.
        let post_solutions_data = warp::path!("solutions_data" / String)
            .and(warp::post())
            .and(warp::body::json())
            .and_then(
                |benchmark_id: String, solutions_data: HashMap<u64, SolutionData>| async move {
                    {
                        let num_solutions = solutions_data.len() as u32;
                        println!("[master] received {} solutions from slave", num_solutions,);
                        let mut state = (*benchmarker::state()).lock().await;
                        let State {
                            available_jobs,
                            pending_benchmark_jobs,
                            difficulty_samplers,
                            ..
                        } = &mut *state;
                        // The job may already have moved from the available
                        // set to pending-benchmark; accept in either case.
                        if let Some(job) = available_jobs
                            .get_mut(&benchmark_id)
                            .or_else(|| pending_benchmark_jobs.get_mut(&benchmark_id))
                        {
                            println!("[master] adding solutions to benchmark {:?}", job.settings,);
                            // Feed the sampler so future difficulty picks
                            // adapt to observed solution counts.
                            difficulty_samplers
                                .get_mut(&job.settings.challenge_id)
                                .unwrap()
                                .update_with_solutions(&job.settings.difficulty, num_solutions);
                            job.solutions_data.lock().await.extend(solutions_data);
                        } else {
                            // Unknown/expired benchmark: drop the solutions.
                            println!("[master] failed to find benchmark to add solutions to",);
                        }
                    }
                    Ok::<_, warp::Rejection>(warp::reply::with_status(
                        "solutions received",
                        warp::http::StatusCode::OK,
                    ))
                },
            );
        warp::serve(get_jobs.or(post_solutions_data))
            .run(([0, 0, 0, 0], port))
            .await;
    });
    // Keep the main task alive; all work happens in the spawned tasks.
    loop {
        sleep(30000).await;
    }
}

View File

@ -1,15 +0,0 @@
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time;
pub type Result<T> = std::result::Result<T, String>;
/// Suspends the current task for `ms` milliseconds using the tokio timer
/// (does not block the executor thread).
pub async fn sleep(ms: u64) {
    let duration = time::Duration::from_millis(ms);
    time::sleep(duration).await;
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Panics only if the system clock is set before 1970.
pub fn time() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now.duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}

View File

@ -1,175 +0,0 @@
use crate::{core::*, serializable_struct_with_getters};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
// Request to obtain an API key: the caller proves wallet ownership via
// `signature` over `address` (optionally referencing a Gnosis Safe setup
// transaction for multisig wallets — assumption from field name, confirm).
serializable_struct_with_getters! {
    RequestApiKeyReq {
        signature: String,
        address: String,
        gnosis_safe_setup_tx_hash: Option<String>,
    }
}
// Response carrying the newly issued API key.
serializable_struct_with_getters! {
    RequestApiKeyResp {
        api_key: String,
    }
}
/// The two roles a player can hold in the protocol.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum PlayerType {
    Benchmarker,
    Innovator,
}
impl PlayerType {
    /// Lower-case string form used by the API.
    pub fn to_string(self) -> String {
        let s = match self {
            PlayerType::Benchmarker => "benchmarker",
            PlayerType::Innovator => "innovator",
        };
        s.to_string()
    }
    /// Parses the API's lower-case string form; errors on anything else.
    pub fn from_string(s: String) -> Result<Self, String> {
        if s == "benchmarker" {
            Ok(PlayerType::Benchmarker)
        } else if s == "innovator" {
            Ok(PlayerType::Innovator)
        } else {
            Err("Invalid PlayerType".to_string())
        }
    }
}
// Request all players of a given type at a specific block.
serializable_struct_with_getters! {
    GetPlayersReq {
        block_id: String,
        player_type: PlayerType,
    }
}
// Players plus the block context they were queried at.
serializable_struct_with_getters! {
    GetPlayersResp {
        block_id: String,
        block_details: BlockDetails,
        players: Vec<Player>,
    }
}
// Request a block by id, round or height; all selectors None resolves to
// the latest block. `include_data` controls whether the heavy data payload
// is returned.
serializable_struct_with_getters! {
    GetBlockReq {
        id: Option<String>,
        round: Option<u32>,
        height: Option<u32>,
        include_data: bool,
    }
}
// `block` is None when no block matched the selectors.
serializable_struct_with_getters! {
    GetBlockResp {
        block: Option<Block>,
    }
}
// Request all challenges at a specific block.
serializable_struct_with_getters! {
    GetChallengesReq {
        block_id: String,
    }
}
serializable_struct_with_getters! {
    GetChallengesResp {
        block_id: String,
        block_details: BlockDetails,
        challenges: Vec<Challenge>,
    }
}
// Request all algorithms (and their compiled wasms) at a specific block.
serializable_struct_with_getters! {
    GetAlgorithmsReq {
        block_id: String,
    }
}
serializable_struct_with_getters! {
    GetAlgorithmsResp {
        block_id: String,
        block_details: BlockDetails,
        algorithms: Vec<Algorithm>,
        wasms: Vec<Wasm>,
    }
}
// Request one player's benchmark-related records at a specific block.
serializable_struct_with_getters! {
    GetBenchmarksReq {
        block_id: String,
        player_id: String,
    }
}
serializable_struct_with_getters! {
    GetBenchmarksResp {
        block_id: String,
        block_details: BlockDetails,
        precommits: Vec<Precommit>,
        benchmarks: Vec<Benchmark>,
        proofs: Vec<Proof>,
        frauds: Vec<Fraud>,
    }
}
// Request the full record set for a single benchmark id.
serializable_struct_with_getters! {
    GetBenchmarkDataReq {
        benchmark_id: String,
    }
}
// Each field is None when that record does not (yet) exist.
serializable_struct_with_getters! {
    GetBenchmarkDataResp {
        precommit: Option<Precommit>,
        benchmark: Option<Benchmark>,
        proof: Option<Proof>,
        fraud: Option<Fraud>,
    }
}
// Submit a benchmark: the settings it ran under, the merkle root committing
// to all outputs, and the set of nonces that produced solutions.
serializable_struct_with_getters! {
    SubmitBenchmarkReq {
        settings: BenchmarkSettings,
        merkle_root: String,
        solution_nonces: HashSet<u64>,
    }
}
// `verified` is Err(reason) when the submission was flagged as fraud.
serializable_struct_with_getters! {
    SubmitBenchmarkResp {
        benchmark_id: String,
        verified: Result<(), String>,
    }
}
// Submit merkle proofs for the nonces the API sampled from a benchmark.
serializable_struct_with_getters! {
    SubmitProofReq {
        benchmark_id: String,
        merkle_data: Vec<MerkleProof>,
    }
}
// `verified` is Err(reason) when the proof was flagged as fraud.
serializable_struct_with_getters! {
    SubmitProofResp {
        verified: Result<(), String>,
    }
}
// Submit a new algorithm's source code for a challenge, paired with the
// transaction hash paying the submission fee (assumption — confirm).
serializable_struct_with_getters! {
    SubmitAlgorithmReq {
        name: String,
        challenge_id: String,
        tx_hash: String,
        code: String,
    }
}
serializable_struct_with_getters! {
    SubmitAlgorithmResp {
        algorithm_id: String,
    }
}

View File

@ -1,4 +1,3 @@
pub mod api;
pub mod config;
pub mod core;