diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 615251e..82c706e 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -11,6 +11,7 @@ import zlib from threading import Thread from common.structs import OutputData, MerkleProof from common.merkle_tree import MerkleTree +from datetime import datetime, timedelta logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) PENDING_BATCH_IDS = set() @@ -68,6 +69,13 @@ def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path): def purge_folders(output_path, ttl): + cutoff_time = datetime.now() - timedelta(hours=2) + for folder in os.listdir(output_path): + folder_path = os.path.join(output_path, folder) + if os.path.isdir(folder_path) and datetime.fromtimestamp(os.path.getmtime(folder_path)) < cutoff_time: + logger.info(f"removing old folder: {folder_path}") + shutil.rmtree(folder_path) + n = now() purge_batch_ids = [ batch_id @@ -90,13 +98,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm batch_id = READY_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return output_folder = f"{output_path}/{batch_id}" with open(f"{output_folder}/batch.json") as f: batch = json.load(f) - + if ( not os.path.exists(f"{output_folder}/result.json") or not os.path.exists(f"{output_folder}/data.zlib") @@ -123,7 +130,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) else: with open(f"{output_folder}/data.zlib", "rb") as f: @@ -157,15 +163,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) - def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return if ( @@ -186,10 +189,7 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") download_wasm(session, batch['download_url'], wasm_path) - Thread( - target=run_tig_worker, - args=(tig_worker_path, batch, wasm_path, num_workers, output_path) - ).start() + run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path) def poll_batches(session, master_ip, master_port, output_path): @@ -210,12 +210,9 @@ def poll_batches(session, master_ip, master_port, output_path): json.dump(batch, f) PENDING_BATCH_IDS.clear() PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids) - time.sleep(5) else: logger.error(f"status {resp.status_code} when fetching batch: {resp.text}") - time.sleep(5) - def wrap_thread(func, *args): logger.info(f"Starting thread for {func.__name__}") @@ -242,17 +239,13 @@ def main( if not os.path.exists(tig_worker_path): raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") os.makedirs(download_wasms_folder, exist_ok=True) + os.makedirs(output_path, exist_ok=True) session = requests.Session() session.headers.update({ "User-Agent": slave_name }) - Thread( - target=wrap_thread, - args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) - ).start() - Thread( target=wrap_thread, args=(purge_folders, output_path, ttl) @@ -261,7 +254,12 @@ def main( while True: try: poll_batches(session, master_ip, master_port, output_path) - process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + batches_size = len(PENDING_BATCH_IDS) + for _ in range(batches_size): + process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + if batches_size == 0: + time.sleep(5) except Exception as e: logger.error(f"Error in main loop: {e}") time.sleep(5)