From 9c1d871c8a96b6b3c3056a4a31fd221b73bbe748 Mon Sep 17 00:00:00 2001 From: FiveMovesAhead Date: Tue, 20 May 2025 09:21:12 +0100 Subject: [PATCH] Update slave to execute tig-runtime directly. --- tig-benchmarker/config.json | 11 +++ tig-benchmarker/slave.py | 173 ++++++++++++++++++++++++++---------- 2 files changed, 139 insertions(+), 45 deletions(-) create mode 100644 tig-benchmarker/config.json diff --git a/tig-benchmarker/config.json b/tig-benchmarker/config.json new file mode 100644 index 00000000..bc66cf1e --- /dev/null +++ b/tig-benchmarker/config.json @@ -0,0 +1,11 @@ +{ + "cpus": 8, + "gpus": 0, + "algorithms": [ + { + "id_regex": ".*", + "cpu_cost": 1.0, + "gpu_cost": 0.0 + } + ] +} \ No newline at end of file diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 8d5066e4..b784621d 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -7,6 +7,7 @@ import os import platform import logging import randomname +import re import requests import shutil import subprocess @@ -23,6 +24,7 @@ PENDING_BATCH_IDS = set() PROCESSING_BATCH_IDS = set() READY_BATCH_IDS = set() FINISHED_BATCH_IDS = {} +RUNTIME_THREADS = [] if (CPU_ARCH := platform.machine().lower()) in ["x86_64", "amd64"]: CPU_ARCH = "amd64" elif CPU_ARCH in ["arm64", "aarch64"]: @@ -56,43 +58,48 @@ def download_library(downloads_folder, batch): return so_path, ptx_path -def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path): +def run_tig_runtime(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path): start = now() + output_file = f"{output_path}/{batch['id']}/{nonce}.json" cmd = [ - tig_worker_path, tig_runtime_path, json.dumps(batch["settings"]), batch["rand_hash"], - str(batch["start_nonce"]), - str(batch["num_nonces"]), - str(batch["batch_size"]), + str(nonce), so_path, "--fuel", str(batch["runtime_config"]["max_fuel"]), - "--workers", str(num_workers), - "--output", f"{output_path}/{batch['id']}", + "--output", output_file, ] if ptx_path is not None: cmd += ["--ptx", ptx_path] - logger.info(f"computing batch: {' '.join(cmd)}") + logger.debug(f"computing batch: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) while True: ret = process.poll() if ret is not None: - if ret != 0: - PROCESSING_BATCH_IDS.remove(batch["id"]) - raise Exception(f"tig-worker failed with return code {ret}") + # exit codes: + # 0 - success + # 1 - runtime error + # 85 - no solution + # 86 - invalid solution + # 87 - out of fuel + if (ret == 1 or ret == 87) and not os.path.exists(output_file): + d = OutputData( + nonce=nonce, + runtime_signature=0, + fuel_consumed=(ret == 87) and (batch["runtime_config"]["max_fuel"] + 1), + solution={}, + cpu_arch=CPU_ARCH, + ) + with open(output_file, "wb") as f: + json.dump(d.to_dict(), f) + + assert os.path.exists(output_file), f"Exit code {ret}. Output file does not exist" + if ret == 1: + logger.error(f"batch {batch['id']}, nonce {nonce}, runtime error: {process.stderr.read().decode()}") - stdout, stderr = process.communicate() - result = json.loads(stdout.decode()) - logger.info(f"computing batch {batch['id']} took {now() - start}ms") - logger.debug(f"batch {batch['id']} result: {result}") - with open(f"{output_path}/{batch['id']}/result.json", "w") as f: - json.dump(result, f) - - PROCESSING_BATCH_IDS.remove(batch["id"]) - READY_BATCH_IDS.add(batch["id"]) break elif batch["id"] not in PROCESSING_BATCH_IDS: @@ -101,6 +108,39 @@ def run_tig_worker(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, break time.sleep(0.1) + + if not all( + os.path.exists(f"{output_path}/{batch['id']}/{n}.json") + for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]) + ): + return + + compute_time = now() - start + hashes = [] + solution_nonces = [] + for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]): + with open(f"{output_path}/{batch['id']}/{n}.json", "r") as f: + d = OutputData.from_dict(json.load(f)) + if len(d.solution) > 0: + solution_nonces.append(n) + hashes.append(d.to_merkle_hash()) + + merkle_tree = MerkleTree(hashes, batch["batch_size"]) + with open(f"{output_path}/{batch['id']}/hashes.zlib", "wb") as f: + hashes = [h.to_str() for h in hashes] + f.write(zlib.compress(json.dumps(hashes).encode())) + with open(f"{output_path}/{batch['id']}/result.json", "w") as f: + result = { + "solution_nonces": list(solution_nonces), + "merkle_root": merkle_tree.calc_merkle_root().to_str(), + } + logger.debug(f"batch {batch['id']} result: {result}") + json.dump(result, f) + merkle_root_time = now() - start - compute_time + logger.info(f"batch {batch['id']}, compute_time: {compute_time}ms, merkle_root_time: {merkle_root_time}ms") + + PROCESSING_BATCH_IDS.remove(batch["id"]) + READY_BATCH_IDS.add(batch["id"]) def purge_folders(output_path, ttl): @@ -121,7 +161,7 @@ def purge_folders(output_path, ttl): FINISHED_BATCH_IDS.pop(batch_id) -def send_results(headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path): +def send_results(headers, master_ip, master_port, output_path): try: batch_id = READY_BATCH_IDS.pop() except KeyError: @@ -139,7 +179,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol if ( not os.path.exists(f"{output_folder}/result.json") - or not os.path.exists(f"{output_folder}/data.zlib") + or not all( + os.path.exists(f"{output_folder}/{n}.json") + for n in range(batch["start_nonce"], batch["start_nonce"] + batch["num_nonces"]) + ) or not os.path.exists(f"{output_folder}/hashes.zlib") ): if os.path.exists(f"{output_folder}/result.json"): @@ -178,8 +221,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol else: with open(f"{output_folder}/hashes.zlib", "rb") as f: hashes = json.loads(zlib.decompress(f.read()).decode()) - with open(f"{output_folder}/data.zlib", "rb") as f: - leafs = json.loads(zlib.decompress(f.read()).decode()) + leafs = [] + for n in batch["sampled_nonces"]: + with open(f"{output_folder}/{n}.json", "r") as f: + leafs.append(json.load(f)) merkle_tree = MerkleTree( [MerkleHash.from_str(x) for x in hashes], @@ -188,10 +233,10 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol proofs_to_submit = [ dict( - leaf=leafs[n - batch["start_nonce"]], + leaf=leaf, branch=merkle_tree.calc_merkle_branch(branch_idx=n - batch["start_nonce"]).to_str() ) - for n in batch["sampled_nonces"] + for n, leaf in zip(batch["sampled_nonces"], leafs) ] submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}" @@ -209,7 +254,7 @@ def send_results(headers, master_ip, master_port, tig_worker_path, downloads_fol time.sleep(2) -def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path): +def process_batch(tig_runtime_path, downloads_folder, config, output_path): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: @@ -233,11 +278,41 @@ def process_batch(tig_worker_path, tig_runtime_path, downloads_folder, num_worke batch = json.load(f) so_path, ptx_path = download_library(downloads_folder, batch) - - Thread( - target=run_tig_worker, - args=(tig_worker_path, tig_runtime_path, batch, so_path, ptx_path, num_workers, output_path) - ).start() + c = next( + ( + x for x in config["algorithms"] + if re.match(x["id_regex"], batch["settings"]["algorithm_id"]) + ), + None + ) + if c is None: + logger.error(f"Algorithm {batch['settings']['algorithm_id']} does not match any regex in the config") + return + + nonce = batch["start_nonce"] + while nonce < batch["start_nonce"] + batch["num_nonces"]: + if batch["id"] not in PROCESSING_BATCH_IDS: + logger.info(f"batch {batch['id']} stopped") + break + + RUNTIME_THREADS[:] = [x for x in RUNTIME_THREADS if not x[2].is_alive()] + total_cpu_cost = sum(x[0] for x in RUNTIME_THREADS) + total_gpu_cost = sum(x[1] for x in RUNTIME_THREADS) + + if ( + total_cpu_cost + c["cpu_cost"] <= config["cpus"] and + total_gpu_cost + c["gpu_cost"] <= config["gpus"] + ): + logger.debug(f"Starting batch {batch['id']}, nonce {nonce}, cpu_cost {c['cpu_cost']}, gpu_cost {c['gpu_cost']}") + t = Thread( + target=run_tig_runtime, + args=(nonce, tig_runtime_path, batch, so_path, ptx_path, output_path) + ) + t.start() + RUNTIME_THREADS.append((c["cpu_cost"], c["gpu_cost"], t)) + nonce += 1 + + time.sleep(0.1) def poll_batches(headers, master_ip, master_port, output_path): @@ -280,31 +355,40 @@ def wrap_thread(func, *args): def main( master_ip: str, - tig_worker_path: str, tig_runtime_path: str, downloads_folder: str, - num_workers: int, + config_path: str, slave_name: str, master_port: int, output_path: str, ttl: int, ): + if not os.path.exists(tig_runtime_path): + logger.error(f"tig-runtime not found at path: {tig_runtime_path}") + sys.exit(1) + if not os.path.exists(config_path): + logger.error(f"Config file not found at path: {config_path}") + sys.exit(1) + try: + with open(config_path) as f: + config = json.load(f) + except Exception as e: + logger.error(f"Error loading config file: {e}") + sys.exit(1) + print(f"Starting slave with config:") print(f" Slave Name: {slave_name}") print(f" Master IP: {master_ip}") print(f" Master Port: {master_port}") - print(f" Worker Path: {tig_worker_path}") + print(f" CPU Architecture: {CPU_ARCH}") + print(f" GPU Available: {HAS_GPU}") print(f" Runtime Path: {tig_runtime_path}") print(f" Downloads Folder: {downloads_folder}") - print(f" Number of Workers: {num_workers}") + print(f" Config: {json.dumps(config, indent=2)}") print(f" Output Path: {output_path}") print(f" TTL: {ttl}") print(f" Verbose: {args.verbose}") - if not os.path.exists(tig_worker_path): - raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") - if not os.path.exists(tig_runtime_path): - raise FileNotFoundError(f"tig-runtime not found at path: {tig_runtime_path}") os.makedirs(downloads_folder, exist_ok=True) headers = { @@ -313,12 +397,12 @@ def main( Thread( target=wrap_thread, - args=(process_batch, tig_worker_path, tig_runtime_path, downloads_folder, num_workers, output_path) + args=(process_batch, tig_runtime_path, downloads_folder, config, output_path) ).start() Thread( target=wrap_thread, - args=(send_results, headers, master_ip, master_port, tig_worker_path, downloads_folder, num_workers, output_path) + args=(send_results, headers, master_ip, master_port, output_path) ).start() Thread( @@ -331,11 +415,10 @@ def main( if __name__ == "__main__": parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") - parser.add_argument("--tig_worker_path", type=str, default=shutil.which("tig-worker"), help="Path to tig-worker executable") + parser.add_argument("config", type=str, help="Path to config file") parser.add_argument("--tig_runtime_path", type=str, default=shutil.which("tig-runtime"), help="Path to tig-runtime executable") parser.add_argument("--master", type=str, default="0.0.0.0", help="IP address of the master (default: 0.0.0.0)") parser.add_argument("--download", type=str, default="libs", help="Folder to download algorithm libraries to (default: libs)") - parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)") parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)") parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)") parser.add_argument("--verbose", action='store_true', help="Print debug logs") @@ -349,4 +432,4 @@ if __name__ == "__main__": level=logging.DEBUG if args.verbose else logging.INFO ) - main(args.master, args.tig_worker_path, args.tig_runtime_path, args.download, args.workers, args.name, args.port, args.output, args.ttl) + main(args.master, args.tig_runtime_path, args.download, args.config, args.name, args.port, args.output, args.ttl)