diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index d4161ff..6e43400 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -98,11 +98,7 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm batch_id = READY_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return - - if FINISHED_BATCH_IDS[batch_id] is not None: - return; output_folder = f"{output_path}/{batch_id}" with open(f"{output_folder}/batch.json") as f: @@ -134,7 +130,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) else: with open(f"{output_folder}/data.zlib", "rb") as f: @@ -175,15 +170,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) - def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return if ( @@ -204,10 +196,7 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") download_wasm(session, batch['download_url'], wasm_path) - Thread( - target=run_tig_worker, - args=(tig_worker_path, batch, wasm_path, num_workers, output_path) - ).start() + run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path) def poll_batches(session, master_ip, master_port, output_path): @@ -228,12 +217,9 @@ def poll_batches(session, master_ip, master_port, output_path): json.dump(batch, f) PENDING_BATCH_IDS.clear() PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids) - time.sleep(5) else: logger.error(f"status {resp.status_code} when fetching batch: {resp.text}") - time.sleep(5) - def wrap_thread(func, *args): logger.info(f"Starting thread for {func.__name__}") @@ -266,11 +252,6 @@ def main( "User-Agent": slave_name }) - Thread( - target=wrap_thread, - args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) - ).start() - Thread( target=wrap_thread, args=(purge_folders, output_path, ttl) @@ -279,7 +260,12 @@ def main( while True: try: poll_batches(session, master_ip, master_port, output_path) - process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + batches_size = len(PENDING_BATCH_IDS) + for _ in range(batches_size): + process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + if batches_size == 0: + time.sleep(5) except Exception as e: logger.error(f"Error in main loop: {e}") time.sleep(5)