Update: add new agg mode
xnico31 2024-12-18 22:08:11 +01:00
parent ca5ce7bfd8
commit 579088ce03


@@ -85,7 +85,9 @@ def purge_folders(output_path, ttl):
             FINISHED_BATCH_IDS.pop(batch_id)
 
-def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path):
+def send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode):
+    global LAST_SUCCESSFUL_PORT
     try:
         batch_id = READY_BATCH_IDS.pop()
     except KeyError:
@@ -93,6 +95,8 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
         time.sleep(1)
         return
+    port_port = LAST_SUCCESSFUL_PORT or master_port
     output_folder = f"{output_path}/{batch_id}"
     with open(f"{output_folder}/batch.json") as f:
         batch = json.load(f)
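The port_port fallback above leans on a module-level LAST_SUCCESSFUL_PORT that is not declared in any of the hunks shown here. A minimal sketch of the module state the code assumes (illustrative, not part of the diff):

# Assumed module-level state, not visible in this commit's hunks:
# the port that last served /get-batches, or None before any successful fetch.
LAST_SUCCESSFUL_PORT = None

# In send_results, `LAST_SUCCESSFUL_PORT or master_port` then reads as:
# reuse the remembered port once poll_batches has found a working one,
# otherwise fall back to the port passed on the command line.
port_port = LAST_SUCCESSFUL_PORT or master_port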
@@ -111,18 +115,20 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
         with open(f"{output_folder}/result.json") as f:
             result = json.load(f)
-        submit_url = f"http://{master_ip}:{master_port}/submit-batch-root/{batch_id}"
+        submit_url = f"http://{master_ip}:{port_port}/submit-batch-root/{batch_id}"
         logger.info(f"posting root to {submit_url}")
         resp = session.post(submit_url, json=result)
         if resp.status_code == 200:
             FINISHED_BATCH_IDS[batch_id] = now()
             logger.info(f"successfully posted root for batch {batch_id}")
         elif resp.status_code == 408: # took too long
             FINISHED_BATCH_IDS[batch_id] = now()
             logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
         else:
             logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
             READY_BATCH_IDS.add(batch_id) # requeue
             time.sleep(2)
     else:
@@ -145,21 +151,23 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
             for n in batch["sampled_nonces"]
         ]
-        submit_url = f"http://{master_ip}:{master_port}/submit-batch-proofs/{batch_id}"
+        submit_url = f"http://{master_ip}:{port_port}/submit-batch-proofs/{batch_id}"
         logger.info(f"posting proofs to {submit_url}")
         resp = session.post(submit_url, json={"merkle_proofs": proofs_to_submit})
         if resp.status_code == 200:
             FINISHED_BATCH_IDS[batch_id] = now()
             logger.info(f"successfully posted proofs for batch {batch_id}")
         elif resp.status_code == 408: # took too long
             FINISHED_BATCH_IDS[batch_id] = now()
             logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
         else:
             logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
             READY_BATCH_IDS.add(batch_id) # requeue
             time.sleep(2)
 
 def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
     try:
         batch_id = PENDING_BATCH_IDS.pop()
@@ -192,12 +200,26 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers,
     ).start()
 
-def poll_batches(session, master_ip, master_port, output_path):
+def poll_batches(session, master_ip, master_port, output_path, mode):
+    global LAST_SUCCESSFUL_PORT
     get_batches_url = f"http://{master_ip}:{master_port}/get-batches"
     logger.info(f"fetching batches from {get_batches_url}")
     resp = session.get(get_batches_url)
+    if mode == "mainnet" and resp.status_code == 666:
+        logger.warning(f"Status 666 received. Retrying with master_port {master_port - 1}")
+        resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches")
+        if resp.status_code == 666:
+            time.sleep(2)
+            logger.error(f"Retrying failed with status 666 again. Reverting to original master_port {master_port}")
+            resp = session.get(get_batches_url)
+    if resp.status_code == 200:
+        LAST_SUCCESSFUL_PORT = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1
+        logger.debug(f"Successfully fetched batches from port {LAST_SUCCESSFUL_PORT}")
     batches = resp.json()
     root_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is None]
     proofs_batch_ids = [batch['id'] for batch in batches if batch['sampled_nonces'] is not None]
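Taken together, the poll_batches change is a simple two-port failover: query the configured port, drop to master_port - 1 when the master replies with the custom status 666 (mainnet mode only), and remember whichever port actually served the batches so send_results can reuse it. A standalone sketch of that logic, assuming a requests.Session and the hypothetical helper name fetch_batches_with_fallback (the commit keeps this inline in poll_batches and stores the port in the LAST_SUCCESSFUL_PORT global instead of returning it):

import time
import requests

def fetch_batches_with_fallback(session: requests.Session, master_ip: str, master_port: int, mode: str):
    # Primary endpoint on the configured port.
    primary_url = f"http://{master_ip}:{master_port}/get-batches"
    resp = session.get(primary_url)
    if mode == "mainnet" and resp.status_code == 666:
        # Custom "switch port" signal from the master: retry one port lower.
        resp = session.get(f"http://{master_ip}:{master_port - 1}/get-batches")
        if resp.status_code == 666:
            # Fallback port refused as well: wait briefly, then go back to the original port.
            time.sleep(2)
            resp = session.get(primary_url)
    successful_port = None
    if resp.status_code == 200:
        # Record which port answered so later submissions can target it.
        successful_port = master_port if resp.url.endswith(f":{master_port}/get-batches") else master_port - 1
    return resp, successful_port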
@@ -236,8 +258,11 @@ def main(
     master_port: int,
     output_path: str,
     ttl: int,
+    mode: str,
 ):
     print(f"Starting slave {slave_name}")
     if not os.path.exists(tig_worker_path):
         raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
@@ -255,7 +280,7 @@ def main(
     Thread(
         target=wrap_thread,
-        args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
+        args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path, mode)
     ).start()
 
     Thread(
@@ -263,7 +288,7 @@ def main(
         args=(purge_folders, output_path, ttl)
     ).start()
 
-    wrap_thread(poll_batches, session, master_ip, master_port, output_path)
+    wrap_thread(poll_batches, session, master_ip, master_port, output_path, mode)
 
 if __name__ == "__main__":
@@ -277,6 +302,7 @@ if __name__ == "__main__":
     parser.add_argument("--verbose", action='store_true', help="Print debug logs")
     parser.add_argument("--output", type=str, default="results", help="Folder to output results to (default: results)")
     parser.add_argument("--ttl", type=int, default=300, help="(Time To Live) Seconds to retain results (default: 300)")
+    parser.add_argument("--mode", type=str, default="mainnet", help="mainnet/explo")
 
     args = parser.parse_args()
@@ -285,4 +311,4 @@ if __name__ == "__main__":
         level=logging.DEBUG if args.verbose else logging.INFO
     )
-    main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl)
+    main(args.master, args.tig_worker_path, args.download, args.workers, args.name, args.port, args.output, args.ttl, args.mode)
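With the flag wired through to main(), the slave can be launched in either mode. A hedged usage example, assuming the entry point is the slave script changed in this commit (shown here as slave.py; its pre-existing positional arguments are left elided):

python3 slave.py <existing arguments...> --mode mainnet   # enable the status-666 failover to master_port - 1
python3 slave.py <existing arguments...> --mode explo     # skip the failover and keep the configured port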