From e99eca702834f401aa0bba7cdd736e954de11483 Mon Sep 17 00:00:00 2001 From: FiveMovesAhead Date: Sun, 13 Oct 2024 11:02:54 +0100 Subject: [PATCH] Update tig-benchmarker to be easier to configure --- tig-benchmarker/README.md | 69 ++- tig-benchmarker/config.json | 130 +++--- tig-benchmarker/master.py | 95 ++-- tig-benchmarker/slave.py | 153 ++++--- tig-benchmarker/tig_benchmarker/event_bus.py | 47 -- .../extensions/data_fetcher.py | 54 +-- .../extensions/difficulty_sampler.py | 322 ++++++-------- .../tig_benchmarker/extensions/job_manager.py | 412 +++++++----------- .../extensions/precommit_manager.py | 218 +++++---- .../extensions/slave_manager.py | 248 ++++------- .../extensions/submissions_manager.py | 239 +++------- tig-benchmarker/tig_benchmarker/structs.py | 14 +- 12 files changed, 793 insertions(+), 1208 deletions(-) delete mode 100644 tig-benchmarker/tig_benchmarker/event_bus.py diff --git a/tig-benchmarker/README.md b/tig-benchmarker/README.md index 57a4b64..a6598a9 100644 --- a/tig-benchmarker/README.md +++ b/tig-benchmarker/README.md @@ -25,12 +25,13 @@ Python scripts that implements a master/slave Benchmarker for TIG. # in tig-benchmarker folder pip install -r requirements.txt ``` -8. Run a master +8. Edit config.json with your `player_id` and `api_key` +9. Run a master ``` # in tig-benchmarker folder - python3 master.py + python3 master.py ``` -9. Connect at least 1 slave to your master +10. Connect at least 1 slave to your master ``` python3 slave.py ``` @@ -52,40 +53,33 @@ Python scripts that implements a master/slave Benchmarker for TIG. # Optimising your Config -1. Your master should generate enough batches for your slaves to be always busy. But at the same time, you should minimise the delay between your precommit and proof submissions. Observe the elapsed ms of your benchmarks and adjust parameters accordingly to target a particular duration per benchmark (e.g. 15s) - * `max_unresolved_precommits` - how many precommits can be in progress - * `num_nonces` - number of nonces for a precommit for a particular challenge - * Increasing/decreasing above will lead to more/less batches in your backlog +1. `difficulty_sampler_config` allows you to set the `difficulty_range` for each challenge. + * Every block, each challenge recalculates its `base_frontier` and `scaled_frontier` + * The difficulties within these 2 frontiers are "sorted" into easiest to hardest (0.0 is easiest, 1.0 is hardest) + * Benchmarkers can set the `difficulty_range` from which to sample a difficulty. Examples: + * `[0.0, 1.0]` samples the full range of valid difficulties + * `[0.0, 0.1]` samples the easiest 10% of valid difficulties + * Key consideration: easier difficulties may result in more solutions given the same compute, but might not be a qualifier for long if the frontiers get harder -2. You want your slaves to be at 100% CPU utilization, but at the same time, batches should be "cheap" to repeat if a slave has issues. Observe the elapsed ms of batches and adjust parameters accordingly to target a particular duration per batch (e.g. 2s) - * `batch_size` - how many nonces are processed in a batch for a particular challenge. Must be a power of 2 +2. `job_manager_config` allows you to set the `batch_size` for each challenge. + * `batch_size` is the number of nonces that are part of a batch. Must be a power of 2 + * Recommend to pick a `batch_size` for your slave with lowest `num_workers` such that it takes a few seconds to compute (e.g. 
5 seconds)
+   * `batch_size` shouldn't be too small, or else network latency between master and slave will affect performance
+   * To support slaves with different `num_workers`, see `slave_manager_config` below
 
-3. Slaves on different compute may be better suited for certain challenges. You can define the challenge selection for a slave based on name patterns (patterns are checked in order):
-   * Any slave, any challenge (this should be the last entry)
-    ```
-    {
-        "name_regex": ".*",
-        "challenge_selection": null
-    }
-    ```
-   * Example: only vector_search
-    ```
-    {
-        "name_regex": "vector_search-slave-.*",
-        "challenge_selection": ["vector_search"]
-    }
-    ```
+3. `precommit_manager_config` allows you to control your benchmarks:
+   * `max_pending_benchmarks` is the maximum number of pending benchmarks
+   * Key consideration: you want batches to always be available for your slaves, but if a benchmark is submitted too slowly, there will be a large delay before it becomes active
+   * `num_nonces` is the number of nonces to compute per benchmark. Recommend adjusting based on the logs, which report the average number of nonces needed to find a solution. Example log:
+     * `global qualifier difficulty stats for vehicle_routing: (#nonces: 43739782840, #solutions: 22376, avg_nonces_per_solution: 1954763)`
+   * `weight` affects how likely the challenge is to be picked (a challenge with weight 0 will never be picked). Recommend adjusting if the logs warn you to benchmark a specific challenge to increase your cutoff. Example log:
+     * `recommend finding more solutions for challenge knapsack to avoid being cut off`
 
-4. By default, the benchmarker uses smart challenge selection to maximise your cutoff and to minimise your imbalance. However, if you want to control the chance of a particular challenge being selected, you can set the weight in the algo_selection:
-   * Example:
-    ```
-    "vehicle_routing": {
-        "algorithm": "clarke_wright",
-        "num_nonces": 1000,
-        "base_fee_limit": "10000000000000000",
-        "weight": 1.0 <--- weight is relative to other challenges. If other challenge's weights are null/undefined, then this challenge will always be picked
-    }
-    ```
+4. `slave_manager_config` allows you to control your slaves:
+   * When a slave makes a request, the manager iterates through the slave configs one at a time until it finds a regex match. The most specific regexes should be earlier in the list, and the more general regexes later in the list (see the example `slaves` list further below).
+   * `max_concurrent_batches` determines how many batches of that challenge a slave can fetch & process concurrently
+   * `max_concurrent_batches` also serves as a whitelist of challenges for that slave. If you don't want a slave to benchmark a specific challenge, remove its entry from the list. Example:
+     * `{"vector_search": 1}` means the slave will only be given `vector_search` batches
 
 # Master
 
@@ -98,20 +92,15 @@ The master does no benchmarking! 
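+For illustration of the `slave_manager_config` section above, a `slaves` list that dedicates slaves named `vector_search-slave-*` to `vector_search` and keeps the catch-all entry last might look like the sketch below (the slave name pattern and the value `4` are example values, not recommendations):
+    ```
+    "slaves": [
+        {
+            "name_regex": "vector_search-slave-.*",
+            "max_concurrent_batches": {
+                "vector_search": 4
+            }
+        },
+        {
+            "name_regex": ".*",
+            "max_concurrent_batches": {
+                "satisfiability": 1,
+                "vehicle_routing": 1,
+                "knapsack": 1,
+                "vector_search": 1
+            }
+        }
+    ]
+    ```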
You need to connect slaves ## Usage ``` -usage: master.py [-h] [--api API] [--backup BACKUP] [--port PORT] [--verbose] player_id api_key config_path +usage: master.py [-h] [--verbose] config_path TIG Benchmarker positional arguments: - player_id Player ID - api_key API Key config_path Path to the configuration JSON file options: -h, --help show this help message and exit - --api API API URL (default: https://mainnet-api.tig.foundation) - --backup BACKUP Folder to save pending submissions and other data - --port PORT Port to run the server on (default: 5115) --verbose Print debug logs ``` diff --git a/tig-benchmarker/config.json b/tig-benchmarker/config.json index 5aa7234..7dee750 100644 --- a/tig-benchmarker/config.json +++ b/tig-benchmarker/config.json @@ -1,59 +1,81 @@ { - "extensions": [ - "data_fetcher", - "difficulty_sampler", - "submissions_manager", - "job_manager", - "precommit_manager", - "slave_manager" - ], - "config": { - "job_manager": { - "satisfiability": { - "batch_size": 128 - }, - "vehicle_routing": { - "batch_size": 128 - }, - "knapsack": { - "batch_size": 128 - }, - "vector_search": { - "batch_size": 4 - } - }, - "precommit_manager": { - "max_unresolved_precommits": 5, - "algo_selection": { - "satisfiability": { - "algorithm": "schnoing", - "num_nonces": 1000, - "base_fee_limit": "10000000000000000" - }, - "vehicle_routing": { - "algorithm": "clarke_wright", - "num_nonces": 1000, - "base_fee_limit": "10000000000000000" - }, - "knapsack": { - "algorithm": "dynamic", - "num_nonces": 1000, - "base_fee_limit": "10000000000000000" - }, - "vector_search": { - "algorithm": "optimal_ann", - "num_nonces": 16, - "base_fee_limit": "10000000000000000" - } - } - }, - "slave_manager": { - "slaves": [ - { - "name_regex": ".*", - "challenge_selection": null - } + "player_id": "0x0000000000000000000000000000000000000000", + "api_key": "00000000000000000000000000000000", + "api_url": "https://mainnet-api.tig.foundation", + "difficulty_sampler_config": { + "difficulty_ranges": { + "satisfiability": [ + 0.0, + 0.5 + ], + "vehicle_routing": [ + 0.0, + 0.5 + ], + "knapsack": [ + 0.0, + 0.5 + ], + "vector_search": [ + 0.0, + 0.5 ] } + }, + "job_manager_config": { + "backup_folder": "jobs", + "batch_sizes": { + "satisfiability": 1024, + "vehicle_routing": 1024, + "knapsack": 1024, + "vector_search": 1024 + } + }, + "submissions_manager_config": { + "time_between_retries": 60000 + }, + "precommit_manager_config": { + "max_pending_benchmarks": 4, + "algo_selection": { + "satisfiability": { + "algorithm": "schnoing", + "num_nonces": 1000, + "weight": 1.0, + "base_fee_limit": "10000000000000000" + }, + "vehicle_routing": { + "algorithm": "clarke_wright", + "num_nonces": 1000, + "weight": 1.0, + "base_fee_limit": "10000000000000000" + }, + "knapsack": { + "algorithm": "dynamic", + "num_nonces": 1000, + "weight": 1.0, + "base_fee_limit": "10000000000000000" + }, + "vector_search": { + "algorithm": "optimal_ann", + "num_nonces": 1000, + "weight": 1.0, + "base_fee_limit": "10000000000000000" + } + } + }, + "slave_manager_config": { + "port": 5115, + "time_before_batch_retry": 60000, + "slaves": [ + { + "name_regex": ".*", + "max_concurrent_batches": { + "satisfiability": 1, + "vehicle_routing": 1, + "knapsack": 1, + "vector_search": 1 + } + } + ] } } \ No newline at end of file diff --git a/tig-benchmarker/master.py b/tig-benchmarker/master.py index ed0a4d8..584a1dd 100644 --- a/tig-benchmarker/master.py +++ b/tig-benchmarker/master.py @@ -1,59 +1,63 @@ -import signal import argparse import asyncio 
-import importlib import json import logging import os -import time -from tig_benchmarker.event_bus import process_events, emit +from tig_benchmarker.extensions.data_fetcher import * +from tig_benchmarker.extensions.difficulty_sampler import * +from tig_benchmarker.extensions.job_manager import * +from tig_benchmarker.extensions.precommit_manager import * +from tig_benchmarker.extensions.slave_manager import * +from tig_benchmarker.extensions.submissions_manager import * +from tig_benchmarker.utils import FromDict logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) -async def main( - player_id: str, - api_key: str, - config: dict, - backup_folder: str, - api_url: str, - port: int -): - exit_event = asyncio.Event() - loop = asyncio.get_running_loop() - loop.add_signal_handler(signal.SIGINT, exit_event.set) - loop.add_signal_handler(signal.SIGTERM, exit_event.set) - extensions = {} - for ext_name in config["extensions"]: - logger.info(f"loading extension {ext_name}") - module = importlib.import_module(f"tig_benchmarker.extensions.{ext_name}") - extensions[ext_name] = module.Extension( - player_id=player_id, - api_key=api_key, - api_url=api_url, - backup_folder=backup_folder, - port=port, - exit_event=exit_event, - **config["config"] - ) +@dataclass +class Config(FromDict): + player_id: str + api_key: str + api_url: str + difficulty_sampler_config: DifficultySamplerConfig + job_manager_config: JobManagerConfig + precommit_manager_config: PrecommitManagerConfig + slave_manager_config: SlaveManagerConfig + submissions_manager_config: SubmissionsManagerConfig - last_update = time.time() - while not exit_event.is_set(): - now = time.time() - if now - last_update > 1: - last_update = now - await emit('update') - await process_events(extensions) - await asyncio.sleep(0.1) +async def main(config: Config): + last_block_id = None + jobs = [] + data_fetcher = DataFetcher(config.api_url, config.player_id) + difficulty_sampler = DifficultySampler(config.difficulty_sampler_config) + job_manager = JobManager(config.job_manager_config, jobs) + precommit_manager = PrecommitManager(config.precommit_manager_config, config.player_id, jobs) + submissions_manager = SubmissionsManager(config.submissions_manager_config, config.api_url, config.api_key, jobs) + slave_manager = SlaveManager(config.slave_manager_config, jobs) + slave_manager.start() + + while True: + try: + data = await data_fetcher.run() + if data["block"].id != last_block_id: + last_block_id = data["block"].id + difficulty_sampler.on_new_block(**data) + job_manager.on_new_block(**data) + precommit_manager.on_new_block(**data) + job_manager.run() + samples = difficulty_sampler.run() + submit_precommit_req = precommit_manager.run(samples) + submissions_manager.run(submit_precommit_req) + except Exception as e: + import traceback + traceback.print_exc() + logger.error(f"{e}") + finally: + await asyncio.sleep(5) if __name__ == "__main__": parser = argparse.ArgumentParser(description="TIG Benchmarker") - parser.add_argument("player_id", help="Player ID") - parser.add_argument("api_key", help="API Key") parser.add_argument("config_path", help="Path to the configuration JSON file") - parser.add_argument("--api", default="https://mainnet-api.tig.foundation", help="API URL (default: https://mainnet-api.tig.foundation)") - parser.add_argument("--backup", default="backup", help="Folder to save pending submissions and other data") - parser.add_argument("--port", type=int, default=5115, help="Port to run the server on (default: 5115)") 
parser.add_argument("--verbose", action='store_true', help="Print debug logs") args = parser.parse_args() @@ -68,8 +72,5 @@ if __name__ == "__main__": sys.exit(1) with open(args.config_path, "r") as f: config = json.load(f) - if not os.path.exists(args.backup): - logger.info(f"creating backup folder at {args.backup}") - os.makedirs(args.backup, exist_ok=True) - - asyncio.run(main(args.player_id, args.api_key, config, args.backup, args.api, args.port)) \ No newline at end of file + config = Config.from_dict(config) + asyncio.run(main(config)) \ No newline at end of file diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index ae37f99..5e472e7 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -3,7 +3,8 @@ import json import os import logging import randomname -import requests +import aiohttp +import asyncio import subprocess import time @@ -12,7 +13,71 @@ logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) def now(): return int(time.time() * 1000) -def main( +async def download_wasm(session, download_url, wasm_path): + if not os.path.exists(wasm_path): + start = now() + logger.info(f"downloading WASM from {download_url}") + async with session.get(download_url) as resp: + if resp.status != 200: + raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}") + with open(wasm_path, 'wb') as f: + f.write(await resp.read()) + logger.debug(f"downloading WASM: took {now() - start}ms") + logger.debug(f"WASM Path: {wasm_path}") + +async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers): + start = now() + cmd = [ + tig_worker_path, "compute_batch", + json.dumps(batch["settings"]), + batch["rand_hash"], + str(batch["start_nonce"]), + str(batch["num_nonces"]), + str(batch["batch_size"]), + wasm_path, + "--mem", str(batch["wasm_vm_config"]["max_memory"]), + "--fuel", str(batch["wasm_vm_config"]["max_fuel"]), + "--workers", str(num_workers), + ] + if batch["sampled_nonces"]: + cmd += ["--sampled", *map(str, batch["sampled_nonces"])] + logger.info(f"computing batch: {' '.join(cmd)}") + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + if process.returncode != 0: + raise Exception(f"tig-worker failed: {stderr.decode()}") + result = json.loads(stdout.decode()) + logger.info(f"computing batch took {now() - start}ms") + logger.debug(f"batch result: {result}") + return result + +async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers): + try: + batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}" + logger.info(f"Processing batch {batch_id}: {batch}") + + # Step 2: Download WASM + wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") + await download_wasm(session, batch['download_url'], wasm_path) + + # Step 3: Run tig-worker + result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers) + + # Step 4: Submit results + start = now() + submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}" + logger.info(f"posting results to {submit_url}") + async with session.post(submit_url, json=result, headers=headers) as resp: + if resp.status != 200: + raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}") + logger.debug(f"posting results took {now() - start} ms") + + except Exception as e: + logger.error(f"Error 
processing batch {batch_id}: {e}") + +async def main( master_ip: str, tig_worker_path: str, download_wasms_folder: str, @@ -28,67 +93,29 @@ def main( "User-Agent": slave_name } - while True: - try: - # Step 1: Query for job - start = now() - get_batch_url = f"http://{master_ip}:{master_port}/get-batch" - logger.info(f"fetching job from {get_batch_url}") - resp = requests.get(get_batch_url, headers=headers) - if resp.status_code != 200: - raise Exception(f"status {resp.status_code} when fetching job: {resp.text}") - logger.debug(f"fetching job: took {now() - start}ms") - batch = resp.json() - batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}" - logger.debug(f"batch {batch_id}: {batch}") - - # Step 2: Download WASM - wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") - if not os.path.exists(wasm_path): + async with aiohttp.ClientSession() as session: + while True: + try: + # Step 1: Query for job start = now() - logger.info(f"downloading WASM from {batch['download_url']}") - resp = requests.get(batch['download_url']) - if resp.status_code != 200: - raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}") - with open(wasm_path, 'wb') as f: - f.write(resp.content) - logger.debug(f"downloading WASM: took {now() - start}ms") - logger.debug(f"WASM Path: {wasm_path}") - - # Step 3: Run tig-worker - start = now() - cmd = [ - tig_worker_path, "compute_batch", - json.dumps(batch["settings"]), - batch["rand_hash"], - str(batch["start_nonce"]), - str(batch["num_nonces"]), - str(batch["batch_size"]), - wasm_path, - "--mem", str(batch["wasm_vm_config"]["max_memory"]), - "--fuel", str(batch["wasm_vm_config"]["max_fuel"]), - "--workers", str(num_workers), - ] - if batch["sampled_nonces"]: - cmd += ["--sampled", *map(str, batch["sampled_nonces"])] - logger.info(f"computing batch: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - result = json.loads(result.stdout) - logger.info(f"computing batch took {now() - start}ms") - logger.debug(f"batch result: {result}") - - # Step 4: Submit results - start = now() - submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}" - logger.info(f"posting results to {submit_url}") - resp = requests.post(submit_url, json=result, headers=headers) - if resp.status_code != 200: - raise Exception(f"status {resp.status_code} when posting results to master: {resp.text}") - logger.debug(f"posting results took {now() - start} seconds") - - except Exception as e: - logger.error(e) - time.sleep(5) + get_batch_url = f"http://{master_ip}:{master_port}/get-batches" + logger.info(f"fetching job from {get_batch_url}") + async with session.get(get_batch_url, headers=headers) as resp: + if resp.status != 200: + raise Exception(f"status {resp.status} when fetching job: {await resp.text()}") + batches = await resp.json(content_type=None) + logger.debug(f"fetching job: took {now() - start}ms") + + # Process batches concurrently + tasks = [ + process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers) + for batch in batches + ] + await asyncio.gather(*tasks) + + except Exception as e: + logger.error(e) + await asyncio.sleep(5) if __name__ == "__main__": parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") @@ -107,4 +134,4 @@ if __name__ == "__main__": level=logging.DEBUG if args.verbose else logging.INFO ) - main(args.master_ip, args.tig_worker_path, args.download, args.workers, 
args.name, args.port) \ No newline at end of file + asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port)) \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/event_bus.py b/tig-benchmarker/tig_benchmarker/event_bus.py deleted file mode 100644 index aa74006..0000000 --- a/tig-benchmarker/tig_benchmarker/event_bus.py +++ /dev/null @@ -1,47 +0,0 @@ -import asyncio -import logging -import os -import time -from asyncio import Queue -from collections import defaultdict -from typing import Callable, Dict, List, Optional - -logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) - -_queue = [] - -async def emit(event_name: str, **kwargs) -> None: - global _queue - _queue.append((event_name, kwargs)) - -async def _safe_execute(extension_name: str, event_name: str, handler: Callable, kwargs: dict) -> None: - try: - await handler(**kwargs) - except Exception as e: - import traceback - traceback.print_exc() - logger.error(f"'{extension_name}.on_{event_name}': {str(e)}") - -async def process_events(extensions: dict): - global _queue - num_events = len(_queue) - event_names = set( - _queue[i][0] - for i in range(num_events) - ) - handlers = { - e: [ - (ext_name, getattr(ext, f'on_{e}')) - for ext_name, ext in extensions.items() - if hasattr(ext, f'on_{e}') - ] - for e in event_names - } - await asyncio.gather( - *[ - _safe_execute(ext_name, _queue[i][0], h, _queue[i][1]) - for i in range(num_events) - for (ext_name, h) in handlers[_queue[i][0]] - ] - ) - _queue = _queue[num_events:] \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/extensions/data_fetcher.py b/tig-benchmarker/tig_benchmarker/extensions/data_fetcher.py index d11b360..b8fc42c 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/data_fetcher.py +++ b/tig-benchmarker/tig_benchmarker/extensions/data_fetcher.py @@ -3,7 +3,6 @@ import asyncio import json import logging import os -from tig_benchmarker.event_bus import * from tig_benchmarker.structs import * from tig_benchmarker.utils import * from typing import Dict, Any @@ -25,25 +24,21 @@ async def _get(url: str) -> Dict[str, Any]: logger.error(err_msg) raise Exception(err_msg) -class Extension: - def __init__(self, api_url: str, player_id: str, **kwargs): +class DataFetcher: + def __init__(self, api_url: str, player_id: str): self.api_url = api_url self.player_id = player_id self.last_fetch = 0 self._cache = None - async def on_update(self): - now_ = now() - if now_ - self.last_fetch < 10000: - return - self.last_fetch = now_ + async def run(self) -> dict: logger.debug("fetching latest block") block_data = await _get(f"{self.api_url}/get-block") block = Block.from_dict(block_data["block"]) if self._cache is not None and block.id == self._cache["block"].id: logger.debug("no new block data") - return + return self._cache logger.info(f"new block @ height {block.details.height}, fetching data") tasks = [ @@ -63,32 +58,18 @@ class Extension: precommits = {b["benchmark_id"]: Precommit.from_dict(b) for b in benchmarks_data["precommits"]} benchmarks = {b["id"]: Benchmark.from_dict(b) for b in benchmarks_data["benchmarks"]} proofs = {p["benchmark_id"]: Proof.from_dict(p) for p in benchmarks_data["proofs"]} - frauds = {f["benchmark_id"]: Fraud.from_dict(f) for f in benchmarks_data["frauds"]} - for benchmark_id, precommit in precommits.items(): - benchmark = benchmarks.get(benchmark_id, None) - proof = proofs.get(benchmark_id, None) - if proof is not None: - if self._cache is None or 
benchmark_id not in self._cache["proofs"]: - await emit( - "proof_confirmed", - precommit=precommit, - benchmark=benchmark, - proof=proof, - ) - elif benchmark is not None: - if self._cache is None or benchmark_id not in self._cache["benchmarks"]: - await emit( - "benchmark_confirmed", - precommit=precommit, - benchmark=benchmark, - ) - elif self._cache is None or benchmark_id not in self._cache["precommits"]: - await emit( - "precommit_confirmed", - precommit=precommit - ) - + frauds = {f["benchmark_id"]: Fraud.from_dict(f) for f in benchmarks_data["frauds"]} challenges = {c["id"]: Challenge.from_dict(c) for c in challenges_data["challenges"]} + + tasks = [ + _get(f"{self.api_url}/get-difficulty-data?block_id={block.id}&challenge_id={c_id}") + for c_id in challenges + ] + difficulty_data = await asyncio.gather(*tasks) + difficulty_data = { + c_id: [DifficultyData.from_dict(x) for x in d["data"]] + for c_id, d in zip(challenges, difficulty_data) + } self._cache = { "block": block, @@ -99,6 +80,7 @@ class Extension: "benchmarks": benchmarks, "proofs": proofs, "frauds": frauds, - "challenges": challenges + "challenges": challenges, + "difficulty_data": difficulty_data } - await emit("new_block", **self._cache) + return self._cache diff --git a/tig-benchmarker/tig_benchmarker/extensions/difficulty_sampler.py b/tig-benchmarker/tig_benchmarker/extensions/difficulty_sampler.py index fa26b5f..d484129 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/difficulty_sampler.py +++ b/tig-benchmarker/tig_benchmarker/extensions/difficulty_sampler.py @@ -1,211 +1,145 @@ import asyncio import logging +import math import numpy as np import os -from tig_benchmarker.event_bus import * +import random from tig_benchmarker.structs import * -from tig_benchmarker.utils import FromDict from typing import List, Tuple, Dict logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) +Point = List[int] +Frontier = List[Point] + +def calc_valid_difficulties(block_data: ChallengeBlockData) -> List[Point]: + """ + Calculates a list of all difficulty combinations within the base and scaled frontiers + """ + hardest_difficulty = np.max([ + np.max(list(block_data.scaled_frontier), axis=0), + np.max(list(block_data.base_frontier), axis=0), + ], axis=0) + min_difficulty = np.max([ + np.min(list(block_data.scaled_frontier), axis=0), + np.min(list(block_data.base_frontier), axis=0), + ], axis=0) + weights = np.zeros(hardest_difficulty - min_difficulty + 1, dtype=float) + lower_cutoff_points = np.array(list(block_data.base_frontier)) - min_difficulty + upper_cutoff_points = np.array(list(block_data.scaled_frontier)) - min_difficulty + if block_data.scaling_factor < 1.0: + lower_cutoff_points, upper_cutoff_points = upper_cutoff_points, lower_cutoff_points + + lower_cutoff_points = lower_cutoff_points[np.argsort(lower_cutoff_points[:, 0]), :] + upper_cutoff_points = upper_cutoff_points[np.argsort(upper_cutoff_points[:, 0]), :] + lower_cutoff_idx = 0 + lower_cutoff = lower_cutoff_points[lower_cutoff_idx] + upper_cutoff_idx = 0 + upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx] + if len(upper_cutoff_points) > 1: + upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1] + else: + upper_cutoff2 = upper_cutoff1 + + for i in range(weights.shape[0]): + if lower_cutoff_idx + 1 < len(lower_cutoff_points) and i == lower_cutoff_points[lower_cutoff_idx + 1, 0]: + lower_cutoff_idx += 1 + lower_cutoff = lower_cutoff_points[lower_cutoff_idx] + if upper_cutoff_idx + 1 < len(upper_cutoff_points) and i == 
upper_cutoff_points[upper_cutoff_idx + 1, 0]: + upper_cutoff_idx += 1 + upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx] + if upper_cutoff_idx + 1 < len(upper_cutoff_points): + upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1] + else: + upper_cutoff2 = upper_cutoff1 + if i >= lower_cutoff[0]: + start = lower_cutoff[1] + else: + start = lower_cutoff[1] + 1 + if i <= upper_cutoff2[0]: + weights[i, start:upper_cutoff2[1] + 1] = 1.0 + if i < upper_cutoff2[0]: + weights[i, start:upper_cutoff1[1]] = 1.0 + if i == upper_cutoff1[0]: + weights[i, upper_cutoff1[1]] = 1.0 + + valid_difficulties = np.stack(np.where(weights), axis=1) + min_difficulty + return valid_difficulties.tolist() + +def calc_pareto_frontier(points: List[Point]) -> Frontier: + """ + Calculates a single Pareto frontier from a list of points + Adapted from https://stackoverflow.com/questions/32791911/fast-calculation-of-pareto-front-in-python + """ + points_ = points + points = np.array(points) + frontier_idxs = np.arange(points.shape[0]) + n_points = points.shape[0] + next_point_index = 0 # Next index in the frontier_idxs array to search for + while next_point_index < len(points): + nondominated_point_mask = np.any(points < points[next_point_index], axis=1) + nondominated_point_mask[np.all(points == points[next_point_index], axis=1)] = True + frontier_idxs = frontier_idxs[nondominated_point_mask] # Remove dominated points + points = points[nondominated_point_mask] + next_point_index = np.sum(nondominated_point_mask[:next_point_index]) + 1 + return [points_[idx] for idx in frontier_idxs] + +def calc_all_frontiers(points: List[Point]) -> List[Frontier]: + """ + Calculates a list of Pareto frontiers from a list of points + """ + buckets = {} + r = np.max(points, axis=0) - np.min(points, axis=0) + dim1, dim2 = (1, 0) if r[0] > r[1] else (0, 1) + for p in points: + if p[dim1] not in buckets: + buckets[p[dim1]] = [] + buckets[p[dim1]].append(p) + for bucket in buckets.values(): + bucket.sort(reverse=True, key=lambda x: x[dim2]) + frontiers = [] + while len(buckets) > 0: + points = [bucket[-1] for bucket in buckets.values()] + frontier = calc_pareto_frontier(points) + for p in frontier: + x = p[dim1] + buckets[x].pop() + if len(buckets[x]) == 0: + buckets.pop(x) + frontiers.append(frontier) + return frontiers + +@dataclass class DifficultySamplerConfig(FromDict): - num_samples: int = 100 - padding_factor: float = 0.2 - decay: float = 0.7 - initial_solutions_weight: float = 500.0 - solutions_multiplier: float = 10.0 + difficulty_ranges: Dict[str, Tuple[float, float]] + + def __post_init__(self): + for c_name, (start, end) in self.difficulty_ranges.items(): + if start < 0 or start > 1 or end < 0 or end > 1 or start > end: + raise ValueError(f"Invalid difficulty range for challenge {c_name}. 
Must be (start, end) where '0 <= start <= end <= 1'") class DifficultySampler: def __init__(self, config: DifficultySamplerConfig): self.config = config - self.min_difficulty = None - self.padding = None - self.dimensions = None - self.weights = np.empty((0,0,2), dtype=float) - self.distribution = None + self.valid_difficulties = {} + self.frontiers = {} - def sample(self): - if self.distribution is None: - raise ValueError("You must update sampler first") - p = self.distribution.flatten() - idx = np.random.choice(len(p), p=p) - num_cols = self.dimensions[1] + self.padding[1] - x = idx // num_cols - y = idx % num_cols - return [int(x + self.min_difficulty[0]), int(y + self.min_difficulty[1])] - - def update_with_block_data(self, min_difficulty: List[int], block_data): - assert len(min_difficulty) == 2, "Only difficulty with 2 parameters are supported" - min_difficulty = np.array(min_difficulty) - - if self.min_difficulty is None: - left_pad = np.zeros(2, dtype=int) - else: - left_pad = min_difficulty - self.min_difficulty - self.min_difficulty = min_difficulty - self.update_dimensions_and_padding(block_data) - size = self.dimensions + self.padding - self.resize_weights(left_pad, size) - - self.update_valid_range(block_data) - self.update_distributions() - - def update_with_solutions(self, difficulty: List[int], num_solutions: int): - center = np.array(difficulty) - self.min_difficulty - - x_min = max(0, center[0] - self.padding[0]) - x_max = min(self.weights.shape[0] - 1, center[0] + self.padding[0]) - y_min = max(0, center[1] - self.padding[1]) - y_max = min(self.weights.shape[1] - 1, center[1] + self.padding[1]) - if x_min > x_max or y_min > y_max: - return - y, x = np.meshgrid( - np.arange(y_min, y_max + 1, dtype=float), - np.arange(x_min, x_max + 1, dtype=float) - ) - position = np.stack((x, y), axis=-1) - dist = np.linalg.norm((position - center) / self.padding, axis=-1) - decay = dist * (1.0 - self.config.decay) + self.config.decay - delta = (1.0 - decay) * num_solutions * self.config.solutions_multiplier - decay[np.where(dist > 1.0)] = 1.0 - delta[np.where(dist > 1.0)] = 0.0 - - self.weights[x_min:x_max + 1, y_min:y_max + 1, 1] *= decay - self.weights[x_min:x_max + 1, y_min:y_max + 1, 1] += delta - - def update_valid_range(self, block_data): - lower_cutoff_points = np.array(list(block_data.base_frontier)) - self.min_difficulty - upper_cutoff_points = np.array(list(block_data.scaled_frontier)) - self.min_difficulty - - if block_data.scaling_factor < 1.0: - lower_cutoff_points, upper_cutoff_points = upper_cutoff_points, lower_cutoff_points - - lower_cutoff_points = lower_cutoff_points[np.argsort(lower_cutoff_points[:, 0]), :] - upper_cutoff_points = upper_cutoff_points[np.argsort(upper_cutoff_points[:, 0]), :] - lower_cutoff_idx = 0 - lower_cutoff = lower_cutoff_points[lower_cutoff_idx] - upper_cutoff_idx = 0 - upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx] - if len(upper_cutoff_points) > 1: - upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1] - else: - upper_cutoff2 = upper_cutoff1 - self.weights[:, :, 0] = 0.0 - for i in range(self.weights.shape[0]): - if lower_cutoff_idx + 1 < len(lower_cutoff_points) and i == lower_cutoff_points[lower_cutoff_idx + 1, 0]: - lower_cutoff_idx += 1 - lower_cutoff = lower_cutoff_points[lower_cutoff_idx] - if upper_cutoff_idx + 1 < len(upper_cutoff_points) and i == upper_cutoff_points[upper_cutoff_idx + 1, 0]: - upper_cutoff_idx += 1 - upper_cutoff1 = upper_cutoff_points[upper_cutoff_idx] - if upper_cutoff_idx + 1 < 
len(upper_cutoff_points): - upper_cutoff2 = upper_cutoff_points[upper_cutoff_idx + 1] - else: - upper_cutoff2 = upper_cutoff1 - - if i >= lower_cutoff[0]: - start = lower_cutoff[1] - else: - start = lower_cutoff[1] + 1 - if i <= upper_cutoff2[0]: - self.weights[i, start:upper_cutoff2[1] + 1, 0] = 1.0 - if i < upper_cutoff2[0]: - self.weights[i, start:upper_cutoff1[1], 0] = 1.0 - if i == upper_cutoff1[0]: - self.weights[i, upper_cutoff1[1], 0] = 1.0 - - def update_distributions(self): - distribution = np.prod(self.weights, axis=2) - distribution /= np.sum(distribution) - self.distribution = distribution - - def resize_weights(self, left_pad: np.ndarray, size: np.ndarray): - default_values = [0.0, self.config.initial_solutions_weight] - if left_pad[0] > 0: - pad = np.full((left_pad[0], self.weights.shape[1], 2), default_values) - self.weights = np.vstack((pad, self.weights)) - elif left_pad[0] < 0: - self.weights = self.weights[-left_pad[0]:, :, :] - if left_pad[1] > 0: - pad = np.full((self.weights.shape[0], left_pad[1], 2), default_values) - self.weights = np.hstack((pad, self.weights)) - elif left_pad[1] < 0: - self.weights = self.weights[:, -left_pad[1]:, :] - right_pad = size - self.weights.shape[:2] - if right_pad[0] > 0: - pad = np.full((right_pad[0], self.weights.shape[1], 2), default_values) - self.weights = np.vstack((self.weights, pad)) - elif right_pad[0] < 0: - self.weights = self.weights[:size[0], :, :] - if right_pad[1] > 0: - pad = np.full((self.weights.shape[0], right_pad[1], 2), default_values) - self.weights = np.hstack((self.weights, pad)) - elif right_pad[1] < 0: - self.weights = self.weights[:, :size[1], :] - - def update_dimensions_and_padding(self, block_data): - hardest_difficulty = np.max([ - np.max(list(block_data.scaled_frontier), axis=0), - np.max(list(block_data.base_frontier), axis=0), - ], axis=0) - if block_data.qualifier_difficulties is not None and len(block_data.qualifier_difficulties) > 0: - hardest_difficulty = np.max([ - hardest_difficulty, - np.max(list(block_data.qualifier_difficulties), axis=0) - ], axis=0) - self.dimensions = hardest_difficulty - self.min_difficulty + 1 - self.padding = np.ceil(self.dimensions * self.config.padding_factor).astype(int) - -class Extension: - def __init__(self, **kwargs): - if (difficulty_sampler := kwargs.get("difficulty_sampler", None)): - self.config = DifficultySamplerConfig.from_dict(difficulty_sampler) - else: - self.config = DifficultySamplerConfig() - self.samplers = {} - self.lock = True - - async def on_new_block( - self, - block: Block, - challenges: Dict[str, Challenge], - **kwargs # ignore other data - ): - logger.debug("new block, updating difficulty samplers") - for challenge in challenges.values(): - if challenge.block_data is None: + def on_new_block(self, challenges: Dict[str, Challenge], **kwargs): + for c in challenges.values(): + if c.block_data is None: continue - if challenge.id not in self.samplers: - self.samplers[challenge.id] = (challenge.details.name, DifficultySampler(self.config)) - logger.debug(f"updating sampler for challenge {challenge.details.name}") - self.samplers[challenge.id][1].update_with_block_data( - min_difficulty=[ - param["min_value"] - for param in block.config["difficulty"]["parameters"][challenge.id] - ], - block_data=challenge.block_data - ) - logger.debug(f"emitting {self.config.num_samples} difficulty samples for challenge {challenge.details.name}") - await emit( - "difficulty_samples", - challenge_id=challenge.id, - block_id=block.id, - 
samples=[self.samplers[challenge.id][1].sample() for _ in range(self.config.num_samples)] - ) - self.lock = False + logger.debug(f"Calculating valid difficulties and frontiers for challenge {c.details.name}") + self.valid_difficulties[c.details.name] = calc_valid_difficulties(c.block_data) + self.frontiers[c.details.name] = calc_all_frontiers(self.valid_difficulties[c.details.name]) - async def on_benchmark_confirmed(self, precommit: Precommit, benchmark: Benchmark): - while self.lock: - await asyncio.sleep(0.5) - challenge_id = precommit.settings.challenge_id - num_solutions = benchmark.details.num_solutions - if challenge_id not in self.samplers: - logger.warning(f"no sampler for challenge {challenge_id}") - else: - challenge_name, sampler = self.samplers[challenge_id] - logger.debug(f"updating sampler for challenge {challenge_name} with {num_solutions} solutions @ difficulty {precommit.settings.difficulty}") - sampler.update_with_solutions( - difficulty=precommit.settings.difficulty, - num_solutions=num_solutions - ) \ No newline at end of file + def run(self) -> Dict[str, Point]: + samples = {} + for c_name, frontiers in self.frontiers.items(): + difficulty_range = self.config.difficulty_ranges[c_name] # FIXME + idx1 = math.floor(difficulty_range[0] * (len(frontiers) - 1)) + idx2 = math.ceil(difficulty_range[1] * (len(frontiers) - 1)) + difficulties = [p for frontier in frontiers[idx1:idx2 + 1] for p in frontier] + difficulty = random.choice(difficulties) + samples[c_name] = difficulty + logger.debug(f"Sampled difficulty {difficulty} for challenge {c_name}") + return samples \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/extensions/job_manager.py b/tig-benchmarker/tig_benchmarker/extensions/job_manager.py index fc21715..7138bce 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/job_manager.py +++ b/tig-benchmarker/tig_benchmarker/extensions/job_manager.py @@ -3,7 +3,6 @@ import os import json import logging from dataclasses import dataclass -from tig_benchmarker.event_bus import * from tig_benchmarker.merkle_tree import MerkleHash, MerkleBranch, MerkleTree from tig_benchmarker.structs import * from tig_benchmarker.utils import * @@ -16,275 +15,172 @@ class Job(FromDict): benchmark_id: str settings: BenchmarkSettings num_nonces: int - batch_size: int - num_batches: int rand_hash: str - sampled_nonces: List[int] - sampled_nonces_by_batch_idx: Dict[int, List[int]] wasm_vm_config: Dict[str, int] download_url: str - solution_nonces: List[int] - batch_merkle_roots: List[Optional[MerkleHash]] - merkle_proofs: Dict[int, MerkleProof] - last_retry_time: List[int] - start_time: int + batch_size: int + challenge: str + sampled_nonces: Optional[List[int]] = field(default_factory=list) + merkle_root: Optional[MerkleHash] = None + solution_nonces: List[int] = field(default_factory=list) + merkle_proofs: Dict[int, MerkleProof] = field(default_factory=dict) + solution_nonces: List[int] = field(default_factory=list) + batch_merkle_proofs: Dict[int, MerkleProof] = field(default_factory=dict) + batch_merkle_roots: List[Optional[MerkleHash]] = None + last_benchmark_submit_time: int = 0 + last_proof_submit_time: int = 0 + last_batch_retry_time: List[int] = None + + def __post_init__(self): + self.batch_merkle_roots = [None] * self.num_batches + self.last_batch_retry_time = [0] * self.num_batches + + @property + def num_batches(self) -> int: + return (self.num_nonces + self.batch_size - 1) // self.batch_size + + @property + def sampled_nonces_by_batch_idx(self) -> Dict[int, 
List[int]]: + ret = {} + for nonce in self.sampled_nonces: + batch_idx = nonce // self.batch_size + ret.setdefault(batch_idx, []).append(nonce) + return ret @dataclass class JobManagerConfig(FromDict): - batch_size: int - ms_delay_between_batch_retries: Optional[int] = None + backup_folder: str + batch_sizes: Dict[str, int] -class Extension: - def __init__(self, backup_folder: str, job_manager: dict, **kwargs): - self.backup_folder = backup_folder - self.config = {k: JobManagerConfig.from_dict(v) for k, v in job_manager.items()} - for challenge_name, config in self.config.items(): - batch_size = config.batch_size - assert (batch_size & (batch_size - 1) == 0) and batch_size != 0, f"batch_size {batch_size} for challenge {challenge_name} is not a power of 2" - self.jobs = {} - self.wasm_vm_config = {} - self.challenge_id_2_name = {} - self.download_urls = {} - self.benchmark_ready = set() - self.proof_ready = set() - self.lock = True - self._restore_jobs() - - async def on_new_block( - self, - block: Block, - precommits: Dict[str, Precommit], - wasms: Dict[str, Wasm], - challenges: Dict[str, Challenge], - **kwargs - ): - self.wasm_vm_config = block.config["wasm_vm"] - self.download_urls = { - w.algorithm_id: w.details.download_url - for w in wasms.values() - } - prune_jobs = [ - job - for benchmark_id, job in self.jobs.items() - if benchmark_id not in precommits - ] - for job in prune_jobs: - self._prune_job(job) - for c in challenges.values(): - self.challenge_id_2_name[c.id] = c.details.name - self.lock = False - - async def on_precommit_confirmed(self, precommit: Precommit, **kwargs): - while self.lock: - await asyncio.sleep(0.1) - benchmark_id = precommit.benchmark_id - if benchmark_id not in self.jobs: - c_name = self.challenge_id_2_name[precommit.settings.challenge_id] - batch_size = self.config[c_name].batch_size - num_batches = (precommit.details.num_nonces + batch_size - 1) // batch_size - job = Job( - benchmark_id=benchmark_id, - settings=precommit.settings, - num_nonces=precommit.details.num_nonces, - num_batches=num_batches, - rand_hash=precommit.state.rand_hash, - sampled_nonces_by_batch_idx={}, - sampled_nonces=[], - wasm_vm_config=self.wasm_vm_config, - download_url=self.download_urls[precommit.settings.algorithm_id], - batch_size=batch_size, - solution_nonces=[], - batch_merkle_roots=[None] * num_batches, - merkle_proofs={}, - last_retry_time=[0] * num_batches, - start_time=now() - ) - self.jobs[benchmark_id] = job - self._save_job(job) - - async def on_benchmark_confirmed(self, precommit: Precommit, benchmark: Optional[Benchmark], **kwargs): - await self.on_precommit_confirmed(precommit, **kwargs) - if benchmark is not None: - job = self.jobs[precommit.benchmark_id] - for nonce in benchmark.state.sampled_nonces: - batch_idx = nonce // job.batch_size - job.sampled_nonces_by_batch_idx.setdefault(batch_idx, []).append(nonce) - job.last_retry_time[batch_idx] = 0 - job.sampled_nonces = benchmark.state.sampled_nonces - - async def on_proof_confirmed(self, proof: Proof, **kwargs): - if (job := self.jobs.get(proof.benchmark_id, None)) is not None: - self._prune_job(job) - - async def on_update(self): - if self.lock: - return - for benchmark_id, job in self.jobs.items(): - if ( - benchmark_id not in self.proof_ready and - len(job.sampled_nonces) > 0 and - set(job.sampled_nonces) == set(job.merkle_proofs) and - all(x is not None for x in job.batch_merkle_roots) - ): - self.proof_ready.add(benchmark_id) - await self._emit_proof(job) - elif ( - benchmark_id not in self.benchmark_ready 
and - all(x is not None for x in job.batch_merkle_roots) - ): - self.benchmark_ready.add(benchmark_id) - await self._emit_benchmark(job) - else: - await self._emit_batches(job) - - async def on_batch_result( - self, - benchmark_id: str, - start_nonce: int, - solution_nonces: List[int], - merkle_root: MerkleHash, - merkle_proofs: List[MerkleProof], - **kwargs - ): - merkle_root = MerkleHash.from_str(merkle_root) - merkle_proofs = [MerkleProof.from_dict(x) for x in merkle_proofs] - - if benchmark_id not in self.jobs: - logger.warning(f"job not found for benchmark {benchmark_id}") - else: - job = self.jobs[benchmark_id] - batch_idx = start_nonce // job.batch_size - # validate batch result (does not check the output data) - logger.debug(f"validating batch results for {benchmark_id} @ index {batch_idx}") - assert start_nonce % job.batch_size == 0, "start_nonce not aligned with batch size" - assert all(start_nonce <= n < start_nonce + job.num_nonces for n in solution_nonces), "solution nonces not in batch" - left = set(job.sampled_nonces_by_batch_idx.get(batch_idx, [])) - right = set(x.leaf.nonce for x in merkle_proofs) - if len(left) > 0 and len(right) == 0: - logger.warning(f"no merkle proofs for batch {batch_idx} of {benchmark_id}") - return - assert left == right, f"sampled nonces {left} do not match proofs {right}" - assert all(x.branch.calc_merkle_root( - hashed_leaf=x.leaf.to_merkle_hash(), - branch_idx=x.leaf.nonce - start_nonce, # branch idx of batch tree - ) == merkle_root for x in merkle_proofs), "merkle proofs do not match merkle root" - - num_nonces = min(job.batch_size, job.num_nonces - start_nonce) - job.solution_nonces.extend(solution_nonces) - job.batch_merkle_roots[batch_idx] = merkle_root - job.merkle_proofs.update({x.leaf.nonce: x for x in merkle_proofs}) - - self._save_job(job) - - def _restore_jobs(self): - path = os.path.join(self.backup_folder, "jobs") - if not os.path.exists(path): - logger.info(f"creating backup folder {path}") - os.makedirs(path, exist_ok=True) - for file in os.listdir(path): - if not file.endswith('.json'): +class JobManager: + def __init__(self, config: JobManagerConfig, jobs: List[Job]): + self.config = config + self.jobs = jobs + os.makedirs(self.config.backup_folder, exist_ok=True) + for file in os.listdir(self.config.backup_folder): + if not file.endswith(".json"): continue - file_path = os.path.join(path, file) + file_path = f"{self.config.backup_folder}/{file}" logger.info(f"restoring job from {file_path}") with open(file_path) as f: job = Job.from_dict(json.load(f)) - self.jobs[job.benchmark_id] = job + self.jobs.append(job) - def _save_job(self, job: Job): - path = os.path.join(self.backup_folder, "jobs", f"{job.benchmark_id}.json") - with open(path, 'w') as f: - json.dump(job.to_dict(), f) - - def _prune_job(self, job: Job): - path = os.path.join(self.backup_folder, "jobs", f"{job.benchmark_id}.json") - logger.debug(f"pruning job {path}") - self.jobs.pop(job.benchmark_id, None) - if os.path.exists(path): - os.remove(path) - if job.benchmark_id in self.benchmark_ready: - self.benchmark_ready.remove(job.benchmark_id) - if job.benchmark_id in self.proof_ready: - self.proof_ready.remove(job.benchmark_id) - - async def _emit_proof(self, job: Job): - # join merkle_proof for the job tree (all batches) with merkle_proof of the batch tree - depth_offset = (job.batch_size - 1).bit_length() - tree = MerkleTree( - job.batch_merkle_roots, - 1 << (job.num_batches - 1).bit_length() - ) - proofs = {} - for batch_idx in job.sampled_nonces_by_batch_idx: - 
upper_stems = [ - (d + depth_offset, h) - for d, h in tree.calc_merkle_branch(batch_idx).stems - ] - for nonce in set(job.sampled_nonces_by_batch_idx[batch_idx]): - proof = job.merkle_proofs[nonce] - proofs[nonce] = MerkleProof( - leaf=proof.leaf, - branch=MerkleBranch(proof.branch.stems + upper_stems) - ) - c_name = self.challenge_id_2_name[job.settings.challenge_id] - logger.info(f"proof {job.benchmark_id} ready: (challenge: {c_name}, elapsed: {now() - job.start_time}ms)") - await emit( - "proof_ready", - benchmark_id=job.benchmark_id, - merkle_proofs=list(proofs.values()) - ) - - async def _emit_benchmark(self, job: Job): - tree = MerkleTree( - job.batch_merkle_roots, - 1 << (job.num_batches - 1).bit_length() - ) - root = tree.calc_merkle_root() - c_name = self.challenge_id_2_name[job.settings.challenge_id] - logger.info(f"benchmark {job.benchmark_id} ready: (challenge: {c_name}, num_solutions: {len(job.solution_nonces)}, elapsed: {now() - job.start_time}ms)") - await emit( - "benchmark_ready", - benchmark_id=job.benchmark_id, - merkle_root=root, - solution_nonces=list(set(job.solution_nonces)) - ) - - async def _emit_batches(self, job: Job): - c_name = self.challenge_id_2_name[job.settings.challenge_id] - ms_delay_between_batch_retries = self.config[c_name].ms_delay_between_batch_retries or 30000 - now_ = now() - retry_batch_idxs = [ - batch_idx - for batch_idx in range(job.num_batches) + def on_new_block( + self, + block: Block, + precommits: Dict[str, Precommit], + benchmarks: Dict[str, Benchmark], + proofs: Dict[str, Proof], + challenges: Dict[str, Challenge], + wasms: Dict[str, Wasm], + **kwargs + ): + job_idxs = { + j.benchmark_id: idx + for idx, j in enumerate(self.jobs) + } + # create jobs from confirmed precommits + challenge_id_2_name = { + c.id: c.details.name + for c in challenges.values() + } + for benchmark_id, x in precommits.items(): if ( - now_ - job.last_retry_time[batch_idx] >= ms_delay_between_batch_retries and - ( - job.batch_merkle_roots[batch_idx] is None or ( - len(job.sampled_nonces) > 0 and - not set(job.sampled_nonces_by_batch_idx.get(batch_idx, [])).issubset(set(job.merkle_proofs)) - ) - ) + benchmark_id in job_idxs or + benchmark_id in proofs + ): + continue + logger.info(f"creating job from confirmed precommit {benchmark_id}") + c_name = challenge_id_2_name[x.settings.challenge_id] + job = Job( + benchmark_id=benchmark_id, + settings=x.settings, + num_nonces=x.details.num_nonces, + rand_hash=x.state.rand_hash, + wasm_vm_config=block.config["wasm_vm"], + batch_size=self.config.batch_sizes[c_name], + challenge=c_name, + download_url=next((w.details.download_url for w in wasms.values() if w.algorithm_id == x.settings.algorithm_id), None) ) + job_idxs[benchmark_id] = len(self.jobs) + self.jobs.append(job) + + # update jobs from confirmed benchmarks + for benchmark_id, x in benchmarks.items(): + if benchmark_id in proofs: + continue + logger.info(f"updating job from confirmed benchmark {benchmark_id}") + job = self.jobs[job_idxs[benchmark_id]] + job.sampled_nonces = x.state.sampled_nonces + for batch_idx in job.sampled_nonces_by_batch_idx: + job.last_batch_retry_time[batch_idx] = 0 + + # prune jobs from confirmed proofs + prune_idxs = [ + job_idxs[benchmark_id] + for benchmark_id in proofs + if benchmark_id in job_idxs + ] + [ + job_idxs[benchmark_id] + for benchmark_id in job_idxs + if benchmark_id not in precommits ] - num_finished = sum(x is not None for x in job.batch_merkle_roots) - if num_finished != len(job.batch_merkle_roots): - c_name = 
self.challenge_id_2_name[job.settings.challenge_id] - logger.info(f"precommit {job.benchmark_id}: (challenge: {c_name}, progress: {num_finished} of {len(job.batch_merkle_roots)} batches, elapsed: {now_ - job.start_time}ms)") - if len(retry_batch_idxs) == 0: - return - for batch_idx in retry_batch_idxs: - job.last_retry_time[batch_idx] = now_ - await emit( - "new_batch", - benchmark_id=job.benchmark_id, - settings=job.settings.to_dict(), - num_nonces=min( - job.batch_size, - job.num_nonces - batch_idx * job.batch_size - ), - start_nonce=batch_idx * job.batch_size, - batch_size=job.batch_size, - rand_hash=job.rand_hash, - sampled_nonces=job.sampled_nonces_by_batch_idx.get(batch_idx, []), - wasm_vm_config=job.wasm_vm_config, - download_url=job.download_url, - ) \ No newline at end of file + for idx in sorted(set(prune_idxs), reverse=True): + job = self.jobs[idx] + logger.info(f"pruning job {job.benchmark_id}") + if os.path.exists(f"{self.config.backup_folder}/{job.benchmark_id}.json"): + os.remove(f"{self.config.backup_folder}/{job.benchmark_id}.json") + self.jobs.pop(idx) + + def run(self): + now = int(time.time() * 1000) + for job in self.jobs: + if job.merkle_root is not None: + continue + num_batches_ready = sum(x is not None for x in job.batch_merkle_roots) + logger.info(f"benchmark {job.benchmark_id}: (batches: {num_batches_ready} of {job.num_batches} ready, #solutions: {len(job.solution_nonces)})") + if num_batches_ready != job.num_batches: + continue + start_time = min(job.last_batch_retry_time) + logger.info(f"benchmark {job.benchmark_id}: ready, took {(now - start_time) / 1000} seconds") + tree = MerkleTree( + job.batch_merkle_roots, + 1 << (job.num_batches - 1).bit_length() + ) + job.merkle_root = tree.calc_merkle_root() + + for job in self.jobs: + if ( + len(job.sampled_nonces) == 0 or # benchmark not confirmed + len(job.merkle_proofs) == len(job.sampled_nonces) # already processed + ): + continue + logger.info(f"proof {job.benchmark_id}: (merkle_proof: {len(job.batch_merkle_proofs)} of {len(job.sampled_nonces)} ready)") + if len(job.batch_merkle_proofs) != len(job.sampled_nonces): # not finished + continue + logger.info(f"proof {job.benchmark_id}: ready") + depth_offset = (job.batch_size - 1).bit_length() + tree = MerkleTree( + job.batch_merkle_roots, + 1 << (job.num_batches - 1).bit_length() + ) + proofs = {} + sampled_nonces_by_batch_idx = job.sampled_nonces_by_batch_idx + for batch_idx in sampled_nonces_by_batch_idx: + upper_stems = [ + (d + depth_offset, h) + for d, h in tree.calc_merkle_branch(batch_idx).stems + ] + for nonce in set(sampled_nonces_by_batch_idx[batch_idx]): + proof = job.batch_merkle_proofs[nonce] + job.merkle_proofs[nonce] = MerkleProof( + leaf=proof.leaf, + branch=MerkleBranch(proof.branch.stems + upper_stems) + ) + + for job in self.jobs: + file_path = f"{self.config.backup_folder}/{job.benchmark_id}.json" + logger.debug(f"backing up job to {file_path}") + with open(file_path, "w") as f: + json.dump(job.to_dict(), f) \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/extensions/precommit_manager.py b/tig-benchmarker/tig_benchmarker/extensions/precommit_manager.py index c7d0aa7..185051d 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/precommit_manager.py +++ b/tig-benchmarker/tig_benchmarker/extensions/precommit_manager.py @@ -3,7 +3,8 @@ import os import logging import random from dataclasses import dataclass -from tig_benchmarker.event_bus import * +from tig_benchmarker.extensions.job_manager import Job +from 
tig_benchmarker.extensions.submissions_manager import SubmitPrecommitRequest from tig_benchmarker.structs import * from tig_benchmarker.utils import FromDict from typing import Dict, List, Optional, Set @@ -15,146 +16,123 @@ class AlgorithmSelectionConfig(FromDict): algorithm: str base_fee_limit: PreciseNumber num_nonces: int - weight: Optional[float] = None + weight: float @dataclass class PrecommitManagerConfig(FromDict): - max_unresolved_precommits: int + max_pending_benchmarks: int algo_selection: Dict[str, AlgorithmSelectionConfig] -class Extension: - def __init__(self, player_id: str, backup_folder: str, precommit_manager: dict, **kwargs): +class PrecommitManager: + def __init__(self, config: PrecommitManagerConfig, player_id: str, jobs: List[Job]): + self.config = config self.player_id = player_id - self.config = PrecommitManagerConfig.from_dict(precommit_manager) + self.jobs = jobs self.last_block_id = None - self.num_precommits_this_block = 0 - self.num_unresolved_precommits = None - self.curr_base_fees = {} - self.difficulty_samples = {} + self.num_precommits_submitted = 0 self.algorithm_name_2_id = {} self.challenge_name_2_id = {} - self.percent_qualifiers_by_challenge = {} - self.num_solutions_by_challenge = {} - self.lock = True + self.curr_base_fees = {} - async def on_new_block( - self, - block: Block, - challenges: Dict[str, Challenge], - algorithms: Dict[str, Algorithm], - precommits: Dict[str, Precommit], + def on_new_block( + self, + block: Block, + precommits: Dict[str, Precommit], benchmarks: Dict[str, Benchmark], - proofs: Dict[str, Proof], + challenges: Dict[str, Challenge], + algorithms: Dict[str, Algorithm], player: Optional[Player], + difficulty_data: Dict[str, List[DifficultyData]], **kwargs ): - if self.last_block_id == block.id: - return - self.num_precommits_this_block = 0 - self.num_unresolved_precommits = sum(1 for benchmark_id in precommits if benchmark_id not in proofs) self.last_block_id = block.id - for c in challenges.values(): - c_name = c.details.name - assert c_name in self.config.algo_selection, f"missing algorithm selection for challenge {c_name}" - self.challenge_name_2_id[c_name] = c.id - a_name = self.config.algo_selection[c_name].algorithm - a = next( - ( - a for a in algorithms.values() - if a.details.challenge_id == c.id and a.details.name == a_name - ), - None - ) - assert a is not None, f"selected non-existent algorithm {a_name} for challenge {c_name}" - self.algorithm_name_2_id[f"{c_name}_{a_name}"] = a.id - if c.block_data is not None: - self.curr_base_fees[c_name] = c.block_data.base_fee - logger.info(f"current base_fees: {self.curr_base_fees}") - if ( - player is None or - player.block_data is None or - player.block_data.num_qualifiers_by_challenge is None - ): - self.percent_qualifiers_by_challenge = { - c.details.name: 0 - for c in challenges.values() - if c.block_data is not None - } - else: - self.percent_qualifiers_by_challenge = { - c.details.name: player.block_data.num_qualifiers_by_challenge.get(c.id, 0) / c.block_data.num_qualifiers - for c in challenges.values() - if c.block_data is not None - } - logger.info(f"percent_qualifiers_by_challenge: {self.percent_qualifiers_by_challenge}") - self.num_solutions_by_challenge = { - c.details.name: 0 + self.num_precommits_submitted = 0 + self.challenge_name_2_id = { + c.details.name: c.id + for c in challenges.values() + } + self.algorithm_name_2_id = { + f"{challenges[a.details.challenge_id].details.name}_{a.details.name}": a.id + for a in algorithms.values() + } + self.curr_base_fees 
= { + c.details.name: c.block_data.base_fee for c in challenges.values() if c.block_data is not None } - for benchmark_id, benchmark in benchmarks.items(): - precommit = precommits[benchmark_id] + benchmark_stats_by_challenge = { + c.details.name: { + "solutions": 0, + "nonces": 0, + "qualifiers": 0 + } + for c in challenges.values() + if c.block_data is not None + } + for benchmark in benchmarks.values(): + precommit = precommits[benchmark.id] c_name = challenges[precommit.settings.challenge_id].details.name - num_solutions = benchmark.details.num_solutions - self.num_solutions_by_challenge[c_name] += num_solutions - logger.info(f"num_solutions_by_challenge: {self.num_solutions_by_challenge}") - self.lock = False + benchmark_stats_by_challenge[c_name]["solutions"] += benchmark.details.num_solutions + benchmark_stats_by_challenge[c_name]["nonces"] += precommit.details.num_nonces - async def on_difficulty_samples(self, challenge_id: str, samples: list, **kwargs): - self.difficulty_samples[challenge_id] = samples + if player is not None: + logger.info(f"player earnings: (latest: {player.block_data.reward.to_float()}, round: {player.block_data.round_earnings.to_float()})") + logger.info(f"player stats: (cutoff: {player.block_data.cutoff}, imbalance: {player.block_data.imbalance.to_float() * 100}%)") + for c_id, num_qualifiers in player.block_data.num_qualifiers_by_challenge.items(): + c_name = challenges[c_id].details.name + benchmark_stats_by_challenge[c_name]["qualifiers"] = num_qualifiers + + for c_name, x in benchmark_stats_by_challenge.items(): + avg_nonces_per_solution = (x["nonces"] // x["solutions"]) if x["solutions"] > 0 else 0 + logger.info(f"benchmark stats for {c_name}: (#nonces: {x['nonces']}, #solutions: {x['solutions']}, #qualifiers: {x['qualifiers']}, avg_nonces_per_solution: {avg_nonces_per_solution})") - async def on_update(self): - if self.lock: + if player is not None and any(x['qualifiers'] == player.block_data.cutoff for x in benchmark_stats_by_challenge.values()): + c_name = min(benchmark_stats_by_challenge, key=lambda x: benchmark_stats_by_challenge[x]['solutions']) + logger.warning(f"recommend finding more solutions for challenge {c_name} to avoid being cut off") + + aggregate_difficulty_data = { + c_id: { + "nonces": sum( + x.num_nonces if x.difficulty in challenges[c_id].block_data.qualifier_difficulties else 0 + for x in difficulty_data + ), + "solutions": sum( + x.num_solutions if x.difficulty in challenges[c_id].block_data.qualifier_difficulties else 0 + for x in difficulty_data + ), + } + for c_id, difficulty_data in difficulty_data.items() + } + for c_id, x in aggregate_difficulty_data.items(): + avg_nonces_per_solution = (x["nonces"] // x["solutions"]) if x["solutions"] > 0 else 0 + logger.info(f"global qualifier difficulty stats for {challenges[c_id].details.name}: (#nonces: {x['nonces']}, #solutions: {x['solutions']}, avg_nonces_per_solution: {avg_nonces_per_solution})") + + def run(self, difficulty_samples: Dict[str, List[int]]) -> SubmitPrecommitRequest: + num_pending_benchmarks = sum(1 for job in self.jobs if job.merkle_root is None) + self.num_precommits_submitted + if num_pending_benchmarks >= self.config.max_pending_benchmarks: + logger.debug(f"number of pending benchmarks has reached max of {self.config.max_pending_benchmarks}") return - if self.num_precommits_this_block + self.num_unresolved_precommits >= self.config.max_unresolved_precommits: - logger.debug(f"reached max unresolved precommits: {self.config.max_unresolved_precommits}") - return - 
algo_selection = [ - (c_name, selection) - for c_name, selection in self.config.algo_selection.items() - if self.curr_base_fees[c_name] <= selection.base_fee_limit + selections = [ + (c_name, x) for c_name, x in self.config.algo_selection.items() + if self.curr_base_fees[c_name] <= x.base_fee_limit ] - if len(algo_selection) == 0: - logger.warning("no challenges within base fee limit") - return - if any(x[1].weight is not None for x in algo_selection): - logger.debug(f"using config weights to randomly select a challenge + algorithm: {self.config.algo_selection}") - selection = random.choices(algo_selection, weights=[x[1].weight or 1e-12 for x in algo_selection])[0] - elif (max_percent := max(v for v in self.percent_qualifiers_by_challenge.values())) > 0: - logger.debug(f"using percent qualifiers to randomly select a challenge + algorithm: {self.percent_qualifiers_by_challenge}") - selection = random.choices( - algo_selection, - weights=[max_percent - self.percent_qualifiers_by_challenge.get(x[0], 0) + 1e-12 for x in algo_selection] - )[0] - else: - logger.debug(f"using number of solutions to randomly select a challenge + algorithm: {self.num_solutions_by_challenge}") - max_solution = max(v for v in self.num_solutions_by_challenge.values()) - selection = random.choices( - algo_selection, - weights=[max_solution - self.num_solutions_by_challenge.get(x[0], 0) + 1e-12 for x in algo_selection] - )[0] - c_name = selection[0] - c_id = self.challenge_name_2_id[c_name] - a_name = selection[1].algorithm - a_id = self.algorithm_name_2_id[f"{c_name}_{a_name}"] - num_nonces = selection[1].num_nonces - - difficulty_samples = self.difficulty_samples.get(c_id, []) - if len(difficulty_samples) == 0: - return - difficulty = difficulty_samples.pop() - settings = BenchmarkSettings( - player_id=self.player_id, - block_id=self.last_block_id, - algorithm_id=a_id, - challenge_id=c_id, - difficulty=difficulty + if len(selections) == 0: + logger.warning("No challenges under base fee limit") + return None + logger.debug(f"Selecting challenge from: {[(c_name, x.weight) for c_name, x in selections]}") + selection = random.choices(selections, weights=[x.weight for _, x in selections])[0] + c_id = self.challenge_name_2_id[selection[0]] + a_id = self.algorithm_name_2_id[f"{selection[0]}_{selection[1].algorithm}"] + self.num_precommits_submitted += 1 + req = SubmitPrecommitRequest( + settings=BenchmarkSettings( + challenge_id=c_id, + algorithm_id=a_id, + player_id=self.player_id, + block_id=self.last_block_id, + difficulty=difficulty_samples[selection[0]] + ), + num_nonces=selection[1].num_nonces ) - logger.debug(f"created precommit: (settings: {settings}, num_nonces: {num_nonces})") - self.num_precommits_this_block += 1 - await emit( - "precommit_ready", - settings=settings, - num_nonces=num_nonces - ) - \ No newline at end of file + logger.info(f"Created precommit (challenge: {selection[0]}, algorithm: {selection[1].algorithm}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})") + return req \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/extensions/slave_manager.py b/tig-benchmarker/tig_benchmarker/extensions/slave_manager.py index b0b4d7a..5b59ff2 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/slave_manager.py +++ b/tig-benchmarker/tig_benchmarker/extensions/slave_manager.py @@ -3,115 +3,109 @@ import os import json import logging import re -from dataclasses import dataclass -from collections import deque -from enum import Enum +import signal from quart import 
Quart, request, jsonify from hypercorn.config import Config from hypercorn.asyncio import serve -from tig_benchmarker.event_bus import * +from tig_benchmarker.extensions.job_manager import Job from tig_benchmarker.structs import * from tig_benchmarker.utils import * from typing import Dict, List, Optional, Set logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) -class Status(Enum): - QUEUED = 0 - PROCESSING = 1 - FINISHED = 2 - PRIORITY_QUEUED = 3 - PRIORITY_PROCESSING = 4 - PRIORITY_FINISHED = 5 +@dataclass +class Batch(FromDict): + benchmark_id: str + start_nonce: int + num_nonces: int + settings: BenchmarkSettings + sampled_nonces: List[int] + wasm_vm_config: dict + download_url: str + rand_hash: str + batch_size: int + +@dataclass +class BatchResult(FromDict): + merkle_root: MerkleHash + solution_nonces: List[int] + merkle_proofs: List[MerkleProof] @dataclass class SlaveConfig(FromDict): name_regex: str - challenge_selection: Optional[List[str]] + max_concurrent_batches: Dict[str, int] @dataclass class SlaveManagerConfig(FromDict): + port: int + time_before_batch_retry: int slaves: List[SlaveConfig] -class Extension: - def __init__(self, port: int, exit_event: asyncio.Event, slave_manager: dict, **kwargs): - self.port = port - self.config = SlaveManagerConfig.from_dict(slave_manager) - self.exit_event = exit_event - self.batch_status = {} - self.batches = {} - self.priority_batches = {} - self.challenge_name_2_id = {} - self.last_update = 0 - self.lock = True - self._start_server() +class SlaveManager: + def __init__(self, config: SlaveManagerConfig, jobs: List[Job]): + self.config = config + self.jobs = jobs - def _start_server(self): + def start(self): app = Quart(__name__) - @app.route('/get-batch', methods=['GET']) - async def get_batch(): - if self.lock: - return "Slave manager is not ready", 503 + @app.route('/get-batches', methods=['GET']) + async def get_batches(): if (slave_name := request.headers.get('User-Agent', None)) is None: return "User-Agent header is required", 403 if not any(re.match(slave.name_regex, slave_name) for slave in self.config.slaves): - logger.warning(f"slave {slave_name} does not match any regex. rejecting get-batch request") + logger.warning(f"slave {slave_name} does not match any regex. 
rejecting get-batches request") return "Unregistered slave", 403 - if (slave := next((slave for slave in self.config.slaves if re.match(slave.name_regex, slave_name)), None)) is not None: - challenge_selection = slave.challenge_selection - else: - challenge_selection = None - - batch = None - while batch is None: - if challenge_selection is not None: - for c_name in challenge_selection: - c_id = self.challenge_name_2_id[c_name] - if len(self.priority_batches.get(c_id, [])): - batch, _ = self.priority_batches[c_id].popleft() - break - elif len(self.batches.get(c_id, [])): - batch, _ = self.batches[c_id].popleft() - break - else: - return "No batches available", 404 - else: - if (non_empty_queues := [q for q in self.priority_batches.values() if q]): - batch, _ = min(non_empty_queues, key=lambda q: q[0][1]).popleft() - elif (non_empty_queues := [q for q in self.batches.values() if q]): - batch, _ = min(non_empty_queues, key=lambda q: q[0][1]).popleft() - else: - return "No batches available", 404 + slave = next((slave for slave in self.config.slaves if re.match(slave.name_regex, slave_name)), None) - benchmark_id = batch['benchmark_id'] - start_nonce = batch['start_nonce'] - is_priority = len(batch['sampled_nonces']) > 0 - batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None) - if batch_status is None: - batch = None - elif is_priority and batch_status in { - Status.PRIORITY_FINISHED - }: - batch = None - elif not is_priority and batch_status in { - Status.FINISHED, - Status.PRIORITY_QUEUED, - Status.PRIORITY_PROCESSING, - Status.PRIORITY_FINISHED - }: - batch = None + now = int(time.time() * 1000) + batches = [] + selected_challenge = None + max_concurrent_batches = None + for job in self.jobs: + if ( + job.challenge not in slave.max_concurrent_batches or + (selected_challenge is not None and job.challenge != selected_challenge) + ): + continue + sampled_nonces_by_batch_idx = job.sampled_nonces_by_batch_idx + for batch_idx in range(job.num_batches): + if not ( + now - job.last_batch_retry_time[batch_idx] > self.config.time_before_batch_retry and + ( + job.batch_merkle_roots[batch_idx] is None or + not set(sampled_nonces_by_batch_idx.get(batch_idx, [])).issubset(job.merkle_proofs) + ) + ): + continue + selected_challenge = job.challenge + max_concurrent_batches = slave.max_concurrent_batches[job.challenge] + start_nonce = batch_idx * job.batch_size + batches.append(Batch( + benchmark_id=job.benchmark_id, + start_nonce=start_nonce, + num_nonces=min(job.batch_size, job.num_nonces - start_nonce), + settings=job.settings.to_dict(), + sampled_nonces=sampled_nonces_by_batch_idx.get(batch_idx, []), + wasm_vm_config=job.wasm_vm_config, + download_url=job.download_url, + rand_hash=job.rand_hash, + batch_size=job.batch_size + )) + if len(batches) >= max_concurrent_batches: + break + if len(batches) >= max_concurrent_batches: + break - batch_id = f"{benchmark_id}_{start_nonce}" - if is_priority: - self.batch_status[benchmark_id][start_nonce] = Status.PRIORITY_PROCESSING - logger.debug(f"{slave_name} got priority batch {batch_id}") + if len(batches) == 0: + logger.debug(f"{slave_name} get-batches: None available") + return "No batches available", 503 else: - self.batch_status[benchmark_id][start_nonce] = Status.PROCESSING - logger.debug(f"{slave_name} got batch {batch_id}") - - return jsonify(batch) + logger.debug(f"{slave_name} get-batches: Assigning {len(batches)} {selected_challenge} batches") + return jsonify([b.to_dict() for b in batches]) @app.route('/submit-batch-result/', 
methods=['POST']) async def submit_batch_result(batch_id): @@ -121,81 +115,27 @@ class Extension: logger.warning(f"slave {slave_name} does not match any regex. rejecting submit-batch-result request") benchmark_id, start_nonce = batch_id.split("_") start_nonce = int(start_nonce) - batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None) - data = await request.json - is_priority = len(data["merkle_proofs"]) > 0 - if batch_status is None: - return "Redundant result", 404 - elif is_priority and batch_status == Status.PRIORITY_FINISHED: - return "Redundant result", 404 - elif not is_priority and batch_status in { - Status.FINISHED, - Status.PRIORITY_QUEUED, - Status.PRIORITY_PROCESSING, - Status.PRIORITY_FINISHED - }: - return "Redundant result", 404 - - if is_priority: - self.batch_status[benchmark_id][start_nonce] = Status.PRIORITY_FINISHED - logger.debug(f"{slave_name} returned priority batch {batch_id}") - else: - self.batch_status[benchmark_id][start_nonce] = Status.FINISHED - logger.debug(f"{slave_name} returned batch {batch_id}") - await emit("batch_result", benchmark_id=benchmark_id, start_nonce=start_nonce, **data) + result = BatchResult.from_dict(await request.json) + job = next((job for job in self.jobs if job.benchmark_id == benchmark_id), None) + logger.debug(f"{slave_name} submit-batch-result: (benchmark_id: {benchmark_id}, start_nonce: {start_nonce}, #solutions: {len(result.solution_nonces)}, #proofs: {len(result.merkle_proofs)})") + if job is None: + logger.warning(f"{slave_name} submit-batch-result: no job found with benchmark_id {benchmark_id}") + return "Invalid benchmark_id", 400 + batch_idx = start_nonce // job.batch_size + job.batch_merkle_roots[batch_idx] = result.merkle_root + job.solution_nonces = list(set(job.solution_nonces + result.solution_nonces)) + job.batch_merkle_proofs.update({ + x.nonce: x + for x in result.merkle_proofs + }) return "OK" config = Config() - config.bind = [f"0.0.0.0:{self.port}"] + config.bind = [f"0.0.0.0:{self.config.port}"] - self._server_task = asyncio.create_task(serve(app, config, shutdown_trigger=self.exit_event.wait)) - logger.info(f"webserver started on {config.bind[0]}") - - async def on_new_batch(self, **batch): - benchmark_id = batch['benchmark_id'] - start_nonce = batch['start_nonce'] - challenge_id = batch['settings']['challenge_id'] - is_priority = len(batch['sampled_nonces']) > 0 - batch_status = self.batch_status.get(benchmark_id, {}).get(start_nonce, None) - if is_priority and batch_status in { - Status.PRIORITY_QUEUED, - Status.PRIORITY_FINISHED - }: - return - elif not is_priority and batch_status in { - Status.QUEUED, - Status.FINISHED, - Status.PRIORITY_QUEUED, - Status.PRIORITY_PROCESSING, - Status.PRIORITY_FINISHED - }: - return - - now_ = now() - if is_priority: - self.batch_status.setdefault(benchmark_id, {})[start_nonce] = Status.PRIORITY_QUEUED - self.priority_batches.setdefault(challenge_id, deque()).append((batch, now_)) - else: - self.batch_status.setdefault(benchmark_id, {})[start_nonce] = Status.QUEUED - self.batches.setdefault(challenge_id, deque()).append((batch, now_)) - - async def on_update(self): - now_ = now() - if now_ - self.last_update < 10000: - return - self.last_update = now_ - logger.info(f"#batches in queue (normal: {sum(len(x) for x in self.batches.values())}, priority: {sum(len(x) for x in self.priority_batches.values())})") - - async def on_new_block(self, precommits: Dict[str, Precommit], challenges: Dict[str, Challenge], **kwargs): - for benchmark_id in 
list(self.batch_status): - if benchmark_id in precommits: - continue - self.batch_status.pop(benchmark_id) - - self.challenge_name_2_id = {c.details.name: c.id for c in challenges.values()} - challenge_names = set(self.challenge_name_2_id) - for slave in self.config.slaves: - if slave.challenge_selection is None: - continue - assert set(slave.challenge_selection).issubset(challenge_names), f"challenge_selection for slave regex '{slave.name_regex}' is not a subset of {challenge_names}" - self.lock = False \ No newline at end of file + exit_event = asyncio.Event() + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGINT, exit_event.set) + loop.add_signal_handler(signal.SIGTERM, exit_event.set) + asyncio.create_task(serve(app, config, shutdown_trigger=exit_event.wait)) + logger.info(f"webserver started on {config.bind[0]}") \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/extensions/submissions_manager.py b/tig-benchmarker/tig_benchmarker/extensions/submissions_manager.py index f13c591..97c138a 100644 --- a/tig-benchmarker/tig_benchmarker/extensions/submissions_manager.py +++ b/tig-benchmarker/tig_benchmarker/extensions/submissions_manager.py @@ -1,8 +1,9 @@ import aiohttp +import asyncio import logging import json import os -from tig_benchmarker.event_bus import * +from tig_benchmarker.extensions.job_manager import Job from tig_benchmarker.structs import * from tig_benchmarker.utils import * from typing import Union @@ -27,202 +28,70 @@ class SubmitProofRequest(FromDict): @dataclass class SubmissionsManagerConfig(FromDict): - clear_precommits_submission_on_new_block: bool = True - max_retries: Optional[int] = None - ms_delay_between_retries: int = 60000 + time_between_retries: int -@dataclass -class PendingSubmission: - last_retry_time: int - retries: int - request: Union[SubmitPrecommitRequest, SubmitBenchmarkRequest, SubmitProofRequest] - -class Extension: - def __init__(self, api_url: str, api_key: str, backup_folder: str, **kwargs): +class SubmissionsManager: + def __init__(self, config: SubmissionsManagerConfig, api_url: str, api_key: str, jobs: List[Job]): + self.config = config + self.jobs = jobs self.api_url = api_url self.api_key = api_key - self.backup_folder = backup_folder - if (submissions_manager := kwargs.get("submissions_manager", None)): - self.config = SubmissionsManagerConfig.from_dict(submissions_manager) - else: - self.config = SubmissionsManagerConfig() - self.pending_submissions = { - "precommit": [], - "benchmark": [], - "proof": [] - } - self.last_submit = 0 - self._restore_pending_submissions() - - def _restore_pending_submissions(self): - pending_benchmarks = [] - pending_proofs = [] - for submission_type in ["benchmark", "proof"]: - path = os.path.join(self.backup_folder, submission_type) - if not os.path.exists(path): - logger.info(f"creating backup folder {path}") - os.makedirs(path, exist_ok=True) - for file in os.listdir(path): - if not file.endswith(".json"): - continue - file_path = os.path.join(path, file) - logger.info(f"restoring {submission_type} from {file_path}") - with open(file_path) as f: - d = json.load(f) - submission = PendingSubmission( - last_retry_time=0, - retries=0, - request=SubmitBenchmarkRequest.from_dict(d) if submission_type == "benchmark" else SubmitProofRequest.from_dict(d) - ) - self.pending_submissions[submission_type].append(submission) - - async def on_precommit_ready(self, settings: BenchmarkSettings, num_nonces: int, **kwargs): - self.pending_submissions["precommit"].append( - 
PendingSubmission( - last_retry_time=0, - retries=0, - request=SubmitPrecommitRequest(settings=settings, num_nonces=num_nonces) - ) - ) - - async def on_benchmark_ready(self, benchmark_id: str, merkle_root: MerkleHash, solution_nonces: Set[int], **kwargs): - if any( - x.request.benchmark_id == benchmark_id - for x in self.pending_submissions["benchmark"] - ): - logger.warning(f"benchmark {benchmark_id} already pending submission") - else: - request = SubmitBenchmarkRequest( - benchmark_id=benchmark_id, - merkle_root=merkle_root, - solution_nonces=solution_nonces - ) - self.pending_submissions["benchmark"].append( - PendingSubmission( - last_retry_time=0, - retries=0, - request=request - ) - ) - with open(os.path.join(self.backup_folder, "benchmark", f"{benchmark_id}.json"), "w") as f: - json.dump(request.to_dict(), f) - - async def on_proof_ready(self, benchmark_id: str, merkle_proofs: List[MerkleProof], **kwargs): - if any( - x.request.benchmark_id == benchmark_id - for x in self.pending_submissions["proof"] - ): - logger.warning(f"proof {benchmark_id} already in pending submissions") - else: - request = SubmitProofRequest( - benchmark_id=benchmark_id, - merkle_proofs=merkle_proofs - ) - self.pending_submissions["proof"].append( - PendingSubmission( - last_retry_time=0, - retries=0, - request=request - ) - ) - with open(os.path.join(self.backup_folder, "proof", f"{benchmark_id}.json"), "w") as f: - json.dump(request.to_dict(), f) - - async def on_new_block( - self, - block: Block, - precommits: Dict[str, Precommit], - **kwargs - ): - if self.config.clear_precommits_submission_on_new_block: - logger.debug(f"clearing {len(self.pending_submissions['precommit'])} pending precommits") - self.pending_submissions["precommit"].clear() - - for submission_type in ["benchmark", "proof"]: - filtered_submissions = [] - for submission in self.pending_submissions[submission_type]: - if ( - submission.request.benchmark_id not in precommits or # is expired - submission.request.benchmark_id in kwargs[submission_type + "s"] # is confirmed - ): - self._prune_pending_submission(submission_type, submission.request.benchmark_id) - else: - filtered_submissions.append(submission) - self.pending_submissions[submission_type] = filtered_submissions - - def _prune_pending_submission(self, submission_type: str, benchmark_id: str): - logger.debug(f"removing {submission_type} {benchmark_id} from pending submissions") - path = os.path.join(self.backup_folder, submission_type, f"{benchmark_id}.json") - if os.path.exists(path): - os.remove(path) - - async def on_update(self): - if ( - len(self.pending_submissions['precommit']) == 0 and - len(self.pending_submissions['benchmark']) == 0 and - len(self.pending_submissions['proof']) == 0 - ): - return - now_ = now() - if now_ - self.last_submit < 5500: - return - self.last_submit = now_ - logger.debug(f"pending submissions: (#precommits: {len(self.pending_submissions['precommit'])}, #benchmarks: {len(self.pending_submissions['benchmark'])}, #proofs: {len(self.pending_submissions['proof'])})") - - now_ = now() - for submission_type in ["precommit", "benchmark", "proof"]: - if len(self.pending_submissions[submission_type]) > 0: - self.pending_submissions[submission_type] = sorted(self.pending_submissions[submission_type], key=lambda x: x.last_retry_time) - if now_ - self.pending_submissions[submission_type][0].last_retry_time < self.config.ms_delay_between_retries: - logger.debug(f"no {submission_type} ready for submission") - else: - s = 
self.pending_submissions[submission_type].pop(0) - asyncio.create_task(self.submit(s)) - if submission_type == "precommit": - logger.info(f"submitting precommit") - else: - logger.info(f"submitting {submission_type} '{s.request.benchmark_id}'") - - async def submit( - self, - submission: PendingSubmission, - ): + async def _post(self, submission_type: str, req: Union[SubmitPrecommitRequest, SubmitBenchmarkRequest, SubmitProofRequest]): headers = { "X-Api-Key": self.api_key, "Content-Type": "application/json", "User-Agent": "tig-benchmarker-py/v0.2" } - - if isinstance(submission.request, SubmitPrecommitRequest): - submission_type = "precommit" - elif isinstance(submission.request, SubmitBenchmarkRequest): - submission_type = "benchmark" - elif isinstance(submission.request, SubmitProofRequest): - submission_type = "proof" + if submission_type == "precommit": + logger.info(f"submitting {submission_type}") else: - raise ValueError(f"Invalid request type: {type(submission.request)}") - - d = submission.request.to_dict() - logger.debug(f"submitting {submission_type}: {d}") + logger.info(f"submitting {submission_type} '{req.benchmark_id}'") + logger.debug(f"{req}") async with aiohttp.ClientSession() as session: - async with session.post(f"{self.api_url}/submit-{submission_type}", json=d, headers=headers) as resp: + async with session.post(f"{self.api_url}/submit-{submission_type}", json=req.to_dict(), headers=headers) as resp: text = await resp.text() if resp.status == 200: logger.info(f"submitted {submission_type} successfully") - if submission_type != "precommit": - self._prune_pending_submission(submission_type, submission.request.benchmark_id) - await emit(f"submit_{submission_type}_success", text=text, **d) + elif resp.headers.get("Content-Type") == "text/plain": + logger.error(f"status {resp.status} when submitting {submission_type}: {text}") else: - if resp.headers.get("Content-Type") == "text/plain": - logger.error(f"status {resp.status} when submitting {submission_type}: {text}") - else: - logger.error(f"status {resp.status} when submitting {submission_type}") - if 500 <= resp.status <= 599 and ( - self.config.max_retries is None or - submission.retries + 1 < self.config.max_retries - ): - submission.retries += 1 - submission.last_retry_time = now() - self.pending_submissions[submission_type].append(submission) - await emit(f"submit_{submission_type}_error", text=text, status=resp.status, request=submission.request) \ No newline at end of file + logger.error(f"status {resp.status} when submitting {submission_type}") + + def run(self, submit_precommit_req: Optional[SubmitPrecommitRequest]): + now = int(time.time() * 1000) + if submit_precommit_req is None: + logger.debug("no precommit to submit") + else: + asyncio.create_task(self._post("precommit", submit_precommit_req)) + + for job in self.jobs: + if ( + job.merkle_root is not None and + len(job.sampled_nonces) == 0 and + now - job.last_benchmark_submit_time > self.config.time_between_retries + ): + job.last_benchmark_submit_time = now + asyncio.create_task(self._post("benchmark", SubmitBenchmarkRequest( + benchmark_id=job.benchmark_id, + merkle_root=job.merkle_root.to_str(), + solution_nonces=job.solution_nonces + ))) + break + else: + logger.debug("no benchmark to submit") + + for job in self.jobs: + if ( + len(job.sampled_nonces) > 0 and + len(job.merkle_proofs) == len(job.sampled_nonces) and + now - job.last_proof_submit_time > self.config.time_between_retries + ): + job.last_proof_submit_time = now + 
asyncio.create_task(self._post("proof", SubmitProofRequest( + benchmark_id=job.benchmark_id, + merkle_proofs=list(job.merkle_proofs.values()) + ))) + break + else: + logger.debug("no proof to submit") \ No newline at end of file diff --git a/tig-benchmarker/tig_benchmarker/structs.py b/tig-benchmarker/tig_benchmarker/structs.py index 38cab8d..9b1230d 100644 --- a/tig-benchmarker/tig_benchmarker/structs.py +++ b/tig-benchmarker/tig_benchmarker/structs.py @@ -275,13 +275,7 @@ class TopUp(FromDict): state: TopUpState @dataclass -class QueryData(FromDict): - block: Block - algorithms: Dict[str, Algorithm] - wasms: Dict[str, Wasm] - player: Optional[Player] - precommits: Dict[str, Precommit] - benchmarks: Dict[str, Benchmark] - proofs: Dict[str, Proof] - frauds: Dict[str, Fraud] - challenges: Dict[str, Challenge] \ No newline at end of file +class DifficultyData(FromDict): + num_solutions: int + num_nonces: int + difficulty: Point \ No newline at end of file