From 4d52507075c64b5e7d0dcc2970a0e7cfdbcd4c20 Mon Sep 17 00:00:00 2001 From: FiveMovesAhead Date: Thu, 1 May 2025 15:42:08 +0100 Subject: [PATCH] Update tig-benchmarker and add runtime docker. --- Dockerfile.runtime | 23 ++++ tig-benchmarker/common/structs.py | 2 +- .../master/master/difficulty_sampler.py | 37 +++--- tig-benchmarker/master/master/job_manager.py | 12 +- .../master/master/precommit_manager.py | 30 ++--- .../master/master/slave_manager.py | 13 ++- tig-benchmarker/postgres/init.sql | 107 ++++++++---------- tig-benchmarker/slave.py | 84 +++++++++----- 8 files changed, 174 insertions(+), 134 deletions(-) create mode 100644 Dockerfile.runtime diff --git a/Dockerfile.runtime b/Dockerfile.runtime new file mode 100644 index 0000000..661a130 --- /dev/null +++ b/Dockerfile.runtime @@ -0,0 +1,23 @@ +ARG BASE_IMAGE=ubuntu:24.04 +ARG DEV_IMAGE=ghcr.io/tig-foundation/tig-monorepo/dev + +FROM ${DEV_IMAGE} AS dev + +FROM ${BASE_IMAGE} + +ENV DEBIAN_FRONTEND=noninteractive + +COPY --from=dev /usr/local/bin/tig-runtime /usr/local/bin/tig-runtime +COPY --from=dev /usr/local/bin/tig-worker /usr/local/bin/tig-worker +COPY --from=dev /usr/local/lib/rust /usr/local/lib/rust + +RUN chmod +x /usr/local/bin/tig-runtime && \ + chmod +x /usr/local/bin/tig-worker && \ + echo "export LD_LIBRARY_PATH=\"${LD_LIBRARY_PATH}:/usr/local/lib/rust\"" >> /etc/bash.bashrc + +RUN apt update && apt install -y python3 python3-pip +RUN pip3 install blake3 requests randomname --break-system-packages + +COPY tig-benchmarker /app + +WORKDIR /app diff --git a/tig-benchmarker/common/structs.py b/tig-benchmarker/common/structs.py index d6e6c99..9119ce7 100644 --- a/tig-benchmarker/common/structs.py +++ b/tig-benchmarker/common/structs.py @@ -12,7 +12,6 @@ class AlgorithmDetails(FromDict): player_id: str challenge_id: str breakthrough_id: Optional[str] - type: str fee_paid: PreciseNumber @dataclass @@ -158,6 +157,7 @@ class BlockDetails(FromDict): timestamp: int num_confirmed: Dict[str, int] num_active: Dict[str, int] + emissions: Dict[str, PreciseNumber] @dataclass class BlockData(FromDict): diff --git a/tig-benchmarker/master/master/difficulty_sampler.py b/tig-benchmarker/master/master/difficulty_sampler.py index a79d789..8e5cdd1 100644 --- a/tig-benchmarker/master/master/difficulty_sampler.py +++ b/tig-benchmarker/master/master/difficulty_sampler.py @@ -117,8 +117,7 @@ class DifficultySampler: self.challenges = {} def on_new_block(self, challenges: Dict[str, Challenge], **kwargs): - config = CONFIG["difficulty_sampler_config"] - for c in challenges.values(): + for c_id, c in challenges.items(): if c.block_data is None: continue logger.debug(f"Calculating valid difficulties and frontiers for challenge {c.details.name}") @@ -126,22 +125,23 @@ class DifficultySampler: upper_frontier, lower_frontier = c.block_data.scaled_frontier, c.block_data.base_frontier else: upper_frontier, lower_frontier = c.block_data.base_frontier, c.block_data.scaled_frontier - self.valid_difficulties[c.details.name] = calc_valid_difficulties(list(upper_frontier), list(lower_frontier)) - if config["difficulty_ranges"] is None: - self.frontiers[c.details.name] = [] - else: - self.frontiers[c.details.name] = calc_all_frontiers(self.valid_difficulties[c.details.name]) + self.valid_difficulties[c_id] = calc_valid_difficulties(list(upper_frontier), list(lower_frontier)) + self.frontiers[c_id] = None - self.challenges = [c.details.name for c in challenges.values()] + self.challenge_id_2_name = { + c_id: c.details.name for c_id, c in challenges.items() + } def run(self) -> Dict[str, Point]: samples = {} - config = CONFIG["difficulty_sampler_config"] - for c_name in self.challenges: + for config in CONFIG["algo_selection"]: found_valid = False + a_id = config["algorithm_id"] + c_id = a_id[:4] + c_name = self.challenge_id_2_name[c_id] - if len(selected_difficulties := config["selected_difficulties"].get(c_name, [])) > 0: + if len(selected_difficulties := config["selected_difficulties"]) > 0: valid_difficulties = set(tuple(d) for d in self.valid_difficulties[c_name]) selected_difficulties = [tuple(d) for d in selected_difficulties] selected_difficulties = [ @@ -157,17 +157,20 @@ class DifficultySampler: logger.debug(f"No valid difficulties found for {c_name} - skipping selected difficulties") if not found_valid: - if len(self.frontiers[c_name]) == 0 or config["difficulty_ranges"] is None: - valid_difficulties = self.valid_difficulties[c_name] + if config["difficulty_range"] is None: + valid_difficulties = self.valid_difficulties[c_id] difficulty = random.choice(valid_difficulties) else: - frontiers = self.frontiers[c_name] - difficulty_range = config["difficulty_ranges"][c_name] + if self.frontiers[c_id] is None: + logger.debug(f"Calculating frontiers for {c_name}") + self.frontiers[c_id] = calc_all_frontiers(self.valid_difficulties[c_id]) + frontiers = self.frontiers[c_id] + difficulty_range = config["difficulty_range"] idx1 = math.floor(difficulty_range[0] * (len(frontiers) - 1)) idx2 = math.ceil(difficulty_range[1] * (len(frontiers) - 1)) difficulties = [p for frontier in frontiers[idx1:idx2 + 1] for p in frontier] difficulty = random.choice(difficulties) - samples[c_name] = difficulty - logger.debug(f"Sampled difficulty {difficulty} for challenge {c_name}") + samples[a_id] = difficulty + logger.debug(f"Sampled difficulty {difficulty} for algorithm {a_id} in challenge {c_name}") return samples \ No newline at end of file diff --git a/tig-benchmarker/master/master/job_manager.py b/tig-benchmarker/master/master/job_manager.py index dafab1b..7c69016 100644 --- a/tig-benchmarker/master/master/job_manager.py +++ b/tig-benchmarker/master/master/job_manager.py @@ -29,6 +29,7 @@ class JobManager: ): api_url = CONFIG["api_url"] config = CONFIG["job_manager_config"] + algo_selection = CONFIG["algo_selection"] # create jobs from confirmed precommits challenge_id_2_name = { c.id: c.details.name @@ -75,7 +76,14 @@ class JobManager: if wasm.details.download_url is None: logger.error(f"no download_url found for wasm {wasm.algorithm_id}") continue - num_batches = math.ceil(x.details.num_nonces / config["batch_sizes"][c_name]) + batch_size = next( + (s["batch_size"] for s in algo_selection if s["algorithm_id"] == x.settings.algorithm_id), + None + ) + if batch_size is None: + batch_size = config["default_batch_size"][x.settings.challenge_id] + logger.error(f"No batch size found for algorithm_id {x.settings.algorithm_id}, using default {batch_size}") + num_batches = math.ceil(x.details.num_nonces / batch_size) atomic_inserts = [ ( """ @@ -105,7 +113,7 @@ class JobManager: num_batches, x.details.rand_hash, json.dumps(block.config["benchmarks"]["runtime_configs"]["wasm"]), - config["batch_sizes"][c_name], + batch_size, c_name, a_name, wasm.details.download_url, diff --git a/tig-benchmarker/master/master/precommit_manager.py b/tig-benchmarker/master/master/precommit_manager.py index bbba4f5..0830f7d 100644 --- a/tig-benchmarker/master/master/precommit_manager.py +++ b/tig-benchmarker/master/master/precommit_manager.py @@ -43,14 +43,6 @@ class PrecommitManager: ): self.last_block_id = block.id self.num_precommits_submitted = 0 - self.challenge_name_2_id = { - c.details.name: c.id - for c in challenges.values() - } - self.algorithm_name_2_id = { - f"{challenges[a.details.challenge_id].details.name}_{a.details.name}": a.id - for a in algorithms.values() - } self.curr_base_fees = { c.details.name: c.block_data.base_fee for c in challenges.values() @@ -103,22 +95,16 @@ class PrecommitManager: )["count"] config = CONFIG["precommit_manager_config"] + algo_selection = CONFIG["algo_selection"] num_pending_benchmarks = num_pending_jobs + self.num_precommits_submitted if num_pending_benchmarks >= config["max_pending_benchmarks"]: logger.debug(f"number of pending benchmarks has reached max of {config['max_pending_benchmarks']}") return - selections = [ - (c_name, x) for c_name, x in config["algo_selection"].items() - #if self.curr_base_fees[c_name] <= x.base_fee_limit - ] - if len(selections) == 0: - logger.warning("No challenges under base fee limit") - return None - logger.debug(f"Selecting challenge from: {[(c_name, x['weight']) for c_name, x in selections]}") - selection = random.choices(selections, weights=[x["weight"] for _, x in selections])[0] - c_id = self.challenge_name_2_id[selection[0]] - a_id = self.algorithm_name_2_id[f"{selection[0]}_{selection[1]['algorithm']}"] + logger.debug(f"Selecting algorithm from: {[(x['algorithm_id'], x['weight']) for x in algo_selection]}") + selection = random.choices(algo_selection, weights=[x["weight"] for x in algo_selection])[0] + a_id = selection["algorithm_id"] + c_id = a_id[:4] self.num_precommits_submitted += 1 req = SubmitPrecommitRequest( settings=BenchmarkSettings( @@ -126,9 +112,9 @@ class PrecommitManager: algorithm_id=a_id, player_id=CONFIG["player_id"], block_id=self.last_block_id, - difficulty=difficulty_samples[selection[0]] + difficulty=difficulty_samples[a_id] ), - num_nonces=selection[1]["num_nonces"] + num_nonces=selection["num_nonces"] ) - logger.info(f"Created precommit (challenge: {selection[0]}, algorithm: {selection[1]['algorithm']}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})") + logger.info(f"Created precommit (algorithm_id: {a_id}, difficulty: {req.settings.difficulty}, num_nonces: {req.num_nonces})") return req \ No newline at end of file diff --git a/tig-benchmarker/master/master/slave_manager.py b/tig-benchmarker/master/master/slave_manager.py index 16b5bf5..d04cfb5 100644 --- a/tig-benchmarker/master/master/slave_manager.py +++ b/tig-benchmarker/master/master/slave_manager.py @@ -34,7 +34,6 @@ class SlaveManager: A.slave, A.start_time, A.end_time, - B.challenge, JSONB_BUILD_OBJECT( 'id', A.benchmark_id || '_' || A.batch_idx, 'benchmark_id', A.benchmark_id, @@ -47,7 +46,9 @@ class SlaveManager: 'rand_hash', B.rand_hash, 'batch_size', B.batch_size, 'batch_idx', A.batch_idx, - 'hash_threshold', B.hash_threshold + 'hash_threshold', B.hash_threshold, + 'challenge', B.challenge, + 'algorithm', B.algorithm ) AS batch FROM proofs_batch A INNER JOIN job B @@ -65,7 +66,6 @@ class SlaveManager: A.slave, A.start_time, A.end_time, - B.challenge, JSONB_BUILD_OBJECT( 'id', A.benchmark_id || '_' || A.batch_idx, 'benchmark_id', A.benchmark_id, @@ -78,7 +78,9 @@ class SlaveManager: 'rand_hash', B.rand_hash, 'batch_size', B.batch_size, 'batch_idx', A.batch_idx, - 'hash_threshold', B.hash_threshold + 'hash_threshold', B.hash_threshold, + 'challenge', B.challenge, + 'algorithm', B.algorithm ) AS batch FROM root_batch A INNER JOIN job B @@ -109,7 +111,6 @@ class SlaveManager: concurrent = [] updates = [] now = time.time() * 1000 - selected_challenges = set(slave["selected_challenges"]) with self.lock: concurrent = [ b["batch"] for b in self.batches @@ -121,7 +122,7 @@ class SlaveManager: break if ( b["slave"] == slave_name or - b["challenge"] not in selected_challenges or + not re.match(slave["algorithm_id_regex"], b["batch"]["settings"]["algorithm_id"]) or b["end_time"] is not None ): continue diff --git a/tig-benchmarker/postgres/init.sql b/tig-benchmarker/postgres/init.sql index 7e90483..4d1bc33 100644 --- a/tig-benchmarker/postgres/init.sql +++ b/tig-benchmarker/postgres/init.sql @@ -106,72 +106,65 @@ SELECT ' "player_id": "0x0000000000000000000000000000000000000000", "api_key": "00000000000000000000000000000000", "api_url": "https://mainnet-api.tig.foundation", - "difficulty_sampler_config": { - "difficulty_ranges": { - "satisfiability": [0, 0.5], - "vehicle_routing": [0, 0.5], - "knapsack": [0, 0.5], - "vector_search": [0, 0.5] - }, - "selected_difficulties": { - "satisfiability": [], - "vehicle_routing": [], - "knapsack": [], - "vector_search": [] - } - }, - "job_manager_config": { - "batch_sizes": { - "satisfiability": 8, - "vehicle_routing": 8, - "knapsack": 8, - "vector_search": 8 - } - }, "submissions_manager_config": { "time_between_retries": 60000 }, - "precommit_manager_config": { - "max_pending_benchmarks": 4, - "algo_selection": { - "satisfiability": { - "algorithm": "schnoing", - "num_nonces": 40, - "weight": 1, - "base_fee_limit": "10000000000000000" - }, - "vehicle_routing": { - "algorithm": "clarke_wright", - "num_nonces": 40, - "weight": 1, - "base_fee_limit": "10000000000000000" - }, - "knapsack": { - "algorithm": "dynamic", - "num_nonces": 40, - "weight": 1, - "base_fee_limit": "10000000000000000" - }, - "vector_search": { - "algorithm": "optimal_ann", - "num_nonces": 40, - "weight": 1, - "base_fee_limit": "10000000000000000" - } - } + "job_manager_config": { + "default_batch_sizes": { + "c001": 8, + "c002": 8, + "c003": 8, + "c004": 8 + }, }, + "precommit_manager_config": { + "max_pending_benchmarks": 4 + }, + "algo_selection": [ + { + "algorithm_id": "c001_a001", + "num_nonces": 40, + "difficulty_range": [0, 0.5], + "selected_difficulties": [], + "weight": 1, + "batch_size": 8, + "base_fee_limit": "10000000000000000" + }, + { + "algorithm_id": "c002_a001", + "num_nonces": 40, + "difficulty_range": [0, 0.5], + "selected_difficulties": [], + "weight": 1, + "batch_size": 8, + "base_fee_limit": "10000000000000000" + }, + { + "algorithm_id": "c003_a001", + "num_nonces": 40, + "difficulty_range": [0, 0.5], + "selected_difficulties": [], + "weight": 1, + "batch_size": 8, + "base_fee_limit": "10000000000000000" + }, + { + "algorithm_id": "c004_a001", + "num_nonces": 40, + "difficulty_range": [0, 0.5], + "selected_difficulties": [], + "weight": 1, + "batch_size": 8, + "base_fee_limit": "10000000000000000" + } + ], "slave_manager_config": { "time_before_batch_retry": 60000, "slaves": [ { "name_regex": ".*", - "max_concurrent_batches": 1, - "selected_challenges": [ - "satisfiability", - "vehicle_routing", - "knapsack", - "vector_search" - ] + "algorithm_id_regex": ".*", + "max_concurrent_batches": 1 } ] } diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 549d36f..acf074d 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -1,4 +1,7 @@ +#!/usr/bin/env python3 + import argparse +import io import json import os import logging @@ -6,8 +9,10 @@ import randomname import requests import shutil import subprocess +import tarfile import time import zlib +from glob import glob from threading import Thread from common.structs import OutputData, MerkleProof from common.merkle_tree import MerkleTree, MerkleHash @@ -21,34 +26,42 @@ FINISHED_BATCH_IDS = {} def now(): return int(time.time() * 1000) -def download_wasm(download_url, wasm_path): - if not os.path.exists(wasm_path): +def download_library(downloads_folder, batch): + challenge_folder = f"{downloads_folder}/{batch['challenge']}" + so_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.so") or [None])[0] + + if so_path is None: start = now() - logger.info(f"downloading WASM from {download_url}") - resp = requests.get(download_url) + logger.info(f"downloading {batch['algorithm']}.tar.gz from {batch['download_url']}") + resp = requests.get(batch['download_url'], stream=True) if resp.status_code != 200: - raise Exception(f"status {resp.status_code} when downloading WASM: {resp.text}") - with open(wasm_path, 'wb') as f: - f.write(resp.content) - logger.debug(f"downloading WASM: took {now() - start}ms") - logger.debug(f"WASM Path: {wasm_path}") + raise Exception(f"status {resp.status_code} when downloading algorithm library: {resp.text}") + with tarfile.open(fileobj=io.BytesIO(resp.content), mode="r:gz") as tar: + tar.extractall(path=challenge_folder) + logger.debug(f"downloading {batch['algorithm']}.tar.gz took {now() - start}ms") + so_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.so") or [None])[0] + + ptx_path = (glob(f"{challenge_folder}/*/{batch['algorithm']}.ptx") or [None])[0] + return so_path, ptx_path -def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path): +def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path): start = now() cmd = [ - tig_worker_path, "compute_batch", - json.dumps(batch["settings"]), - batch["rand_hash"], + tig_worker_path, + tig_runtime_path, + json.dumps(batch["settings"]), + batch["rand_hash"], str(batch["start_nonce"]), str(batch["num_nonces"]), - str(batch["batch_size"]), - wasm_path, - "--mem", str(batch["runtime_config"]["max_memory"]), + str(batch["batch_size"]), + so_path, "--fuel", str(batch["runtime_config"]["max_fuel"]), "--workers", str(num_workers), "--output", f"{output_path}/{batch['id']}", ] + if ptx_path is not None: + cmd += ["--ptx", ptx_path] logger.info(f"computing batch: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE @@ -97,7 +110,7 @@ def purge_folders(output_path, ttl): FINISHED_BATCH_IDS.pop(batch_id) -def send_results(headers, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path): +def send_results(headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path): try: batch_id = READY_BATCH_IDS.pop() except KeyError: @@ -185,7 +198,7 @@ def send_results(headers, master_ip, master_port, tig_worker_path, download_wasm time.sleep(2) -def process_batch(tig_worker_path, download_wasms_folder, num_workers, output_path): +def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: @@ -208,12 +221,11 @@ def process_batch(tig_worker_path, download_wasms_folder, num_workers, output_pa with open(f"{output_path}/{batch_id}/batch.json") as f: batch = json.load(f) - wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") - download_wasm(batch['download_url'], wasm_path) + so_path, ptx_path = download_library(downloads_folder, batch) Thread( target=run_tig_worker, - args=(tig_worker_path, batch, wasm_path, num_workers, output_path) + args=(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path) ).start() @@ -258,18 +270,31 @@ def wrap_thread(func, *args): def main( master_ip: str, tig_worker_path: str, - download_wasms_folder: str, + tig_runtime_path: str, + downloads_folder: str, num_workers: int, slave_name: str, master_port: int, output_path: str, ttl: int, ): - print(f"Starting slave {slave_name}") + print(f"Starting slave with config:") + print(f" Slave Name: {slave_name}") + print(f" Master IP: {master_ip}") + print(f" Master Port: {master_port}") + print(f" Worker Path: {tig_worker_path}") + print(f" Runtime Path: {tig_runtime_path}") + print(f" Downloads Folder: {downloads_folder}") + print(f" Number of Workers: {num_workers}") + print(f" Output Path: {output_path}") + print(f" TTL: {ttl}") + print(f" Verbose: {args.verbose}") if not os.path.exists(tig_worker_path): raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") - os.makedirs(download_wasms_folder, exist_ok=True) + if not os.path.exists(tig_runtime_path): + raise FileNotFoundError(f"tig-runtime not found at path: {tig_runtime_path}") + os.makedirs(downloads_folder, exist_ok=True) headers = { "User-Agent": slave_name @@ -277,12 +302,12 @@ def main( Thread( target=wrap_thread, - args=(process_batch, tig_worker_path, download_wasms_folder, num_workers, output_path) + args=(process_batch, tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path) ).start() Thread( target=wrap_thread, - args=(send_results, headers, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + args=(send_results, headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path) ).start() Thread( @@ -295,9 +320,10 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") - parser.add_argument("tig_worker_path", help="Path to tig-worker executable") + parser.add_argument("--tig_worker_path", type=str, default=shutil.which("tig-worker"), help="Path to tig-worker executable") + parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable") parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)") - parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)") + parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)") parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)") parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)") parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)") @@ -312,4 +338,4 @@ if __name__ == "__main__": level=logging.DEBUG if args.verbose else logging.INFO ) - main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl) + main(args.master, args.tig_worker_path, args.tig_runtime_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)