diff --git a/bin/new_slave/slave.py b/bin/new_slave/slave.py index 4d8a51e..0067e22 100644 --- a/bin/new_slave/slave.py +++ b/bin/new_slave/slave.py @@ -85,7 +85,9 @@ def purge_folders(output_path, ttl): FINISHED_BATCH_IDS.pop(batch_id) -def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path): +def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode): + global LAST_SUCCESSFUL_PORT + try: batch_id = READY_BATCH_IDS.pop() except KeyError: @@ -93,6 +95,8 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm time.sleep(1) return + port_port = LAST_SUCCESSFUL_PORT or master_port + output_folder = f"{output_path}/{batch_id}" with open(f"{output_folder}/batch.json") as f: batch = json.load(f) @@ -111,18 +115,20 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm with open(f"{output_folder}/result.json") as f: result = json.load(f) - submit_url = f"http://{master_ip}:{master_port}/submit-batch-root/{batch_id}" + submit_url = f"http://{master_ip}:{port_port}/submit-batch-root/{batch_id}" logger.info(f"posting root to {submit_url}") resp = session.post(submit_url, json=result) + + if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() logger.info(f"successfully posted root for batch {batch_id}") - elif resp.status_code == 408: # took too long + elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}") - READY_BATCH_IDS.add(batch_id) # requeue + READY_BATCH_IDS.add(batch_id) # requeue time.sleep(2) else: @@ -145,21 +151,23 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm for n in batch["sampled_nonces"] ] - submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}" + submit_url = f"http://{master_ip}:{port_port}/submit-batch-proofs/{batch_id}" logger.info(f"posting proofs to {submit_url}") resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit}) + if resp.status_code == 200: FINISHED_BATCH_IDS[batch_id] = now() logger.info(f"successfully posted proofs for batch {batch_id}") - elif resp.status_code == 408: # took too long + elif resp.status_code == 408: # took too long FINISHED_BATCH_IDS[batch_id] = now() logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") else: logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}") - READY_BATCH_IDS.add(batch_id) # requeue + READY_BATCH_IDS.add(batch_id) # requeue time.sleep(2) + def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path): try: batch_id = PENDING_BATCH_IDS.pop() @@ -192,12 +200,26 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, ).start() -def poll_batches(session, master_ip, master_port, output_path): +def poll_batches(session, master_ip, master_port, output_path, mode): + + global LAST_SUCCESSFUL_PORT + get_batches_url = f"http://{master_ip}:{master_port}/get-batches" logger.info(f"fetching batches from {get_batches_url}") resp = session.get(get_batches_url) + if mode == "mainnet" and resp.status_code == 666: + logger.warning(f"Status 666 received. Retrying with master_port {master_port - 1}") + resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches") + if resp.status_code == 666: + time.sleep(2) + logger.error(f"Retrying failed with status 666 again. Reverting to original master_port {master_port}") + resp = session.get(get_batches_url) + + if resp.status_code == 200: + LAST_SUCCESSFUL_PORT = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1 + logger.debug(f"Successfully fetched batches from port {LAST_SUCCESSFUL_PORT}") batches = resp.json() root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None] proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None] @@ -236,8 +258,11 @@ def main( master_port: int, output_path: str, ttl: int, + mode :str, ): print(f"Starting slave {slave_name}") + + if not os.path.exists(tig_worker_path): raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}") @@ -255,7 +280,7 @@ def main( Thread( target=wrap_thread, - args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path) + args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode) ).start() Thread( @@ -263,7 +288,7 @@ def main( args=(purge_folders, output_path, ttl) ).start() - wrap_thread(poll_batches, session, master_ip, master_port, output_path) + wrap_thread(poll_batches, session, master_ip, master_port, output_path, mode) if __name__ == "__main__": @@ -277,6 +302,7 @@ if __name__ == "__main__": parser.add_argument("--verbose", action='store_true', help="Print debug logs") parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)") parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)") + parser.add_argument("--mode", type=str, default=main, help="mainnet/explo") args = parser.parse_args() @@ -285,4 +311,4 @@ if __name__ == "__main__": level=logging.DEBUG if args.verbose else logging.INFO ) - main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl) + main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl, args.mode)