Update tig-benchmarker to be easier to configure

This commit is contained in:
FiveMovesAhead 2024-10-13 11:02:54 +01:00
parent 1edbcebf05
commit e99eca7028
12 changed files with 793 additions and 1208 deletions

View File

@ -25,12 +25,13 @@ Python scripts that implements a master/slave Benchmarker for TIG.
# in tig-benchmarker folder
pip install -r requirements.txt
```
8. Run a master
8. Edit config.json with your `player_id` and `api_key`
9. Run a master
```
# in tig-benchmarker folder
python3 master.py <player_id> <api_key> <path to config.json>
python3 master.py <path to config.json>
```
9. Connect at least 1 slave to your master
10. Connect at least 1 slave to your master
```
python3 slave.py <master_ip> <path to tig-worker>
```
@ -52,40 +53,33 @@ Python scripts that implements a master/slave Benchmarker for TIG.
# Optimising your Config
1. Your master should generate enough batches for your slaves to always be busy, but at the same time you should minimise the delay between your precommit and proof submissions. Observe the elapsed ms of your benchmarks and adjust the parameters accordingly to target a particular duration per benchmark (e.g. 15s)
* `max_unresolved_precommits` - how many precommits can be in progress
* `num_nonces` - number of nonces for a precommit for a particular challenge
* Increasing/decreasing the above will lead to more/fewer batches in your backlog
1. `difficulty_sampler_config` allows you to set the `difficulty_range` for each challenge.
* Every block, each challenge recalculates its `base_frontier` and `scaled_frontier`
* The difficulties within these 2 frontiers are "sorted" from easiest to hardest (0.0 is easiest, 1.0 is hardest)
* Benchmarkers can set the `difficulty_range` from which to sample a difficulty. Examples:
* `[0.0, 1.0]` samples the full range of valid difficulties
* `[0.0, 0.1]` samples the easiest 10% of valid difficulties
* Key consideration: easier difficulties may yield more solutions for the same compute, but those solutions might not remain qualifiers for long if the frontiers get harder
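* A minimal sketch of how a `difficulty_range` is interpreted, mirroring `DifficultySampler.run` (here `frontiers` is assumed to be the list of Pareto frontiers already sorted from easiest to hardest):
```
import math
import random

def sample_difficulty(frontiers, difficulty_range):
    # map the [0.0, 1.0] range onto frontier indices (0 = easiest, len - 1 = hardest)
    idx1 = math.floor(difficulty_range[0] * (len(frontiers) - 1))
    idx2 = math.ceil(difficulty_range[1] * (len(frontiers) - 1))
    # pool every difficulty point on the selected frontiers, then pick one at random
    candidates = [p for frontier in frontiers[idx1:idx2 + 1] for p in frontier]
    return random.choice(candidates)

# e.g. difficulty_range = [0.0, 0.1] only ever samples from the easiest ~10% of frontiers
```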
2. You want your slaves to be at 100% CPU utilization, but at the same time, batches should be "cheap" to repeat if a slave has issues. Observe the elapsed ms of batches and adjust parameters accordingly to target a particular duration per batch (e.g. 2s)
* `batch_size` - how many nonces are processed in a batch for a particular challenge. Must be a power of 2
2. `job_manager_config` allows you to set the `batch_size` for each challenge.
* `batch_size` is the number of nonces that are part of a batch. Must be a power of 2
* We recommend picking a `batch_size` such that your slave with the lowest `num_workers` takes a few seconds to compute a batch (e.g. 5 seconds)
* `batch_size` shouldn't be too small, or else network latency between master and slave will affect performance
* To support slaves with different `num_workers`, see `slave_manager_config` below
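* A small sketch of how `batch_size` relates to the number of batches per benchmark, mirroring the helpers on `Job` in `job_manager.py` (numbers are illustrative):
```
def num_batches(num_nonces, batch_size):
    # batch_size must be a non-zero power of 2
    assert batch_size != 0 and (batch_size & (batch_size - 1)) == 0
    # a benchmark is split into ceil(num_nonces / batch_size) batches
    return (num_nonces + batch_size - 1) // batch_size

num_batches(1000, 128)  # -> 8 batches of up to 128 nonces each
```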
3. Slaves on different compute may be better suited for certain challenges. You can define the challenge selection for a slave based on name patterns (patterns are checked in order):
* Any slave, any challenge (this should be the last entry)
```
{
"name_regex": ".*",
"challenge_selection": null
}
```
* Example: only vector_search
```
{
"name_regex": "vector_search-slave-.*",
"challenge_selection": ["vector_search"]
}
```
3. `precommit_manager_config` allows you to control your benchmarks:
* `max_pending_benchmarks` is the maximum number of benchmarks that can be in progress (precommitted but not yet fully computed) at any one time
* Key consideration: you want batches to always be available for your slaves, but if you submit benchmarks too slowly, there will be a large delay before each one becomes active
* `num_nonces` is the number of nonces to compute per benchmark. Recommend adjusting this based on the logs, which tell you the average number of nonces needed to find a solution. Example log:
* `global qualifier difficulty stats for vehicle_routing: (#nonces: 43739782840, #solutions: 22376, avg_nonces_per_solution: 1954763)`
* `weight` affects how likely the challenge is to be picked (a challenge with weight 0 will never be picked). Recommend adjusting this if the logs warn you to benchmark a specific challenge to increase your cutoff. Example log:
* `recommend finding more solutions for challenge knapsack to avoid being cut off`
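* A condensed sketch of how `weight` and `base_fee_limit` drive challenge selection, mirroring `PrecommitManager.run` (plain dicts and numeric fee values are assumptions made for illustration):
```
import random

def pick_challenge(algo_selection, curr_base_fees):
    # only consider challenges whose current base fee is within the configured limit
    selections = [
        (c_name, x) for c_name, x in algo_selection.items()
        if curr_base_fees[c_name] <= x["base_fee_limit"]
    ]
    if not selections:
        return None  # no challenges under the base fee limit
    # higher weight => proportionally higher chance; a weight of 0 is never picked
    return random.choices(selections, weights=[x["weight"] for _, x in selections])[0][0]

pick_challenge(
    {"knapsack": {"weight": 1.0, "base_fee_limit": 10**16},
     "vector_search": {"weight": 0.0, "base_fee_limit": 10**16}},
    {"knapsack": 10**15, "vector_search": 10**15},
)  # -> "knapsack"
```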
4. By default, the benchmarker uses smart challenge selection to maximise your cutoff and to minimise your imbalance. However, if you want to control the chance of a particular challenge being selected, you can set the weight in the algo_selection:
* Example:
```
"vehicle_routing": {
"algorithm": "clarke_wright",
"num_nonces": 1000,
"base_fee_limit": "10000000000000000",
"weight": 1.0 <--- weight is relative to other challenges. If other challenge's weights are null/undefined, then this challenge will always be picked
}
```
4. `slave_manager_config` allows you to control your slaves:
* When a slave makes a request, the manager iterates through each slave config one at a time until it finds a regex match. The most specific regexes should be earlier in the list, and the more general regexes later in the list.
* `max_concurrent_batches` determines how many batches of that challenge a slave can fetch & process concurrently
* `max_concurrent_batches` also serves as a whitelist of challenges for that slave. If you don't want a slave to benchmark a specific challenge, remove that challenge's entry from its `max_concurrent_batches`. Example:
* `{"vector_search": 1}` means the slave will only be given `vector_search` batches
# Master
@ -98,20 +92,15 @@ The master does no benchmarking! You need to connect slaves
## Usage
```
usage: master.py [-h] [--api API] [--backup BACKUP] [--port PORT] [--verbose] player_id api_key config_path
usage: master.py [-h] [--verbose] config_path
TIG Benchmarker
positional arguments:
player_id Player ID
api_key API Key
config_path Path to the configuration JSON file
options:
-h, --help show this help message and exit
--api API API URL (default: https://mainnet-api.tig.foundation)
--backup BACKUP Folder to save pending submissions and other data
--port PORT Port to run the server on (default: 5115)
--verbose Print debug logs
```

View File

@ -1,59 +1,81 @@
{
"extensions": [
"data_fetcher",
"difficulty_sampler",
"submissions_manager",
"job_manager",
"precommit_manager",
"slave_manager"
],
"config": {
"job_manager": {
"satisfiability": {
"batch_size": 128
},
"vehicle_routing": {
"batch_size": 128
},
"knapsack": {
"batch_size": 128
},
"vector_search": {
"batch_size": 4
}
},
"precommit_manager": {
"max_unresolved_precommits": 5,
"algo_selection": {
"satisfiability": {
"algorithm": "schnoing",
"num_nonces": 1000,
"base_fee_limit": "10000000000000000"
},
"vehicle_routing": {
"algorithm": "clarke_wright",
"num_nonces": 1000,
"base_fee_limit": "10000000000000000"
},
"knapsack": {
"algorithm": "dynamic",
"num_nonces": 1000,
"base_fee_limit": "10000000000000000"
},
"vector_search": {
"algorithm": "optimal_ann",
"num_nonces": 16,
"base_fee_limit": "10000000000000000"
}
}
},
"slave_manager": {
"slaves": [
{
"name_regex": ".*",
"challenge_selection": null
}
"player_id": "0x0000000000000000000000000000000000000000",
"api_key": "00000000000000000000000000000000",
"api_url": "https://mainnet-api.tig.foundation",
"difficulty_sampler_config": {
"difficulty_ranges": {
"satisfiability": [
0.0,
0.5
],
"vehicle_routing": [
0.0,
0.5
],
"knapsack": [
0.0,
0.5
],
"vector_search": [
0.0,
0.5
]
}
},
"job_manager_config": {
"backup_folder": "jobs",
"batch_sizes": {
"satisfiability": 1024,
"vehicle_routing": 1024,
"knapsack": 1024,
"vector_search": 1024
}
},
"submissions_manager_config": {
"time_between_retries": 60000
},
"precommit_manager_config": {
"max_pending_benchmarks": 4,
"algo_selection": {
"satisfiability": {
"algorithm": "schnoing",
"num_nonces": 1000,
"weight": 1.0,
"base_fee_limit": "10000000000000000"
},
"vehicle_routing": {
"algorithm": "clarke_wright",
"num_nonces": 1000,
"weight": 1.0,
"base_fee_limit": "10000000000000000"
},
"knapsack": {
"algorithm": "dynamic",
"num_nonces": 1000,
"weight": 1.0,
"base_fee_limit": "10000000000000000"
},
"vector_search": {
"algorithm": "optimal_ann",
"num_nonces": 1000,
"weight": 1.0,
"base_fee_limit": "10000000000000000"
}
}
},
"slave_manager_config": {
"port": 5115,
"time_before_batch_retry": 60000,
"slaves": [
{
"name_regex": ".*",
"max_concurrent_batches": {
"satisfiability": 1,
"vehicle_routing": 1,
"knapsack": 1,
"vector_search": 1
}
}
]
}
}

View File

@ -1,59 +1,63 @@
import signal
import argparse
import asyncio
import importlib
import json
import logging
import os
import time
from tig_benchmarker.event_bus import process_events, emit
from tig_benchmarker.extensions.data_fetcher import *
from tig_benchmarker.extensions.difficulty_sampler import *
from tig_benchmarker.extensions.job_manager import *
from tig_benchmarker.extensions.precommit_manager import *
from tig_benchmarker.extensions.slave_manager import *
from tig_benchmarker.extensions.submissions_manager import *
from tig_benchmarker.utils import FromDict
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
async def main(
player_id: str,
api_key: str,
config: dict,
backup_folder: str,
api_url: str,
port: int
):
exit_event = asyncio.Event()
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, exit_event.set)
loop.add_signal_handler(signal.SIGTERM, exit_event.set)
extensions = {}
for ext_name in config["extensions"]:
logger.info(f"loading extension {ext_name}")
module = importlib.import_module(f"tig_benchmarker.extensions.{ext_name}")
extensions[ext_name] = module.Extension(
player_id=player_id,
api_key=api_key,
api_url=api_url,
backup_folder=backup_folder,
port=port,
exit_event=exit_event,
**config["config"]
)
@dataclass
class Config(FromDict):
player_id: str
api_key: str
api_url: str
difficulty_sampler_config: DifficultySamplerConfig
job_manager_config: JobManagerConfig
precommit_manager_config: PrecommitManagerConfig
slave_manager_config: SlaveManagerConfig
submissions_manager_config: SubmissionsManagerConfig
last_update = time.time()
while not exit_event.is_set():
now = time.time()
if now - last_update > 1:
last_update = now
await emit('update')
await process_events(extensions)
await asyncio.sleep(0.1)
async def main(config: Config):
last_block_id = None
jobs = []
data_fetcher = DataFetcher(config.api_url, config.player_id)
difficulty_sampler = DifficultySampler(config.difficulty_sampler_config)
job_manager = JobManager(config.job_manager_config, jobs)
precommit_manager = PrecommitManager(config.precommit_manager_config, config.player_id, jobs)
submissions_manager = SubmissionsManager(config.submissions_manager_config, config.api_url, config.api_key, jobs)
slave_manager = SlaveManager(config.slave_manager_config, jobs)
slave_manager.start()
while True:
try:
data = await data_fetcher.run()
if data["block"].id != last_block_id:
last_block_id = data["block"].id
difficulty_sampler.on_new_block(**data)
job_manager.on_new_block(**data)
precommit_manager.on_new_block(**data)
job_manager.run()
samples = difficulty_sampler.run()
submit_precommit_req = precommit_manager.run(samples)
submissions_manager.run(submit_precommit_req)
except Exception as e:
import traceback
traceback.print_exc()
logger.error(f"{e}")
finally:
await asyncio.sleep(5)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Benchmarker")
parser.add_argument("player_id", help="Player ID")
parser.add_argument("api_key", help="API Key")
parser.add_argument("config_path", help="Path to the configuration JSON file")
parser.add_argument("--api", default="https://mainnet-api.tig.foundation", help="API URL (default: https://mainnet-api.tig.foundation)")
parser.add_argument("--backup", default="backup", help="Folder to save pending submissions and other data")
parser.add_argument("--port", type=int, default=5115, help="Port to run the server on (default: 5115)")
parser.add_argument("--verbose", action='store_true', help="Print debug logs")
args = parser.parse_args()
@ -68,8 +72,5 @@ if __name__ == "__main__":
sys.exit(1)
with open(args.config_path, "r") as f:
config = json.load(f)
if not os.path.exists(args.backup):
logger.info(f"creating backup folder at {args.backup}")
os.makedirs(args.backup, exist_ok=True)
asyncio.run(main(args.player_id, args.api_key, config, args.backup, args.api, args.port))
config = Config.from_dict(config)
asyncio.run(main(config))

View File

@ -3,7 +3,8 @@ import json
import os
import logging
import randomname
import requests
import aiohttp
import asyncio
import subprocess
import time
@ -12,7 +13,71 @@ logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
def now():
return int(time.time() * 1000)
def main(
async def download_wasm(session, download_url, wasm_path):
if not os.path.exists(wasm_path):
start = now()
logger.info(f"downloading WASM from {download_url}")
async with session.get(download_url) as resp:
if resp.status != 200:
raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}")
with open(wasm_path, 'wb') as f:
f.write(await resp.read())
logger.debug(f"downloading WASM: took {now() - start}ms")
logger.debug(f"WASM Path: {wasm_path}")
async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers):
start = now()
cmd = [
tig_worker_path, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["wasm_vm_config"]["max_memory"]),
"--fuel", str(batch["wasm_vm_config"]["max_fuel"]),
"--workers", str(num_workers),
]
if batch["sampled_nonces"]:
cmd += ["--sampled", *map(str, batch["sampled_nonces"])]
logger.info(f"computing batch: {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(f"tig-worker failed: {stderr.decode()}")
result = json.loads(stdout.decode())
logger.info(f"computing batch took {now() - start}ms")
logger.debug(f"batch result: {result}")
return result
async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers):
try:
batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}"
logger.info(f"Processing batch {batch_id}: {batch}")
# Step 2: Download WASM
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
await download_wasm(session, batch['download_url'], wasm_path)
# Step 3: Run tig-worker
result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers)
# Step 4: Submit results
start = now()
submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}"
logger.info(f"posting results to {submit_url}")
async with session.post(submit_url, json=result, headers=headers) as resp:
if resp.status != 200:
raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}")
logger.debug(f"posting results took {now() - start} ms")
except Exception as e:
logger.error(f"Error processing batch {batch_id}: {e}")
async def main(
master_ip: str,
tig_worker_path: str,
download_wasms_folder: str,
@ -28,67 +93,29 @@ def main(
"User-Agent": slave_name
}
while True:
try:
# Step 1: Query for job
start = now()
get_batch_url = f"http://{master_ip}:{master_port}/get-batch"
logger.info(f"fetching job from {get_batch_url}")
resp = requests.get(get_batch_url, headers=headers)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when fetching job: {resp.text}")
logger.debug(f"fetching job: took {now() - start}ms")
batch = resp.json()
batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}"
logger.debug(f"batch {batch_id}: {batch}")
# Step 2: Download WASM
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
if not os.path.exists(wasm_path):
async with aiohttp.ClientSession() as session:
while True:
try:
# Step 1: Query for job
start = now()
logger.info(f"downloading WASM from {batch['download_url']}")
resp = requests.get(batch['download_url'])
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}")
with open(wasm_path, 'wb') as f:
f.write(resp.content)
logger.debug(f"downloading WASM: took {now() - start}ms")
logger.debug(f"WASM Path: {wasm_path}")
# Step 3: Run tig-worker
start = now()
cmd = [
tig_worker_path, "compute_batch",
json.dumps(batch["settings"]),
batch["rand_hash"],
str(batch["start_nonce"]),
str(batch["num_nonces"]),
str(batch["batch_size"]),
wasm_path,
"--mem", str(batch["wasm_vm_config"]["max_memory"]),
"--fuel", str(batch["wasm_vm_config"]["max_fuel"]),
"--workers", str(num_workers),
]
if batch["sampled_nonces"]:
cmd += ["--sampled", *map(str, batch["sampled_nonces"])]
logger.info(f"computing batch: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
result = json.loads(result.stdout)
logger.info(f"computing batch took {now() - start}ms")
logger.debug(f"batch result: {result}")
# Step 4: Submit results
start = now()
submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}"
logger.info(f"posting results to {submit_url}")
resp = requests.post(submit_url, json=result, headers=headers)
if resp.status_code != 200:
raise Exception(f"status {resp.status_code} when posting results to master: {resp.text}")
logger.debug(f"posting results took {now() - start} seconds")
except Exception as e:
logger.error(e)
time.sleep(5)
get_batch_url = f"http://{master_ip}:{master_port}/get-batches"
logger.info(f"fetching job from {get_batch_url}")
async with session.get(get_batch_url, headers=headers) as resp:
if resp.status != 200:
raise Exception(f"status {resp.status} when fetching job: {await resp.text()}")
batches = await resp.json(content_type=None)
logger.debug(f"fetching job: took {now() - start}ms")
# Process batches concurrently
tasks = [
process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers)
for batch in batches
]
await asyncio.gather(*tasks)
except Exception as e:
logger.error(e)
await asyncio.sleep(5)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TIG Slave Benchmarker")
@ -107,4 +134,4 @@ if __name__ == "__main__":
level=logging.DEBUG if args.verbose else logging.INFO
)
main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port)
asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port))

View File

@ -1,47 +0,0 @@
import asyncio
import logging
import os
import time
from asyncio import Queue
from collections import defaultdict
from typing import Callable, Dict, List, Optional
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
_queue = []
async def emit(event_name: str, **kwargs) -> None:
global _queue
_queue.append((event_name, kwargs))
async def _safe_execute(extension_name: str, event_name: str, handler: Callable, kwargs: dict) -> None:
try:
await handler(**kwargs)
except Exception as e:
import traceback
traceback.print_exc()
logger.error(f"'{extension_name}.on_{event_name}': {str(e)}")
async def process_events(extensions: dict):
global _queue
num_events = len(_queue)
event_names = set(
_queue[i][0]
for i in range(num_events)
)
handlers = {
e: [
(ext_name, getattr(ext, f'on_{e}'))
for ext_name, ext in extensions.items()
if hasattr(ext, f'on_{e}')
]
for e in event_names
}
await asyncio.gather(
*[
_safe_execute(ext_name, _queue[i][0], h, _queue[i][1])
for i in range(num_events)
for (ext_name, h) in handlers[_queue[i][0]]
]
)
_queue = _queue[num_events:]

View File

@ -3,7 +3,6 @@ import asyncio
import json
import logging
import os
from tig_benchmarker.event_bus import *
from tig_benchmarker.structs import *
from tig_benchmarker.utils import *
from typing import Dict, Any
@ -25,25 +24,21 @@ async def _get(url: str) -> Dict[str, Any]:
logger.error(err_msg)
raise Exception(err_msg)
class Extension:
def __init__(self, api_url: str, player_id: str, **kwargs):
class DataFetcher:
def __init__(self, api_url: str, player_id: str):
self.api_url = api_url
self.player_id = player_id
self.last_fetch = 0
self._cache = None
async def on_update(self):
now_ = now()
if now_ - self.last_fetch < 10000:
return
self.last_fetch = now_
async def run(self) -> dict:
logger.debug("fetching latest block")
block_data = await _get(f"{self.api_url}/get-block")
block = Block.from_dict(block_data["block"])
if self._cache is not None and block.id == self._cache["block"].id:
logger.debug("no new block data")
return
return self._cache
logger.info(f"new block @ height {block.details.height}, fetching data")
tasks = [
@ -63,32 +58,18 @@ class Extension:
precommits = {b["benchmark_id"]: Precommit.from_dict(b) for b in benchmarks_data["precommits"]}
benchmarks = {b["id"]: Benchmark.from_dict(b) for b in benchmarks_data["benchmarks"]}
proofs = {p["benchmark_id"]: Proof.from_dict(p) for p in benchmarks_data["proofs"]}
frauds = {f["benchmark_id"]: Fraud.from_dict(f) for f in benchmarks_data["frauds"]}
for benchmark_id, precommit in precommits.items():
benchmark = benchmarks.get(benchmark_id, None)
proof = proofs.get(benchmark_id, None)
if proof is not None:
if self._cache is None or benchmark_id not in self._cache["proofs"]:
await emit(
"proof_confirmed",
precommit=precommit,
benchmark=benchmark,
proof=proof,
)
elif benchmark is not None:
if self._cache is None or benchmark_id not in self._cache["benchmarks"]:
await emit(
"benchmark_confirmed",
precommit=precommit,
benchmark=benchmark,
)
elif self._cache is None or benchmark_id not in self._cache["precommits"]:
await emit(
"precommit_confirmed",
precommit=precommit
)
frauds = {f["benchmark_id"]: Fraud.from_dict(f) for f in benchmarks_data["frauds"]}
challenges = {c["id"]: Challenge.from_dict(c) for c in challenges_data["challenges"]}
tasks = [
_get(f"{self.api_url}/get-difficulty-data?block_id={block.id}&challenge_id={c_id}")
for c_id in challenges
]
difficulty_data = await asyncio.gather(*tasks)
difficulty_data = {
c_id: [DifficultyData.from_dict(x) for x in d["data"]]
for c_id, d in zip(challenges, difficulty_data)
}
self._cache = {
"block": block,
@ -99,6 +80,7 @@ class Extension:
"benchmarks": benchmarks,
"proofs": proofs,
"frauds": frauds,
"challenges": challenges
"challenges": challenges,
"difficulty_data": difficulty_data
}
await emit("new_block", **self._cache)
return self._cache

View File

@ -1,211 +1,145 @@
import asyncio
import logging
import math
import numpy as np
import os
from tig_benchmarker.event_bus import *
import random
from tig_benchmarker.structs import *
from tig_benchmarker.utils import FromDict
from typing import List, Tuple, Dict
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
Point = List[int]
Frontier = List[Point]
def calc_valid_difficulties(block_data: ChallengeBlockData) -> List[Point]:
"""
Calculates a list of all difficulty combinations within the base and scaled frontiers
"""
hardest_difficulty = np.max([
np.max(list(block_data.scaled_frontier), axis=0),
np.max(list(block_data.base_frontier), axis=0),
], axis=0)
min_difficulty = np.max([
np.min(list(block_data.scaled_frontier), axis=0),
np.min(list(block_data.base_frontier), axis=0),
], axis=0)
weights = np.zeros(hardest_difficulty - min_difficulty + 1, dtype=float)
lower_cutoff_points = np.array(list(block_data.base_frontier)) - min_difficulty
upper_cutoff_points = np.array(list(block_data.scaled_frontier)) - min_difficulty
if block_data.scaling_factor < 1.0:
lower_cutoff_points, upper_cutoff_points = upper_cutoff_points, lower_cutoff_points
lower_cutoff_points = lower_cutoff_points[np.argsort(lower_cutoff_points[:, 0]), :]
upper_cutoff_points = upper_cutoff_points[np.argsort(upper_cutoff_points[:, 0]), :]
lower_cutoff_idx = 0
lower_cutoff = lower_cutoff_points[lower_cutoff_idx]
upper_cutoff_idx = 0
upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx]
if len(upper_cutoff_points) > 1:
upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1]
else:
upper_cutoff2 = upper_cutoff1
for i in range(weights.shape[0]):
if lower_cutoff_idx + 1 < len(lower_cutoff_points) and i == lower_cutoff_points[lower_cutoff_idx + 1, 0]:
lower_cutoff_idx += 1
lower_cutoff = lower_cutoff_points[lower_cutoff_idx]
if upper_cutoff_idx + 1 < len(upper_cutoff_points) and i == upper_cutoff_points[upper_cutoff_idx + 1, 0]:
upper_cutoff_idx += 1
upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx]
if upper_cutoff_idx + 1 < len(upper_cutoff_points):
upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1]
else:
upper_cutoff2 = upper_cutoff1
if i >= lower_cutoff[0]:
start = lower_cutoff[1]
else:
start = lower_cutoff[1] + 1
if i <= upper_cutoff2[0]:
weights[i, start:upper_cutoff2[1] + 1] = 1.0
if i < upper_cutoff2[0]:
weights[i, start:upper_cutoff1[1]] = 1.0
if i == upper_cutoff1[0]:
weights[i, upper_cutoff1[1]] = 1.0
valid_difficulties = np.stack(np.where(weights), axis=1) + min_difficulty
return valid_difficulties.tolist()
def calc_pareto_frontier(points: List[Point]) -> Frontier:
"""
Calculates a single Pareto frontier from a list of points
Adapted from https://stackoverflow.com/questions/32791911/fast-calculation-of-pareto-front-in-python
"""
points_ = points
points = np.array(points)
frontier_idxs = np.arange(points.shape[0])
n_points = points.shape[0]
next_point_index = 0 # Next index in the frontier_idxs array to search for
while next_point_index < len(points):
nondominated_point_mask = np.any(points < points[next_point_index], axis=1)
nondominated_point_mask[np.all(points == points[next_point_index], axis=1)] = True
frontier_idxs = frontier_idxs[nondominated_point_mask] # Remove dominated points
points = points[nondominated_point_mask]
next_point_index = np.sum(nondominated_point_mask[:next_point_index]) + 1
return [points_[idx] for idx in frontier_idxs]
def calc_all_frontiers(points: List[Point]) -> List[Frontier]:
"""
Calculates a list of Pareto frontiers from a list of points
"""
buckets = {}
r = np.max(points, axis=0) - np.min(points, axis=0)
dim1, dim2 = (1, 0) if r[0] > r[1] else (0, 1)
for p in points:
if p[dim1] not in buckets:
buckets[p[dim1]] = []
buckets[p[dim1]].append(p)
for bucket in buckets.values():
bucket.sort(reverse=True, key=lambda x: x[dim2])
frontiers = []
while len(buckets) > 0:
points = [bucket[-1] for bucket in buckets.values()]
frontier = calc_pareto_frontier(points)
for p in frontier:
x = p[dim1]
buckets[x].pop()
if len(buckets[x]) == 0:
buckets.pop(x)
frontiers.append(frontier)
return frontiers
@dataclass
class DifficultySamplerConfig(FromDict):
num_samples: int = 100
padding_factor: float = 0.2
decay: float = 0.7
initial_solutions_weight: float = 500.0
solutions_multiplier: float = 10.0
difficulty_ranges: Dict[str, Tuple[float, float]]
def __post_init__(self):
for c_name, (start, end) in self.difficulty_ranges.items():
if start < 0 or start > 1 or end < 0 or end > 1 or start > end:
raise ValueError(f"Invalid difficulty range for challenge {c_name}. Must be (start, end) where '0 <= start <= end <= 1'")
class DifficultySampler:
def __init__(self, config: DifficultySamplerConfig):
self.config = config
self.min_difficulty = None
self.padding = None
self.dimensions = None
self.weights = np.empty((0,0,2), dtype=float)
self.distribution = None
self.valid_difficulties = {}
self.frontiers = {}
def sample(self):
if self.distribution is None:
raise ValueError("You must update sampler first")
p = self.distribution.flatten()
idx = np.random.choice(len(p), p=p)
num_cols = self.dimensions[1] + self.padding[1]
x = idx // num_cols
y = idx % num_cols
return [int(x + self.min_difficulty[0]), int(y + self.min_difficulty[1])]
def update_with_block_data(self, min_difficulty: List[int], block_data):
assert len(min_difficulty) == 2, "Only difficulty with 2 parameters are supported"
min_difficulty = np.array(min_difficulty)
if self.min_difficulty is None:
left_pad = np.zeros(2, dtype=int)
else:
left_pad = min_difficulty - self.min_difficulty
self.min_difficulty = min_difficulty
self.update_dimensions_and_padding(block_data)
size = self.dimensions + self.padding
self.resize_weights(left_pad, size)
self.update_valid_range(block_data)
self.update_distributions()
def update_with_solutions(self, difficulty: List[int], num_solutions: int):
center = np.array(difficulty) - self.min_difficulty
x_min = max(0, center[0] - self.padding[0])
x_max = min(self.weights.shape[0] - 1, center[0] + self.padding[0])
y_min = max(0, center[1] - self.padding[1])
y_max = min(self.weights.shape[1] - 1, center[1] + self.padding[1])
if x_min > x_max or y_min > y_max:
return
y, x = np.meshgrid(
np.arange(y_min, y_max + 1, dtype=float),
np.arange(x_min, x_max + 1, dtype=float)
)
position = np.stack((x, y), axis=-1)
dist = np.linalg.norm((position - center) / self.padding, axis=-1)
decay = dist * (1.0 - self.config.decay) + self.config.decay
delta = (1.0 - decay) * num_solutions * self.config.solutions_multiplier
decay[np.where(dist > 1.0)] = 1.0
delta[np.where(dist > 1.0)] = 0.0
self.weights[x_min:x_max + 1, y_min:y_max + 1, 1] *= decay
self.weights[x_min:x_max + 1, y_min:y_max + 1, 1] += delta
def update_valid_range(self, block_data):
lower_cutoff_points = np.array(list(block_data.base_frontier)) - self.min_difficulty
upper_cutoff_points = np.array(list(block_data.scaled_frontier)) - self.min_difficulty
if block_data.scaling_factor < 1.0:
lower_cutoff_points, upper_cutoff_points = upper_cutoff_points, lower_cutoff_points
lower_cutoff_points = lower_cutoff_points[np.argsort(lower_cutoff_points[:, 0]), :]
upper_cutoff_points = upper_cutoff_points[np.argsort(upper_cutoff_points[:, 0]), :]
lower_cutoff_idx = 0
lower_cutoff = lower_cutoff_points[lower_cutoff_idx]
upper_cutoff_idx = 0
upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx]
if len(upper_cutoff_points) > 1:
upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1]
else:
upper_cutoff2 = upper_cutoff1
self.weights[:, :, 0] = 0.0
for i in range(self.weights.shape[0]):
if lower_cutoff_idx + 1 < len(lower_cutoff_points) and i == lower_cutoff_points[lower_cutoff_idx + 1, 0]:
lower_cutoff_idx += 1
lower_cutoff = lower_cutoff_points[lower_cutoff_idx]
if upper_cutoff_idx + 1 < len(upper_cutoff_points) and i == upper_cutoff_points[upper_cutoff_idx + 1, 0]:
upper_cutoff_idx += 1
upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx]
if upper_cutoff_idx + 1 < len(upper_cutoff_points):
upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1]
else:
upper_cutoff2 = upper_cutoff1
if i >= lower_cutoff[0]:
start = lower_cutoff[1]
else:
start = lower_cutoff[1] + 1
if i <= upper_cutoff2[0]:
self.weights[i, start:upper_cutoff2[1] + 1, 0] = 1.0
if i < upper_cutoff2[0]:
self.weights[i, start:upper_cutoff1[1], 0] = 1.0
if i == upper_cutoff1[0]:
self.weights[i, upper_cutoff1[1], 0] = 1.0
def update_distributions(self):
distribution = np.prod(self.weights, axis=2)
distribution /= np.sum(distribution)
self.distribution = distribution
def resize_weights(self, left_pad: np.ndarray, size: np.ndarray):
default_values = [0.0, self.config.initial_solutions_weight]
if left_pad[0] > 0:
pad = np.full((left_pad[0], self.weights.shape[1], 2), default_values)
self.weights = np.vstack((pad, self.weights))
elif left_pad[0] < 0:
self.weights = self.weights[-left_pad[0]:, :, :]
if left_pad[1] > 0:
pad = np.full((self.weights.shape[0], left_pad[1], 2), default_values)
self.weights = np.hstack((pad, self.weights))
elif left_pad[1] < 0:
self.weights = self.weights[:, -left_pad[1]:, :]
right_pad = size - self.weights.shape[:2]
if right_pad[0] > 0:
pad = np.full((right_pad[0], self.weights.shape[1], 2), default_values)
self.weights = np.vstack((self.weights, pad))
elif right_pad[0] < 0:
self.weights = self.weights[:size[0], :, :]
if right_pad[1] > 0:
pad = np.full((self.weights.shape[0], right_pad[1], 2), default_values)
self.weights = np.hstack((self.weights, pad))
elif right_pad[1] < 0:
self.weights = self.weights[:, :size[1], :]
def update_dimensions_and_padding(self, block_data):
hardest_difficulty = np.max([
np.max(list(block_data.scaled_frontier), axis=0),
np.max(list(block_data.base_frontier), axis=0),
], axis=0)
if block_data.qualifier_difficulties is not None and len(block_data.qualifier_difficulties) > 0:
hardest_difficulty = np.max([
hardest_difficulty,
np.max(list(block_data.qualifier_difficulties), axis=0)
], axis=0)
self.dimensions = hardest_difficulty - self.min_difficulty + 1
self.padding = np.ceil(self.dimensions * self.config.padding_factor).astype(int)
class Extension:
def __init__(self, **kwargs):
if (difficulty_sampler := kwargs.get("difficulty_sampler", None)):
self.config = DifficultySamplerConfig.from_dict(difficulty_sampler)
else:
self.config = DifficultySamplerConfig()
self.samplers = {}
self.lock = True
async def on_new_block(
self,
block: Block,
challenges: Dict[str, Challenge],
**kwargs # ignore other data
):
logger.debug("new block, updating difficulty samplers")
for challenge in challenges.values():
if challenge.block_data is None:
def on_new_block(self, challenges: Dict[str, Challenge], **kwargs):
for c in challenges.values():
if c.block_data is None:
continue
if challenge.id not in self.samplers:
self.samplers[challenge.id] = (challenge.details.name, DifficultySampler(self.config))
logger.debug(f"updating sampler for challenge {challenge.details.name}")
self.samplers[challenge.id][1].update_with_block_data(
min_difficulty=[
param["min_value"]
for param in block.config["difficulty"]["parameters"][challenge.id]
],
block_data=challenge.block_data
)
logger.debug(f"emitting {self.config.num_samples} difficulty samples for challenge {challenge.details.name}")
await emit(
"difficulty_samples",
challenge_id=challenge.id,
block_id=block.id,
samples=[self.samplers[challenge.id][1].sample() for _ in range(self.config.num_samples)]
)
self.lock = False
logger.debug(f"Calculating valid difficulties and frontiers for challenge {c.details.name}")
self.valid_difficulties[c.details.name] = calc_valid_difficulties(c.block_data)
self.frontiers[c.details.name] = calc_all_frontiers(self.valid_difficulties[c.details.name])
async def on_benchmark_confirmed(self, precommit: Precommit, benchmark: Benchmark):
while self.lock:
await asyncio.sleep(0.5)
challenge_id = precommit.settings.challenge_id
num_solutions = benchmark.details.num_solutions
if challenge_id not in self.samplers:
logger.warning(f"no sampler for challenge {challenge_id}")
else:
challenge_name, sampler = self.samplers[challenge_id]
logger.debug(f"updating sampler for challenge {challenge_name} with {num_solutions} solutions @ difficulty {precommit.settings.difficulty}")
sampler.update_with_solutions(
difficulty=precommit.settings.difficulty,
num_solutions=num_solutions
)
def run(self) -> Dict[str, Point]:
samples = {}
for c_name, frontiers in self.frontiers.items():
difficulty_range = self.config.difficulty_ranges[c_name] # FIXME
idx1 = math.floor(difficulty_range[0] * (len(frontiers) - 1))
idx2 = math.ceil(difficulty_range[1] * (len(frontiers) - 1))
difficulties = [p for frontier in frontiers[idx1:idx2 + 1] for p in frontier]
difficulty = random.choice(difficulties)
samples[c_name] = difficulty
logger.debug(f"Sampled difficulty {difficulty} for challenge {c_name}")
return samples

View File

@ -3,7 +3,6 @@ import os
import json
import logging
from dataclasses import dataclass
from tig_benchmarker.event_bus import *
from tig_benchmarker.merkle_tree import MerkleHash, MerkleBranch, MerkleTree
from tig_benchmarker.structs import *
from tig_benchmarker.utils import *
@ -16,275 +15,172 @@ class Job(FromDict):
benchmark_id: str
settings: BenchmarkSettings
num_nonces: int
batch_size: int
num_batches: int
rand_hash: str
sampled_nonces: List[int]
sampled_nonces_by_batch_idx: Dict[int, List[int]]
wasm_vm_config: Dict[str, int]
download_url: str
solution_nonces: List[int]
batch_merkle_roots: List[Optional[MerkleHash]]
merkle_proofs: Dict[int, MerkleProof]
last_retry_time: List[int]
start_time: int
batch_size: int
challenge: str
sampled_nonces: Optional[List[int]] = field(default_factory=list)
merkle_root: Optional[MerkleHash] = None
solution_nonces: List[int] = field(default_factory=list)
merkle_proofs: Dict[int, MerkleProof] = field(default_factory=dict)
solution_nonces: List[int] = field(default_factory=list)
batch_merkle_proofs: Dict[int, MerkleProof] = field(default_factory=dict)
batch_merkle_roots: List[Optional[MerkleHash]] = None
last_benchmark_submit_time: int = 0
last_proof_submit_time: int = 0
last_batch_retry_time: List[int] = None
def __post_init__(self):
self.batch_merkle_roots = [None] * self.num_batches
self.last_batch_retry_time = [0] * self.num_batches
@property
def num_batches(self) -> int:
return (self.num_nonces + self.batch_size - 1) // self.batch_size
@property
def sampled_nonces_by_batch_idx(self) -> Dict[int, List[int]]:
ret = {}
for nonce in self.sampled_nonces:
batch_idx = nonce // self.batch_size
ret.setdefault(batch_idx, []).append(nonce)
return ret
@dataclass
class JobManagerConfig(FromDict):
batch_size: int
ms_delay_between_batch_retries: Optional[int] = None
backup_folder: str
batch_sizes: Dict[str, int]
class Extension:
def __init__(self, backup_folder: str, job_manager: dict, **kwargs):
self.backup_folder = backup_folder
self.config = {k: JobManagerConfig.from_dict(v) for k, v in job_manager.items()}
for challenge_name, config in self.config.items():
batch_size = config.batch_size
assert (batch_size & (batch_size - 1) == 0) and batch_size != 0, f"batch_size {batch_size} for challenge {challenge_name} is not a power of 2"
self.jobs = {}
self.wasm_vm_config = {}
self.challenge_id_2_name = {}
self.download_urls = {}
self.benchmark_ready = set()
self.proof_ready = set()
self.lock = True
self._restore_jobs()
async def on_new_block(
self,
block: Block,
precommits: Dict[str, Precommit],
wasms: Dict[str, Wasm],
challenges: Dict[str, Challenge],
**kwargs
):
self.wasm_vm_config = block.config["wasm_vm"]
self.download_urls = {
w.algorithm_id: w.details.download_url
for w in wasms.values()
}
prune_jobs = [
job
for benchmark_id, job in self.jobs.items()
if benchmark_id not in precommits
]
for job in prune_jobs:
self._prune_job(job)
for c in challenges.values():
self.challenge_id_2_name[c.id] = c.details.name
self.lock = False
async def on_precommit_confirmed(self, precommit: Precommit, **kwargs):
while self.lock:
await asyncio.sleep(0.1)
benchmark_id = precommit.benchmark_id
if benchmark_id not in self.jobs:
c_name = self.challenge_id_2_name[precommit.settings.challenge_id]
batch_size = self.config[c_name].batch_size
num_batches = (precommit.details.num_nonces + batch_size - 1) // batch_size
job = Job(
benchmark_id=benchmark_id,
settings=precommit.settings,
num_nonces=precommit.details.num_nonces,
num_batches=num_batches,
rand_hash=precommit.state.rand_hash,
sampled_nonces_by_batch_idx={},
sampled_nonces=[],
wasm_vm_config=self.wasm_vm_config,
download_url=self.download_urls[precommit.settings.algorithm_id],
batch_size=batch_size,
solution_nonces=[],
batch_merkle_roots=[None] * num_batches,
merkle_proofs={},
last_retry_time=[0] * num_batches,
start_time=now()
)
self.jobs[benchmark_id] = job
self._save_job(job)
async def on_benchmark_confirmed(self, precommit: Precommit, benchmark: Optional[Benchmark], **kwargs):
await self.on_precommit_confirmed(precommit, **kwargs)
if benchmark is not None:
job = self.jobs[precommit.benchmark_id]
for nonce in benchmark.state.sampled_nonces:
batch_idx = nonce // job.batch_size
job.sampled_nonces_by_batch_idx.setdefault(batch_idx, []).append(nonce)
job.last_retry_time[batch_idx] = 0
job.sampled_nonces = benchmark.state.sampled_nonces
async def on_proof_confirmed(self, proof: Proof, **kwargs):
if (job := self.jobs.get(proof.benchmark_id, None)) is not None:
self._prune_job(job)
async def on_update(self):
if self.lock:
return
for benchmark_id, job in self.jobs.items():
if (
benchmark_id not in self.proof_ready and
len(job.sampled_nonces) > 0 and
set(job.sampled_nonces) == set(job.merkle_proofs) and
all(x is not None for x in job.batch_merkle_roots)
):
self.proof_ready.add(benchmark_id)
await self._emit_proof(job)
elif (
benchmark_id not in self.benchmark_ready and
all(x is not None for x in job.batch_merkle_roots)
):
self.benchmark_ready.add(benchmark_id)
await self._emit_benchmark(job)
else:
await self._emit_batches(job)
async def on_batch_result(
self,
benchmark_id: str,
start_nonce: int,
solution_nonces: List[int],
merkle_root: MerkleHash,
merkle_proofs: List[MerkleProof],
**kwargs
):
merkle_root = MerkleHash.from_str(merkle_root)
merkle_proofs = [MerkleProof.from_dict(x) for x in merkle_proofs]
if benchmark_id not in self.jobs:
logger.warning(f"job not found for benchmark {benchmark_id}")
else:
job = self.jobs[benchmark_id]
batch_idx = start_nonce // job.batch_size
# validate batch result (does not check the output data)
logger.debug(f"validating batch results for {benchmark_id} @ index {batch_idx}")
assert start_nonce % job.batch_size == 0, "start_nonce not aligned with batch size"
assert all(start_nonce <= n < start_nonce + job.num_nonces for n in solution_nonces), "solution nonces not in batch"
left = set(job.sampled_nonces_by_batch_idx.get(batch_idx, []))
right = set(x.leaf.nonce for x in merkle_proofs)
if len(left) > 0 and len(right) == 0:
logger.warning(f"no merkle proofs for batch {batch_idx} of {benchmark_id}")
return
assert left == right, f"sampled nonces {left} do not match proofs {right}"
assert all(x.branch.calc_merkle_root(
hashed_leaf=x.leaf.to_merkle_hash(),
branch_idx=x.leaf.nonce - start_nonce, # branch idx of batch tree
) == merkle_root for x in merkle_proofs), "merkle proofs do not match merkle root"
num_nonces = min(job.batch_size, job.num_nonces - start_nonce)
job.solution_nonces.extend(solution_nonces)
job.batch_merkle_roots[batch_idx] = merkle_root
job.merkle_proofs.update({x.leaf.nonce: x for x in merkle_proofs})
self._save_job(job)
def _restore_jobs(self):
path = os.path.join(self.backup_folder, "jobs")
if not os.path.exists(path):
logger.info(f"creating backup folder {path}")
os.makedirs(path, exist_ok=True)
for file in os.listdir(path):
if not file.endswith('.json'):
class JobManager:
def __init__(self, config: JobManagerConfig, jobs: List[Job]):
self.config = config
self.jobs = jobs
os.makedirs(self.config.backup_folder, exist_ok=True)
for file in os.listdir(self.config.backup_folder):
if not file.endswith(".json"):
continue
file_path = os.path.join(path, file)
file_path = f"{self.config.backup_folder}/{file}"
logger.info(f"restoring job from {file_path}")
with open(file_path) as f:
job = Job.from_dict(json.load(f))
self.jobs[job.benchmark_id] = job
self.jobs.append(job)
def _save_job(self, job: Job):
path = os.path.join(self.backup_folder, "jobs", f"{job.benchmark_id}.json")
with open(path, 'w') as f:
json.dump(job.to_dict(), f)
def _prune_job(self, job: Job):
path = os.path.join(self.backup_folder, "jobs", f"{job.benchmark_id}.json")
logger.debug(f"pruning job {path}")
self.jobs.pop(job.benchmark_id, None)
if os.path.exists(path):
os.remove(path)
if job.benchmark_id in self.benchmark_ready:
self.benchmark_ready.remove(job.benchmark_id)
if job.benchmark_id in self.proof_ready:
self.proof_ready.remove(job.benchmark_id)
async def _emit_proof(self, job: Job):
# join merkle_proof for the job tree (all batches) with merkle_proof of the batch tree
depth_offset = (job.batch_size - 1).bit_length()
tree = MerkleTree(
job.batch_merkle_roots,
1 << (job.num_batches - 1).bit_length()
)
proofs = {}
for batch_idx in job.sampled_nonces_by_batch_idx:
upper_stems = [
(d + depth_offset, h)
for d, h in tree.calc_merkle_branch(batch_idx).stems
]
for nonce in set(job.sampled_nonces_by_batch_idx[batch_idx]):
proof = job.merkle_proofs[nonce]
proofs[nonce] = MerkleProof(
leaf=proof.leaf,
branch=MerkleBranch(proof.branch.stems + upper_stems)
)
c_name = self.challenge_id_2_name[job.settings.challenge_id]
logger.info(f"proof {job.benchmark_id} ready: (challenge: {c_name}, elapsed: {now() - job.start_time}ms)")
await emit(
"proof_ready",
benchmark_id=job.benchmark_id,
merkle_proofs=list(proofs.values())
)
async def _emit_benchmark(self, job: Job):
tree = MerkleTree(
job.batch_merkle_roots,
1 << (job.num_batches - 1).bit_length()
)
root = tree.calc_merkle_root()
c_name = self.challenge_id_2_name[job.settings.challenge_id]
logger.info(f"benchmark {job.benchmark_id} ready: (challenge: {c_name}, num_solutions: {len(job.solution_nonces)}, elapsed: {now() - job.start_time}ms)")
await emit(
"benchmark_ready",
benchmark_id=job.benchmark_id,
merkle_root=root,
solution_nonces=list(set(job.solution_nonces))
)
async def _emit_batches(self, job: Job):
c_name = self.challenge_id_2_name[job.settings.challenge_id]
ms_delay_between_batch_retries = self.config[c_name].ms_delay_between_batch_retries or 30000
now_ = now()
retry_batch_idxs = [
batch_idx
for batch_idx in range(job.num_batches)
def on_new_block(
self,
block: Block,
precommits: Dict[str, Precommit],
benchmarks: Dict[str, Benchmark],
proofs: Dict[str, Proof],
challenges: Dict[str, Challenge],
wasms: Dict[str, Wasm],
**kwargs
):
job_idxs = {
j.benchmark_id: idx
for idx, j in enumerate(self.jobs)
}
# create jobs from confirmed precommits
challenge_id_2_name = {
c.id: c.details.name
for c in challenges.values()
}
for benchmark_id, x in precommits.items():
if (
now_ - job.last_retry_time[batch_idx] >= ms_delay_between_batch_retries and
(
job.batch_merkle_roots[batch_idx] is None or (
len(job.sampled_nonces) > 0 and
not set(job.sampled_nonces_by_batch_idx.get(batch_idx, [])).issubset(set(job.merkle_proofs))
)
)
benchmark_id in job_idxs or
benchmark_id in proofs
):
continue
logger.info(f"creating job from confirmed precommit {benchmark_id}")
c_name = challenge_id_2_name[x.settings.challenge_id]
job = Job(
benchmark_id=benchmark_id,
settings=x.settings,
num_nonces=x.details.num_nonces,
rand_hash=x.state.rand_hash,
wasm_vm_config=block.config["wasm_vm"],
batch_size=self.config.batch_sizes[c_name],
challenge=c_name,
download_url=next((w.details.download_url for w in wasms.values() if w.algorithm_id == x.settings.algorithm_id), None)
)
job_idxs[benchmark_id] = len(self.jobs)
self.jobs.append(job)
# update jobs from confirmed benchmarks
for benchmark_id, x in benchmarks.items():
if benchmark_id in proofs:
continue
logger.info(f"updating job from confirmed benchmark {benchmark_id}")
job = self.jobs[job_idxs[benchmark_id]]
job.sampled_nonces = x.state.sampled_nonces
for batch_idx in job.sampled_nonces_by_batch_idx:
job.last_batch_retry_time[batch_idx] = 0
# prune jobs from confirmed proofs
prune_idxs = [
job_idxs[benchmark_id]
for benchmark_id in proofs
if benchmark_id in job_idxs
] + [
job_idxs[benchmark_id]
for benchmark_id in job_idxs
if benchmark_id not in precommits
]
num_finished = sum(x is not None for x in job.batch_merkle_roots)
if num_finished != len(job.batch_merkle_roots):
c_name = self.challenge_id_2_name[job.settings.challenge_id]
logger.info(f"precommit {job.benchmark_id}: (challenge: {c_name}, progress: {num_finished} of {len(job.batch_merkle_roots)} batches, elapsed: {now_ - job.start_time}ms)")
if len(retry_batch_idxs) == 0:
return
for batch_idx in retry_batch_idxs:
job.last_retry_time[batch_idx] = now_
await emit(
"new_batch",
benchmark_id=job.benchmark_id,
settings=job.settings.to_dict(),
num_nonces=min(
job.batch_size,
job.num_nonces - batch_idx * job.batch_size
),
start_nonce=batch_idx * job.batch_size,
batch_size=job.batch_size,
rand_hash=job.rand_hash,
sampled_nonces=job.sampled_nonces_by_batch_idx.get(batch_idx, []),
wasm_vm_config=job.wasm_vm_config,
download_url=job.download_url,
)
for idx in sorted(set(prune_idxs), reverse=True):
job = self.jobs[idx]
logger.info(f"pruning job {job.benchmark_id}")
if os.path.exists(f"{self.config.backup_folder}/{job.benchmark_id}.json"):
os.remove(f"{self.config.backup_folder}/{job.benchmark_id}.json")
self.jobs.pop(idx)
def run(self):
now = int(time.time() * 1000)
for job in self.jobs:
if job.merkle_root is not None:
continue
num_batches_ready = sum(x is not None for x in job.batch_merkle_roots)
logger.info(f"benchmark {job.benchmark_id}: (batches: {num_batches_ready} of {job.num_batches} ready, #solutions: {len(job.solution_nonces)})")
if num_batches_ready != job.num_batches:
continue
start_time = min(job.last_batch_retry_time)
logger.info(f"benchmark {job.benchmark_id}: ready, took {(now - start_time) / 1000} seconds")
tree = MerkleTree(
job.batch_merkle_roots,
1 << (job.num_batches - 1).bit_length()
)
job.merkle_root = tree.calc_merkle_root()
for job in self.jobs:
if (
len(job.sampled_nonces) == 0 or # benchmark not confirmed
len(job.merkle_proofs) == len(job.sampled_nonces) # already processed
):
continue
logger.info(f"proof {job.benchmark_id}: (merkle_proof: {len(job.batch_merkle_proofs)} of {len(job.sampled_nonces)} ready)")
if len(job.batch_merkle_proofs) != len(job.sampled_nonces): # not finished
continue
logger.info(f"proof {job.benchmark_id}: ready")
depth_offset = (job.batch_size - 1).bit_length()
tree = MerkleTree(
job.batch_merkle_roots,
1 << (job.num_batches - 1).bit_length()
)
proofs = {}
sampled_nonces_by_batch_idx = job.sampled_nonces_by_batch_idx
for batch_idx in sampled_nonces_by_batch_idx:
upper_stems = [
(d + depth_offset, h)
for d, h in tree.calc_merkle_branch(batch_idx).stems
]
for nonce in set(sampled_nonces_by_batch_idx[batch_idx]):
proof = job.batch_merkle_proofs[nonce]
job.merkle_proofs[nonce] = MerkleProof(
leaf=proof.leaf,
branch=MerkleBranch(proof.branch.stems + upper_stems)
)
for job in self.jobs:
file_path = f"{self.config.backup_folder}/{job.benchmark_id}.json"
logger.debug(f"backing up job to {file_path}")
with open(file_path, "w") as f:
json.dump(job.to_dict(), f)

View File

@ -3,7 +3,8 @@ import os
import logging
import random
from dataclasses import dataclass
from tig_benchmarker.event_bus import *
from tig_benchmarker.extensions.job_manager import Job
from tig_benchmarker.extensions.submissions_manager import SubmitPrecommitRequest
from tig_benchmarker.structs import *
from tig_benchmarker.utils import FromDict
from typing import Dict, List, Optional, Set
@ -15,146 +16,123 @@ class AlgorithmSelectionConfig(FromDict):
algorithm: str
base_fee_limit: PreciseNumber
num_nonces: int
weight: Optional[float] = None
weight: float
@dataclass
class PrecommitManagerConfig(FromDict):
max_unresolved_precommits: int
max_pending_benchmarks: int
algo_selection: Dict[str, AlgorithmSelectionConfig]
class Extension:
def __init__(self, player_id: str, backup_folder: str, precommit_manager: dict, **kwargs):
class PrecommitManager:
def __init__(self, config: PrecommitManagerConfig, player_id: str, jobs: List[Job]):
self.config = config
self.player_id = player_id
self.config = PrecommitManagerConfig.from_dict(precommit_manager)
self.jobs = jobs
self.last_block_id = None
self.num_precommits_this_block = 0
self.num_unresolved_precommits = None
self.curr_base_fees = {}
self.difficulty_samples = {}
self.num_precommits_submitted = 0
self.algorithm_name_2_id = {}
self.challenge_name_2_id = {}
self.percent_qualifiers_by_challenge = {}
self.num_solutions_by_challenge = {}
self.lock = True
self.curr_base_fees = {}
async def on_new_block(
self,
block: Block,
challenges: Dict[str, Challenge],
algorithms: Dict[str, Algorithm],
precommits: Dict[str, Precommit],
def on_new_block(
self,
block: Block,
precommits: Dict[str, Precommit],
benchmarks: Dict[str, Benchmark],
proofs: Dict[str, Proof],
challenges: Dict[str, Challenge],
algorithms: Dict[str, Algorithm],
player: Optional[Player],
difficulty_data: Dict[str, List[DifficultyData]],
**kwargs
):
if self.last_block_id == block.id:
return
self.num_precommits_this_block = 0
self.num_unresolved_precommits = sum(1 for benchmark_id in precommits if benchmark_id not in proofs)
self.last_block_id = block.id
for c in challenges.values():
c_name = c.details.name
assert c_name in self.config.algo_selection, f"missing algorithm selection for challenge {c_name}"
self.challenge_name_2_id[c_name] = c.id
a_name = self.config.algo_selection[c_name].algorithm
a = next(
(
a for a in algorithms.values()
if a.details.challenge_id == c.id and a.details.name == a_name
),
None
)
assert a is not None, f"selected non-existent algorithm {a_name} for challenge {c_name}"
self.algorithm_name_2_id[f"{c_name}_{a_name}"] = a.id
if c.block_data is not None:
self.curr_base_fees[c_name] = c.block_data.base_fee
logger.info(f"current base_fees: {self.curr_base_fees}")
if (
player is None or
player.block_data is None or
player.block_data.num_qualifiers_by_challenge is None
):
self.percent_qualifiers_by_challenge = {
c.details.name: 0
for c in challenges.values()
if c.block_data is not None
}
else:
self.percent_qualifiers_by_challenge = {
c.details.name: player.block_data.num_qualifiers_by_challenge.get(c.id, 0) / c.block_data.num_qualifiers
for c in challenges.values()
if c.block_data is not None
}
logger.info(f"percent_qualifiers_by_challenge: {self.percent_qualifiers_by_challenge}")
self.num_solutions_by_challenge = {
c.details.name: 0
self.num_precommits_submitted = 0
self.challenge_name_2_id = {
c.details.name: c.id
for c in challenges.values()
}
self.algorithm_name_2_id = {
f"{challenges[a.details.challenge_id].details.name}_{a.details.name}": a.id
for a in algorithms.values()
}
self.curr_base_fees = {
c.details.name: c.block_data.base_fee
for c in challenges.values()
if c.block_data is not None
}
for benchmark_id, benchmark in benchmarks.items():
precommit = precommits[benchmark_id]
benchmark_stats_by_challenge = {
c.details.name: {
"solutions": 0,
"nonces": 0,
"qualifiers": 0
}
for c in challenges.values()
if c.block_data is not None
}
for benchmark in benchmarks.values():
precommit = precommits[benchmark.id]
c_name = challenges[precommit.settings.challenge_id].details.name
num_solutions = benchmark.details.num_solutions
self.num_solutions_by_challenge[c_name] += num_solutions
logger.info(f"num_solutions_by_challenge: {self.num_solutions_by_challenge}")
self.lock = False
benchmark_stats_by_challenge[c_name]["solutions"] += benchmark.details.num_solutions
benchmark_stats_by_challenge[c_name]["nonces"] += precommit.details.num_nonces
async def on_difficulty_samples(self, challenge_id: str, samples: list, **kwargs):
self.difficulty_samples[challenge_id] = samples
if player is not None:
logger.info(f"player earnings: (latest: {player.block_data.reward.to_float()}, round: {player.block_data.round_earnings.to_float()})")
logger.info(f"player stats: (cutoff: {player.block_data.cutoff}, imbalance: {player.block_data.imbalance.to_float() * 100}%)")
for c_id, num_qualifiers in player.block_data.num_qualifiers_by_challenge.items():
c_name = challenges[c_id].details.name
benchmark_stats_by_challenge[c_name]["qualifiers"] = num_qualifiers
for c_name, x in benchmark_stats_by_challenge.items():
avg_nonces_per_solution = (x["nonces"] // x["solutions"]) if x["solutions"] > 0 else 0
logger.info(f"benchmark stats for {c_name}: (#nonces: {x['nonces']}, #solutions: {x['solutions']}, #qualifiers: {x['qualifiers']}, avg_nonces_per_solution: {avg_nonces_per_solution})")
async def on_update(self):
if self.lock:
if player is not None and any(x['qualifiers'] == player.block_data.cutoff for x in benchmark_stats_by_challenge.values()):
c_name = min(benchmark_stats_by_challenge, key=lambda x: benchmark_stats_by_challenge[x]['solutions'])
logger.warning(f"recommend finding more solutions for challenge {c_name} to avoid being cut off")
aggregate_difficulty_data = {
c_id: {
"nonces": sum(
x.num_nonces if x.difficulty in challenges[c_id].block_data.qualifier_difficulties else 0
for x in difficulty_data
),
"solutions": sum(
x.num_solutions if x.difficulty in challenges[c_id].block_data.qualifier_difficulties else 0
for x in difficulty_data
),
}
for c_id, difficulty_data in difficulty_data.items()
}
for c_id, x in aggregate_difficulty_data.items():
avg_nonces_per_solution = (x["nonces"] // x["solutions"]) if x["solutions"] > 0 else 0
logger.info(f"global qualifier difficulty stats for {challenges[c_id].details.name}: (#nonces: {x['nonces']}, #solutions: {x['solutions']}, avg_nonces_per_solution: {avg_nonces_per_solution})")
def run(self, difficulty_samples: Dict[str, List[int]]) -> SubmitPrecommitRequest:
num_pending_benchmarks = sum(1 for job in self.jobs if job.merkle_root is None) + self.num_precommits_submitted
if num_pending_benchmarks >= self.config.max_pending_benchmarks:
logger.debug(f"number of pending benchmarks has reached max of {self.config.max_pending_benchmarks}")
return
if self.num_precommits_this_block + self.num_unresolved_precommits >= self.config.max_unresolved_precommits:
logger.debug(f"reached max unresolved precommits: {self.config.max_unresolved_precommits}")
return
algo_selection = [
(c_name, selection)
for c_name, selection in self.config.algo_selection.items()
if self.curr_base_fees[c_name] <= selection.base_fee_limit
selections = [
(c_name, x) for c_name, x in self.config.algo_selection.items()
if self.curr_base_fees[c_name] <= x.base_fee_limit
]
if len(algo_selection) == 0:
logger.warning("no challenges within base fee limit")
return
if any(x[1].weight is not None for x in algo_selection):
logger.debug(f"using config weights to randomly select a challenge + algorithm: {self.config.algo_selection}")
selection = random.choices(algo_selection, weights=[x[1].weight or 1e-12 for x in algo_selection])[0]
elif (max_percent := max(v for v in self.percent_qualifiers_by_challenge.values())) > 0:
logger.debug(f"using percent qualifiers to randomly select a challenge + algorithm: {self.percent_qualifiers_by_challenge}")
selection = random.choices(
algo_selection,
weights=[max_percent - self.percent_qualifiers_by_challenge.get(x[0], 0) + 1e-12 for x in algo_selection]
)[0]
else:
logger.debug(f"using number of solutions to randomly select a challenge + algorithm: {self.num_solutions_by_challenge}")
max_solution = max(v for v in self.num_solutions_by_challenge.values())
selection = random.choices(
algo_selection,
weights=[max_solution - self.num_solutions_by_challenge.get(x[0], 0) + 1e-12 for x in algo_selection]
)[0]
c_name = selection[0]
c_id = self.challenge_name_2_id[c_name]
a_name = selection[1].algorithm
a_id = self.algorithm_name_2_id[f"{c_name}_{a_name}"]
num_nonces = selection[1].num_nonces
difficulty_samples = self.difficulty_samples.get(c_id, [])
if len(difficulty_samples) == 0:
return
difficulty = difficulty_samples.pop()
settings = BenchmarkSettings(
player_id=self.player_id,
block_id=self.last_block_id,
algorithm_id=a_id,
challenge_id=c_id,
difficulty=difficulty
if len(selections) == 0:
logger.warning("No challenges under base fee limit")
return None
logger.debug(f"Selecting challenge from: {[(c_name, x.weight) for c_name, x in selections]}")
selection = random.choices(selections, weights=[x.weight for _, x in selections])[0]
c_id = self.challenge_name_2_id[selection[0]]
a_id = self.algorithm_name_2_id[f"{selection[0]}_{selection[1].algorithm}"]
self.num_precommits_submitted += 1
req = SubmitPrecommitRequest(
settings=BenchmarkSettings(
challenge_id=c_id,
algorithm_id=a_id,
player_id=self.player_id,
block_id=self.last_block_id,
difficulty=difficulty_samples[selection[0]]
),
num_nonces=selection[1].num_nonces
)
logger.debug(f"created precommit: (settings: {settings}, num_nonces: {num_nonces})")
self.num_precommits_this_block += 1
await emit(
"precommit_ready",
settings=settings,
num_nonces=num_nonces
)
logger.info(f"Created precommit (challenge: {selection[0]}, algorithm: {selection[1].algorithm}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})")
return req
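The new `run()` above caps outstanding work via `max_pending_benchmarks`, drops challenges whose current base fee exceeds their `base_fee_limit`, then picks one (challenge, algorithm) pair by weighted random choice. Below is a minimal standalone sketch of that selection step; the `algo_selection` entries and fee values are hypothetical, only the field names (`algorithm`, `num_nonces`, `base_fee_limit`, `weight`) mirror the code above.
```
import random

# Hypothetical algo_selection config entries; field names mirror the code above,
# values are illustrative only.
algo_selection = {
    "vector_search": {"algorithm": "my_algo", "num_nonces": 512, "base_fee_limit": 100, "weight": 2.0},
    "other_challenge": {"algorithm": "other_algo", "num_nonces": 1024, "base_fee_limit": 100, "weight": 1.0},
}
curr_base_fees = {"vector_search": 40, "other_challenge": 120}  # made-up current base fees

# keep only challenges whose current base fee is within the configured limit
selections = [
    (c_name, x) for c_name, x in algo_selection.items()
    if curr_base_fees[c_name] <= x["base_fee_limit"]
]
if selections:
    # higher weight => proportionally more likely to be selected
    c_name, x = random.choices(selections, weights=[x["weight"] for _, x in selections])[0]
    print(f"selected: {c_name} / {x['algorithm']} with {x['num_nonces']} nonces")
else:
    print("no challenges under base fee limit")
```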

View File

@ -3,115 +3,109 @@ import os
import json
import logging
import re
from dataclasses import dataclass
from collections import deque
from enum import Enum
import signal
from quart import Quart, request, jsonify
from hypercorn.config import Config
from hypercorn.asyncio import serve
from tig_benchmarker.event_bus import *
from tig_benchmarker.extensions.job_manager import Job
from tig_benchmarker.structs import *
from tig_benchmarker.utils import *
from typing import Dict, List, Optional, Set
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
class Status(Enum):
QUEUED = 0
PROCESSING = 1
FINISHED = 2
PRIORITY_QUEUED = 3
PRIORITY_PROCESSING = 4
PRIORITY_FINISHED = 5
@dataclass
class Batch(FromDict):
benchmark_id: str
start_nonce: int
num_nonces: int
settings: BenchmarkSettings
sampled_nonces: List[int]
wasm_vm_config: dict
download_url: str
rand_hash: str
batch_size: int
@dataclass
class BatchResult(FromDict):
merkle_root: MerkleHash
solution_nonces: List[int]
merkle_proofs: List[MerkleProof]
@dataclass
class SlaveConfig(FromDict):
name_regex: str
challenge_selection: Optional[List[str]]
max_concurrent_batches: Dict[str, int]
@dataclass
class SlaveManagerConfig(FromDict):
port: int
time_before_batch_retry: int
slaves: List[SlaveConfig]
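# Illustrative slave manager config (hypothetical values) in the shape that
# SlaveManagerConfig.from_dict expects. The field names come from the dataclasses
# above; the port, retry time (milliseconds) and the catch-all entry are
# placeholders, not documented defaults:
#
#   {
#       "port": 5115,
#       "time_before_batch_retry": 60000,
#       "slaves": [
#           {"name_regex": "vector_search-slave-.*", "max_concurrent_batches": {"vector_search": 4}},
#           {"name_regex": ".*", "max_concurrent_batches": {"vector_search": 2}}
#       ]
#   }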
class Extension:
def __init__(self, port: int, exit_event: asyncio.Event, slave_manager: dict, **kwargs):
self.port = port
self.config = SlaveManagerConfig.from_dict(slave_manager)
self.exit_event = exit_event
self.batch_status = {}
self.batches = {}
self.priority_batches = {}
self.challenge_name_2_id = {}
self.last_update = 0
self.lock = True
self._start_server()
class SlaveManager:
def __init__(self, config: SlaveManagerConfig, jobs: List[Job]):
self.config = config
self.jobs = jobs
def _start_server(self):
def start(self):
app = Quart(__name__)
@app.route('/get-batch', methods=['GET'])
async def get_batch():
if self.lock:
return "Slave manager is not ready", 503
@app.route('/get-batches', methods=['GET'])
async def get_batches():
if (slave_name := request.headers.get('User-Agent', None)) is None:
return "User-Agent header is required", 403
if not any(re.match(slave.name_regex, slave_name) for slave in self.config.slaves):
logger.warning(f"slave {slave_name} does not match any regex. rejecting get-batch request")
logger.warning(f"slave {slave_name} does not match any regex. rejecting get-batches request")
return "Unregistered slave", 403
if (slave := next((slave for slave in self.config.slaves if re.match(slave.name_regex, slave_name)), None)) is not None:
challenge_selection = slave.challenge_selection
else:
challenge_selection = None
batch = None
while batch is None:
if challenge_selection is not None:
for c_name in challenge_selection:
c_id = self.challenge_name_2_id[c_name]
if len(self.priority_batches.get(c_id, [])):
batch, _ = self.priority_batches[c_id].popleft()
break
elif len(self.batches.get(c_id, [])):
batch, _ = self.batches[c_id].popleft()
break
else:
return "No batches available", 404
else:
if (non_empty_queues := [q for q in self.priority_batches.values() if q]):
batch, _ = min(non_empty_queues, key=lambda q: q[0][1]).popleft()
elif (non_empty_queues := [q for q in self.batches.values() if q]):
batch, _ = min(non_empty_queues, key=lambda q: q[0][1]).popleft()
else:
return "No batches available", 404
slave = next((slave for slave in self.config.slaves if re.match(slave.name_regex, slave_name)), None)
benchmark_id = batch['benchmark_id']
start_nonce = batch['start_nonce']
is_priority = len(batch['sampled_nonces']) > 0
batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None)
if batch_status is None:
batch = None
elif is_priority and batch_status in {
Status.PRIORITY_FINISHED
}:
batch = None
elif not is_priority and batch_status in {
Status.FINISHED,
Status.PRIORITY_QUEUED,
Status.PRIORITY_PROCESSING,
Status.PRIORITY_FINISHED
}:
batch = None
now = int(time.time() * 1000)
batches = []
selected_challenge = None
max_concurrent_batches = None
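# Assignment policy: only jobs for challenges listed in this slave's
# max_concurrent_batches are considered, every batch handed out in one response
# comes from a single challenge, and at most max_concurrent_batches of them are
# returned. A batch is eligible for (re)assignment only if it was not handed out
# within the last time_before_batch_retry ms and it still needs work (no merkle
# root yet, or some sampled nonces still missing merkle proofs).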
for job in self.jobs:
if (
job.challenge not in slave.max_concurrent_batches or
(selected_challenge is not None and job.challenge != selected_challenge)
):
continue
sampled_nonces_by_batch_idx = job.sampled_nonces_by_batch_idx
for batch_idx in range(job.num_batches):
if not (
now - job.last_batch_retry_time[batch_idx] > self.config.time_before_batch_retry and
(
job.batch_merkle_roots[batch_idx] is None or
not set(sampled_nonces_by_batch_idx.get(batch_idx, [])).issubset(job.merkle_proofs)
)
):
continue
selected_challenge = job.challenge
max_concurrent_batches = slave.max_concurrent_batches[job.challenge]
start_nonce = batch_idx * job.batch_size
batches.append(Batch(
benchmark_id=job.benchmark_id,
start_nonce=start_nonce,
num_nonces=min(job.batch_size, job.num_nonces - start_nonce),
settings=job.settings.to_dict(),
sampled_nonces=sampled_nonces_by_batch_idx.get(batch_idx, []),
wasm_vm_config=job.wasm_vm_config,
download_url=job.download_url,
rand_hash=job.rand_hash,
batch_size=job.batch_size
))
if len(batches) >= max_concurrent_batches:
break
if len(batches) >= max_concurrent_batches:
break
batch_id = f"{benchmark_id}_{start_nonce}"
if is_priority:
self.batch_status[benchmark_id][start_nonce] = Status.PRIORITY_PROCESSING
logger.debug(f"{slave_name} got priority batch {batch_id}")
if len(batches) == 0:
logger.debug(f"{slave_name} get-batches: None available")
return "No batches available", 503
else:
self.batch_status[benchmark_id][start_nonce] = Status.PROCESSING
logger.debug(f"{slave_name} got batch {batch_id}")
return jsonify(batch)
logger.debug(f"{slave_name} get-batches: Assigning {len(batches)} {selected_challenge} batches")
return jsonify([b.to_dict() for b in batches])
@app.route('/submit-batch-result/<batch_id>', methods=['POST'])
async def submit_batch_result(batch_id):
@ -121,81 +115,27 @@ class Extension:
logger.warning(f"slave {slave_name} does not match any regex. rejecting submit-batch-result request")
benchmark_id, start_nonce = batch_id.split("_")
start_nonce = int(start_nonce)
batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None)
data = await request.json
is_priority = len(data["merkle_proofs"]) > 0
if batch_status is None:
return "Redundant result", 404
elif is_priority and batch_status == Status.PRIORITY_FINISHED:
return "Redundant result", 404
elif not is_priority and batch_status in {
Status.FINISHED,
Status.PRIORITY_QUEUED,
Status.PRIORITY_PROCESSING,
Status.PRIORITY_FINISHED
}:
return "Redundant result", 404
if is_priority:
self.batch_status[benchmark_id][start_nonce] = Status.PRIORITY_FINISHED
logger.debug(f"{slave_name} returned priority batch {batch_id}")
else:
self.batch_status[benchmark_id][start_nonce] = Status.FINISHED
logger.debug(f"{slave_name} returned batch {batch_id}")
await emit("batch_result", benchmark_id=benchmark_id, start_nonce=start_nonce, **data)
result = BatchResult.from_dict(await request.json)
job = next((job for job in self.jobs if job.benchmark_id == benchmark_id), None)
logger.debug(f"{slave_name} submit-batch-result: (benchmark_id: {benchmark_id}, start_nonce: {start_nonce}, #solutions: {len(result.solution_nonces)}, #proofs: {len(result.merkle_proofs)})")
if job is None:
logger.warning(f"{slave_name} submit-batch-result: no job found with benchmark_id {benchmark_id}")
return "Invalid benchmark_id", 400
batch_idx = start_nonce // job.batch_size
job.batch_merkle_roots[batch_idx] = result.merkle_root
job.solution_nonces = list(set(job.solution_nonces + result.solution_nonces))
job.batch_merkle_proofs.update({
x.nonce: x
for x in result.merkle_proofs
})
return "OK"
config = Config()
config.bind = [f"0.0.0.0:{self.port}"]
config.bind = [f"0.0.0.0:{self.config.port}"]
self._server_task = asyncio.create_task(serve(app, config, shutdown_trigger=self.exit_event.wait))
logger.info(f"webserver started on {config.bind[0]}")
async def on_new_batch(self, **batch):
benchmark_id = batch['benchmark_id']
start_nonce = batch['start_nonce']
challenge_id = batch['settings']['challenge_id']
is_priority = len(batch['sampled_nonces']) > 0
batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None)
if is_priority and batch_status in {
Status.PRIORITY_QUEUED,
Status.PRIORITY_FINISHED
}:
return
elif not is_priority and batch_status in {
Status.QUEUED,
Status.FINISHED,
Status.PRIORITY_QUEUED,
Status.PRIORITY_PROCESSING,
Status.PRIORITY_FINISHED
}:
return
now_ = now()
if is_priority:
self.batch_status.setdefault(benchmark_id, {})[start_nonce] = Status.PRIORITY_QUEUED
self.priority_batches.setdefault(challenge_id, deque()).append((batch, now_))
else:
self.batch_status.setdefault(benchmark_id, {})[start_nonce] = Status.QUEUED
self.batches.setdefault(challenge_id, deque()).append((batch, now_))
async def on_update(self):
now_ = now()
if now_ - self.last_update < 10000:
return
self.last_update = now_
logger.info(f"#batches in queue (normal: {sum(len(x) for x in self.batches.values())}, priority: {sum(len(x) for x in self.priority_batches.values())})")
async def on_new_block(self, precommits: Dict[str, Precommit], challenges: Dict[str, Challenge], **kwargs):
for benchmark_id in list(self.batch_status):
if benchmark_id in precommits:
continue
self.batch_status.pop(benchmark_id)
self.challenge_name_2_id = {c.details.name: c.id for c in challenges.values()}
challenge_names = set(self.challenge_name_2_id)
for slave in self.config.slaves:
if slave.challenge_selection is None:
continue
assert set(slave.challenge_selection).issubset(challenge_names), f"challenge_selection for slave regex '{slave.name_regex}' is not a subset of {challenge_names}"
self.lock = False
exit_event = asyncio.Event()
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, exit_event.set)
loop.add_signal_handler(signal.SIGTERM, exit_event.set)
asyncio.create_task(serve(app, config, shutdown_trigger=exit_event.wait))
logger.info(f"webserver started on {config.bind[0]}")

View File

@ -1,8 +1,9 @@
import aiohttp
import asyncio
import logging
import json
import os
from tig_benchmarker.event_bus import *
from tig_benchmarker.extensions.job_manager import Job
from tig_benchmarker.structs import *
from tig_benchmarker.utils import *
from typing import Union
@ -27,202 +28,70 @@ class SubmitProofRequest(FromDict):
@dataclass
class SubmissionsManagerConfig(FromDict):
clear_precommits_submission_on_new_block: bool = True
max_retries: Optional[int] = None
ms_delay_between_retries: int = 60000
time_between_retries: int
@dataclass
class PendingSubmission:
last_retry_time: int
retries: int
request: Union[SubmitPrecommitRequest, SubmitBenchmarkRequest, SubmitProofRequest]
class Extension:
def __init__(self, api_url: str, api_key: str, backup_folder: str, **kwargs):
class SubmissionsManager:
def __init__(self, config: SubmissionsManagerConfig, api_url: str, api_key: str, jobs: List[Job]):
self.config = config
self.jobs = jobs
self.api_url = api_url
self.api_key = api_key
self.backup_folder = backup_folder
if (submissions_manager := kwargs.get("submissions_manager", None)):
self.config = SubmissionsManagerConfig.from_dict(submissions_manager)
else:
self.config = SubmissionsManagerConfig()
self.pending_submissions = {
"precommit": [],
"benchmark": [],
"proof": []
}
self.last_submit = 0
self._restore_pending_submissions()
def _restore_pending_submissions(self):
pending_benchmarks = []
pending_proofs = []
for submission_type in ["benchmark", "proof"]:
path = os.path.join(self.backup_folder, submission_type)
if not os.path.exists(path):
logger.info(f"creating backup folder {path}")
os.makedirs(path, exist_ok=True)
for file in os.listdir(path):
if not file.endswith(".json"):
continue
file_path = os.path.join(path, file)
logger.info(f"restoring {submission_type} from {file_path}")
with open(file_path) as f:
d = json.load(f)
submission = PendingSubmission(
last_retry_time=0,
retries=0,
request=SubmitBenchmarkRequest.from_dict(d) if submission_type == "benchmark" else SubmitProofRequest.from_dict(d)
)
self.pending_submissions[submission_type].append(submission)
async def on_precommit_ready(self, settings: BenchmarkSettings, num_nonces: int, **kwargs):
self.pending_submissions["precommit"].append(
PendingSubmission(
last_retry_time=0,
retries=0,
request=SubmitPrecommitRequest(settings=settings, num_nonces=num_nonces)
)
)
async def on_benchmark_ready(self, benchmark_id: str, merkle_root: MerkleHash, solution_nonces: Set[int], **kwargs):
if any(
x.request.benchmark_id == benchmark_id
for x in self.pending_submissions["benchmark"]
):
logger.warning(f"benchmark {benchmark_id} already pending submission")
else:
request = SubmitBenchmarkRequest(
benchmark_id=benchmark_id,
merkle_root=merkle_root,
solution_nonces=solution_nonces
)
self.pending_submissions["benchmark"].append(
PendingSubmission(
last_retry_time=0,
retries=0,
request=request
)
)
with open(os.path.join(self.backup_folder, "benchmark", f"{benchmark_id}.json"), "w") as f:
json.dump(request.to_dict(), f)
async def on_proof_ready(self, benchmark_id: str, merkle_proofs: List[MerkleProof], **kwargs):
if any(
x.request.benchmark_id == benchmark_id
for x in self.pending_submissions["proof"]
):
logger.warning(f"proof {benchmark_id} already in pending submissions")
else:
request = SubmitProofRequest(
benchmark_id=benchmark_id,
merkle_proofs=merkle_proofs
)
self.pending_submissions["proof"].append(
PendingSubmission(
last_retry_time=0,
retries=0,
request=request
)
)
with open(os.path.join(self.backup_folder, "proof", f"{benchmark_id}.json"), "w") as f:
json.dump(request.to_dict(), f)
async def on_new_block(
self,
block: Block,
precommits: Dict[str, Precommit],
**kwargs
):
if self.config.clear_precommits_submission_on_new_block:
logger.debug(f"clearing {len(self.pending_submissions['precommit'])} pending precommits")
self.pending_submissions["precommit"].clear()
for submission_type in ["benchmark", "proof"]:
filtered_submissions = []
for submission in self.pending_submissions[submission_type]:
if (
submission.request.benchmark_id not in precommits or # is expired
submission.request.benchmark_id in kwargs[submission_type + "s"] # is confirmed
):
self._prune_pending_submission(submission_type, submission.request.benchmark_id)
else:
filtered_submissions.append(submission)
self.pending_submissions[submission_type] = filtered_submissions
def _prune_pending_submission(self, submission_type: str, benchmark_id: str):
logger.debug(f"removing {submission_type} {benchmark_id} from pending submissions")
path = os.path.join(self.backup_folder, submission_type, f"{benchmark_id}.json")
if os.path.exists(path):
os.remove(path)
async def on_update(self):
if (
len(self.pending_submissions['precommit']) == 0 and
len(self.pending_submissions['benchmark']) == 0 and
len(self.pending_submissions['proof']) == 0
):
return
now_ = now()
if now_ - self.last_submit < 5500:
return
self.last_submit = now_
logger.debug(f"pending submissions: (#precommits: {len(self.pending_submissions['precommit'])}, #benchmarks: {len(self.pending_submissions['benchmark'])}, #proofs: {len(self.pending_submissions['proof'])})")
now_ = now()
for submission_type in ["precommit", "benchmark", "proof"]:
if len(self.pending_submissions[submission_type]) > 0:
self.pending_submissions[submission_type] = sorted(self.pending_submissions[submission_type], key=lambda x: x.last_retry_time)
if now_ - self.pending_submissions[submission_type][0].last_retry_time < self.config.ms_delay_between_retries:
logger.debug(f"no {submission_type} ready for submission")
else:
s = self.pending_submissions[submission_type].pop(0)
asyncio.create_task(self.submit(s))
if submission_type == "precommit":
logger.info(f"submitting precommit")
else:
logger.info(f"submitting {submission_type} '{s.request.benchmark_id}'")
async def submit(
self,
submission: PendingSubmission,
):
async def _post(self, submission_type: str, req: Union[SubmitPrecommitRequest, SubmitBenchmarkRequest, SubmitProofRequest]):
headers = {
"X-Api-Key": self.api_key,
"Content-Type": "application/json",
"User-Agent": "tig-benchmarker-py/v0.2"
}
if isinstance(submission.request, SubmitPrecommitRequest):
submission_type = "precommit"
elif isinstance(submission.request, SubmitBenchmarkRequest):
submission_type = "benchmark"
elif isinstance(submission.request, SubmitProofRequest):
submission_type = "proof"
if submission_type == "precommit":
logger.info(f"submitting {submission_type}")
else:
raise ValueError(f"Invalid request type: {type(submission.request)}")
d = submission.request.to_dict()
logger.debug(f"submitting {submission_type}: {d}")
logger.info(f"submitting {submission_type} '{req.benchmark_id}'")
logger.debug(f"{req}")
async with aiohttp.ClientSession() as session:
async with session.post(f"{self.api_url}/submit-{submission_type}", json=d, headers=headers) as resp:
async with session.post(f"{self.api_url}/submit-{submission_type}", json=req.to_dict(), headers=headers) as resp:
text = await resp.text()
if resp.status == 200:
logger.info(f"submitted {submission_type} successfully")
if submission_type != "precommit":
self._prune_pending_submission(submission_type, submission.request.benchmark_id)
await emit(f"submit_{submission_type}_success", text=text, **d)
elif resp.headers.get("Content-Type") == "text/plain":
logger.error(f"status {resp.status} when submitting {submission_type}: {text}")
else:
if resp.headers.get("Content-Type") == "text/plain":
logger.error(f"status {resp.status} when submitting {submission_type}: {text}")
else:
logger.error(f"status {resp.status} when submitting {submission_type}")
if 500 <= resp.status <= 599 and (
self.config.max_retries is None or
submission.retries + 1 < self.config.max_retries
):
submission.retries += 1
submission.last_retry_time = now()
self.pending_submissions[submission_type].append(submission)
await emit(f"submit_{submission_type}_error", text=text, status=resp.status, request=submission.request)
logger.error(f"status {resp.status} when submitting {submission_type}")
def run(self, submit_precommit_req: Optional[SubmitPrecommitRequest]):
now = int(time.time() * 1000)
if submit_precommit_req is None:
logger.debug("no precommit to submit")
else:
asyncio.create_task(self._post("precommit", submit_precommit_req))
for job in self.jobs:
if (
job.merkle_root is not None and
len(job.sampled_nonces) == 0 and
now - job.last_benchmark_submit_time > self.config.time_between_retries
):
job.last_benchmark_submit_time = now
asyncio.create_task(self._post("benchmark", SubmitBenchmarkRequest(
benchmark_id=job.benchmark_id,
merkle_root=job.merkle_root.to_str(),
solution_nonces=job.solution_nonces
)))
break
else:
logger.debug("no benchmark to submit")
for job in self.jobs:
if (
len(job.sampled_nonces) > 0 and
len(job.merkle_proofs) == len(job.sampled_nonces) and
now - job.last_proof_submit_time > self.config.time_between_retries
):
job.last_proof_submit_time = now
asyncio.create_task(self._post("proof", SubmitProofRequest(
benchmark_id=job.benchmark_id,
merkle_proofs=list(job.merkle_proofs.values())
)))
break
else:
logger.debug("no proof to submit")

View File

@ -275,13 +275,7 @@ class TopUp(FromDict):
state: TopUpState
@dataclass
class QueryData(FromDict):
block: Block
algorithms: Dict[str, Algorithm]
wasms: Dict[str, Wasm]
player: Optional[Player]
precommits: Dict[str, Precommit]
benchmarks: Dict[str, Benchmark]
proofs: Dict[str, Proof]
frauds: Dict[str, Fraud]
challenges: Dict[str, Challenge]
@dataclass
class DifficultyData(FromDict):
num_solutions: int
num_nonces: int
difficulty: Point
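DifficultyData is what feeds the global difficulty stats logged by the precommit manager. As a rough illustration, per-difficulty records can be folded into the same #nonces / #solutions / avg_nonces_per_solution figures; the class below is a stand-in for the struct above and the sample data is made up.
```
from dataclasses import dataclass
from typing import List, Tuple

# Stand-in for tig_benchmarker.structs.DifficultyData, with made-up sample data
@dataclass
class DifficultyDataExample:
    difficulty: Tuple[int, int]
    num_nonces: int
    num_solutions: int

data: List[DifficultyDataExample] = [
    DifficultyDataExample((50, 300), 4096, 8),
    DifficultyDataExample((52, 310), 4096, 3),
]
nonces = sum(x.num_nonces for x in data)
solutions = sum(x.num_solutions for x in data)
avg = nonces // solutions if solutions > 0 else 0
print(f"#nonces: {nonces}, #solutions: {solutions}, avg_nonces_per_solution: {avg}")
```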