From 3f2914d37bca426e1f52bd1a5bad83d585d83e7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 12:02:09 +0100 Subject: [PATCH 1/7] Send root content into proof submission --- tig-benchmarker/slave.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 615251e..dd53e30 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -131,6 +131,8 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm OutputData.from_dict(x) for x in json.loads(zlib.decompress(f.read()).decode()) ] + with open(f"{output_folder}/result.json") as f: + result = json.load(f) merkle_tree = MerkleTree( [x.to_merkle_hash() for x in leafs], @@ -145,9 +147,14 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm for n in batch["sampled_nonces"] ] + payload = { + "merkle_proofs": proofs_to_submit, + **result + } + submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}" logger.info(f"posting proofs to {submit_url}") - resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit}) + resp = session.post(submit_url, json=payload) if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() logger.info(f"successfully posted proofs for batch {batch_id}") From d9cd783f96d91fa3b4345e5e9343cc142a5715a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 13:22:39 +0100 Subject: [PATCH 2/7] Add dir purge --- tig-benchmarker/slave.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index dd53e30..9d73bc8 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -11,6 +11,7 @@ import zlib from threading import Thread from common.structs import OutputData, MerkleProof from common.merkle_tree import MerkleTree +from datetime import datetime, timedelta logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0]) PENDING_BATCH_IDS = set() @@ -68,6 +69,13 @@ def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path): def purge_folders(output_path, ttl): + cutoff_time = datetime.now() - timedelta(hours=2) + for folder in os.listdir(output_path): + folder_path = os.path.join(output_path, folder) + if os.path.isdir(folder_path) and datetime.fromtimestamp(os.path.getmtime(folder_path)) < cutoff_time: + logger.info(f"removing old folder: {folder_path}") + shutil.rmtree(folder_path) + n = now() purge_batch_ids = [ batch_id From 292ccc4f06299126892ffb0ed36a129ab6b15f77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 13:28:42 +0100 Subject: [PATCH 3/7] Don't loop on finished jobs --- tig-benchmarker/slave.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 9d73bc8..d4161ff 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -100,11 +100,14 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm logger.debug("No pending batches") time.sleep(1) return + + if FINISHED_BATCH_IDS[batch_id] is not None: + return; output_folder = f"{output_path}/{batch_id}" with open(f"{output_folder}/batch.json") as f: batch = json.load(f) - + if ( not os.path.exists(f"{output_folder}/result.json") or not os.path.exists(f"{output_folder}/data.zlib") From 8a680ddfc356bf6dcc558db4df38966448141dd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 14:19:50 +0100 Subject: [PATCH 4/7] Update --- tig-benchmarker/slave.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index d4161ff..6e43400 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -98,11 +98,7 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm batch_id = READY_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return - - if FINISHED_BATCH_IDS[batch_id] is not None: - return; output_folder = f"{output_path}/{batch_id}" with open(f"{output_folder}/batch.json") as f: @@ -134,7 +130,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) else: with open(f"{output_folder}/data.zlib", "rb") as f: @@ -175,15 +170,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") READY_BATCH_IDS.add(batch_id) # requeue - time.sleep(2) - def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): try: batch_id = PENDING_BATCH_IDS.pop() except KeyError: logger.debug("No pending batches") - time.sleep(1) return if ( @@ -204,10 +196,7 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm") download_wasm(session, batch['download_url'], wasm_path) - Thread( - target=run_tig_worker, - args=(tig_worker_path, batch, wasm_path, num_workers, output_path) - ).start() + run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path) def poll_batches(session, master_ip, master_port, output_path): @@ -228,12 +217,9 @@ def poll_batches(session, master_ip, master_port, output_path): json.dump(batch, f) PENDING_BATCH_IDS.clear() PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids) - time.sleep(5) else: logger.error(f"status {resp.status_code} when fetching batch: {resp.text}") - time.sleep(5) - def wrap_thread(func, *args): logger.info(f"Starting thread for {func.__name__}") @@ -266,11 +252,6 @@ def main( "User-Agent": slave_name }) - Thread( - target=wrap_thread, - args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) - ).start() - Thread( target=wrap_thread, args=(purge_folders, output_path, ttl) @@ -279,7 +260,12 @@ def main( while True: try: poll_batches(session, master_ip, master_port, output_path) - process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + batches_size = len(PENDING_BATCH_IDS) + for _ in range(batches_size): + process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path) + send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + if batches_size == 0: + time.sleep(5) except Exception as e: logger.error(f"Error in main loop: {e}") time.sleep(5) From 9d596f738ce9cf60eb225e80d361751225abce07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 15:18:57 +0100 Subject: [PATCH 5/7] create result dir --- tig-benchmarker/slave.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 6e43400..2a04787 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -246,6 +246,7 @@ def main( if not os.path.exists(tig_worker_path): raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") os.makedirs(download_wasms_folder, exist_ok=True) + os.makedirs(output_path, exist_ok=True) session = requests.Session() session.headers.update({ From 1f427214f0f71aa43ab47919cd0c97b6c03f55cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 15:24:48 +0100 Subject: [PATCH 6/7] purge folder is submit is ok --- tig-benchmarker/slave.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index 2a04787..b1e88c1 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -123,9 +123,11 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm resp = session.post(submit_url, json=result) if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() + purge_folders(output_path, 0) logger.info(f"successfully posted root for batch {batch_id}") elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() + purge_folders(output_path, 0) logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") @@ -163,9 +165,11 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm resp = session.post(submit_url, json=payload) if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() + purge_folders(output_path, 0) logger.info(f"successfully posted proofs for batch {batch_id}") elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() + purge_folders(output_path, 0) logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") From e445a9475ed87637410636235ddedcf35805c966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Patron?= Date: Fri, 20 Dec 2024 15:38:35 +0100 Subject: [PATCH 7/7] revert --- tig-benchmarker/slave.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/tig-benchmarker/slave.py b/tig-benchmarker/slave.py index b1e88c1..82c706e 100644 --- a/tig-benchmarker/slave.py +++ b/tig-benchmarker/slave.py @@ -123,11 +123,9 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm resp = session.post(submit_url, json=result) if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() - purge_folders(output_path, 0) logger.info(f"successfully posted root for batch {batch_id}") elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() - purge_folders(output_path, 0) logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") @@ -139,8 +137,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm OutputData.from_dict(x) for x in json.loads(zlib.decompress(f.read()).decode()) ] - with open(f"{output_folder}/result.json") as f: - result = json.load(f) merkle_tree = MerkleTree( [x.to_merkle_hash() for x in leafs], @@ -155,21 +151,14 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm for n in batch["sampled_nonces"] ] - payload = { - "merkle_proofs": proofs_to_submit, - **result - } - submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}" logger.info(f"posting proofs to {submit_url}") - resp = session.post(submit_url, json=payload) + resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit}) if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() - purge_folders(output_path, 0) logger.info(f"successfully posted proofs for batch {batch_id}") elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() - purge_folders(output_path, 0) logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")