This commit is contained in:
François Patron 2024-12-20 14:19:50 +01:00
parent 292ccc4f06
commit 8a680ddfc3

View File

@ -98,11 +98,7 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
batch_id = READY_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
if FINISHED_BATCH_IDS[batch_id] is not None:
return;
output_folder = f"{output_path}/{batch_id}"
with open(f"{output_folder}/batch.json") as f:
@ -134,7 +130,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
else:
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
else:
with open(f"{output_folder}/data.zlib", "rb") as f:
@ -175,15 +170,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
else:
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
if (
@ -204,10 +196,7 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers,
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
download_wasm(session, batch['download_url'], wasm_path)
Thread(
target=run_tig_worker,
args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
).start()
run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path)
def poll_batches(session, master_ip, master_port, output_path):
@ -228,12 +217,9 @@ def poll_batches(session, master_ip, master_port, output_path):
json.dump(batch, f)
PENDING_BATCH_IDS.clear()
PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
time.sleep(5)
else:
logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
time.sleep(5)
def wrap_thread(func, *args):
logger.info(f"Starting thread for {func.__name__}")
@ -266,11 +252,6 @@ def main(
"User-Agent": slave_name
})
Thread(
target=wrap_thread,
args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
).start()
Thread(
target=wrap_thread,
args=(purge_folders, output_path, ttl)
@ -279,7 +260,12 @@ def main(
while True:
try:
poll_batches(session, master_ip, master_port, output_path)
process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path)
batches_size = len(PENDING_BATCH_IDS)
for _ in range(batches_size):
process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path)
send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
if batches_size == 0:
time.sleep(5)
except Exception as e:
logger.error(f"Error in main loop: {e}")
time.sleep(5)