From 64af3b29107d69029efb3c3a328470d68f4734e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Thu, 12 Dec 2024 14:51:56 +0100 Subject: [PATCH] Add slave.py script --- scripts/slave.py | 156 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 scripts/slave.py diff --git a/scripts/slave.py b/scripts/slave.py new file mode 100644 index 0000000..6ca7f31 --- /dev/null +++ b/scripts/slave.py @@ -0,0 +1,156 @@ +import argparse +import json +import os +import logging +import randomname +import aiohttp +import asyncio +import subprocess +import time + +logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) + +def now(): + return int(time.time() * 1000) + +async def download_wasm(session, download_url, wasm_path): + if not os.path.exists(wasm_path): + start = now() + logger.info(f"downloading WASM from {download_url}") + async with session.get(download_url) as resp: + if resp.status != 200: + raise Exception(f"status {resp.status} when downloading WASM: {await resp.text()}") + with open(wasm_path, 'wb') as f: + f.write(await resp.read()) + logger.debug(f"downloading WASM: took {now() - start}ms") + logger.debug(f"WASM Path: {wasm_path}") + +async def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers): + start = now() + cmd = [ + tig_worker_path, "compute_batch", + json.dumps(batch["settings"]), + batch["rand_hash"], + str(batch["start_nonce"]), + str(batch["num_nonces"]), + str(batch["batch_size"]), + wasm_path, + "--mem", str(batch["runtime_config"]["max_memory"]), + "--fuel", str(batch["runtime_config"]["max_fuel"]), + "--workers", str(num_workers), + ] + if batch["sampled_nonces"]: + cmd += ["--sampled", *map(str, batch["sampled_nonces"])] + logger.info(f"computing batch: {' '.join(cmd)}") + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + if process.returncode != 0: + raise Exception(f"tig-worker failed: {stderr.decode()}") + result = json.loads(stdout.decode()) + logger.info(f"computing batch took {now() - start}ms") + logger.debug(f"batch result: {result}") + return result + +async def process_batch(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, batch, headers): + try: + batch_id = f"{batch['benchmark_id']}_{batch['start_nonce']}" + logger.info(f"Processing batch {batch_id}: {batch}") + + # Step 2: Download WASM + wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") + await download_wasm(session, batch['download_url'], wasm_path) + + # Step 3: Run tig-worker + result = await run_tig_worker(tig_worker_path, batch, wasm_path, num_workers) + + # Step 4: Submit results + start = now() + submit_url = f"http://{master_ip}:{master_port}/submit-batch-result/{batch_id}" + logger.info(f"posting results to {submit_url}") + async with session.post(submit_url, json=result, headers=headers) as resp: + if resp.status != 200: + raise Exception(f"status {resp.status} when posting results to master: {await resp.text()}") + logger.debug(f"posting results took {now() - start} ms") + + except Exception as e: + logger.error(f"Error processing batch {batch_id}: {e}") + +async def main( + master_ip: str, + tig_worker_path: str, + download_wasms_folder: str, + num_workers: int, + slave_name: str, + master_port: int +): + if not os.path.exists(tig_worker_path): + raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") + os.makedirs(download_wasms_folder, exist_ok=True) + + headers = { + "User-Agent": slave_name + } + + + async with aiohttp.ClientSession() as session: + while True: + try: + # Step 1: Query for job test maj + start = now() + get_batch_url = f"http://{master_ip}:{master_port}/get-batches" + logger.info(f"fetching job from {get_batch_url}") + try: + resp = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=5) + if resp.status != 200: + text = await resp.text() + if resp.status == 404 and text.strip() == "No batches available": + # Retry with master_port - 1 + new_port = master_port - 1 + get_batch_url = f"http://{master_ip}:{new_port}/get-batches" + logger.info(f"No batches available on port {master_port}, trying port {new_port}") + resp_retry = await asyncio.wait_for(session.get(get_batch_url, headers=headers), timeout=10) + if resp_retry.status != 200: + raise Exception(f"status {resp_retry.status} when fetching job: {await resp_retry.text()}") + master_port_w = new_port + batches = await resp_retry.json(content_type=None) + else: + raise Exception(f"status {resp.status} when fetching job: {text}") + else: + master_port_w = master_port + batches = await resp.json(content_type=None) + except asyncio.TimeoutError: + logger.error(f"Timeout occurred when fetching job from {get_batch_url}") + continue + logger.debug(f"fetching job: took {now() - start}ms") + + # Process batches concurrently + tasks = [ + process_batch(session, master_ip, master_port_w, tig_worker_path, download_wasms_folder, num_workers, batch, headers) + for batch in batches + ] + await asyncio.gather(*tasks) + + except Exception as e: + logger.error(e) + await asyncio.sleep(2) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="TIG Slave Benchmarker") + parser.add_argument("master_ip", help="IP address of the master") + parser.add_argument("tig_worker_path", help="Path to tig-worker executable") + parser.add_argument("--download", type=str, default="wasms", help="Folder to download WASMs to (default: wasms)") + parser.add_argument("--workers", type=int, default=8, help="Number of workers (default: 8)") + parser.add_argument("--name", type=str, default=randomname.get_name(), help="Name for the slave (default: randomly generated)") + parser.add_argument("--port", type=int, default=5115, help="Port for master (default: 5115)") + parser.add_argument("--verbose", action='store_true', help="Print debug logs") + + args = parser.parse_args() + + logging.basicConfig( + format='%(levelname)s - [%(name)s] - %(message)s', + level=logging.DEBUG if args.verbose else logging.INFO + ) + + asyncio.run(main(args.master_ip, args.tig_worker_path, args.download, args.workers, args.name, args.port)) \ No newline at end of file