This commit is contained in:
xnico31 2024-12-20 19:36:58 +01:00
commit 6b46f97098

View File

@ -11,6 +11,7 @@ import zlib
from threading import Thread
from common.structs import OutputData, MerkleProof
from common.merkle_tree import MerkleTree
from datetime import datetime, timedelta
logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
PENDING_BATCH_IDS = set()
@ -68,6 +69,13 @@ def run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path):
def purge_folders(output_path, ttl):
cutoff_time = datetime.now() - timedelta(hours=2)
for folder in os.listdir(output_path):
folder_path = os.path.join(output_path, folder)
if os.path.isdir(folder_path) and datetime.fromtimestamp(os.path.getmtime(folder_path)) < cutoff_time:
logger.info(f"removing old folder: {folder_path}")
shutil.rmtree(folder_path)
n = now()
purge_batch_ids = [
batch_id
@ -90,13 +98,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
batch_id = READY_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
output_folder = f"{output_path}/{batch_id}"
with open(f"{output_folder}/batch.json") as f:
batch = json.load(f)
if (
not os.path.exists(f"{output_folder}/result.json")
or not os.path.exists(f"{output_folder}/data.zlib")
@ -123,7 +130,6 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
else:
logger.error(f"status {resp.status_code} when posting root for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
else:
with open(f"{output_folder}/data.zlib", "rb") as f:
@ -157,15 +163,12 @@ def send_results(session, master_ip, master_port, tig_worker_path, download_wasm
else:
logger.error(f"status {resp.status_code} when posting proofs for batch {batch_id} to master: {resp.text}")
READY_BATCH_IDS.add(batch_id) # requeue
time.sleep(2)
def process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path):
try:
batch_id = PENDING_BATCH_IDS.pop()
except KeyError:
logger.debug("No pending batches")
time.sleep(1)
return
if (
@ -186,10 +189,7 @@ def process_batch(session, tig_worker_path, download_wasms_folder, num_workers,
wasm_path = os.path.join(download_wasms_folder, f"{batch['settings']['algorithm_id']}.wasm")
download_wasm(session, batch['download_url'], wasm_path)
Thread(
target=run_tig_worker,
args=(tig_worker_path, batch, wasm_path, num_workers, output_path)
).start()
run_tig_worker(tig_worker_path, batch, wasm_path, num_workers, output_path)
def poll_batches(session, master_ip, master_port, output_path):
@ -210,12 +210,9 @@ def poll_batches(session, master_ip, master_port, output_path):
json.dump(batch, f)
PENDING_BATCH_IDS.clear()
PENDING_BATCH_IDS.update(root_batch_ids + proofs_batch_ids)
time.sleep(5)
else:
logger.error(f"status {resp.status_code} when fetching batch: {resp.text}")
time.sleep(5)
def wrap_thread(func, *args):
logger.info(f"Starting thread for {func.__name__}")
@ -242,17 +239,13 @@ def main(
if not os.path.exists(tig_worker_path):
raise FileNotFoundError(f"tig-worker not found at path: {tig_worker_path}")
os.makedirs(download_wasms_folder, exist_ok=True)
os.makedirs(output_path, exist_ok=True)
session = requests.Session()
session.headers.update({
"User-Agent": slave_name
})
Thread(
target=wrap_thread,
args=(send_results, session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
).start()
Thread(
target=wrap_thread,
args=(purge_folders, output_path, ttl)
@ -261,7 +254,12 @@ def main(
while True:
try:
poll_batches(session, master_ip, master_port, output_path)
process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path)
batches_size = len(PENDING_BATCH_IDS)
for _ in range(batches_size):
process_batch(session, tig_worker_path, download_wasms_folder, num_workers, output_path)
send_results(session, master_ip, master_port, tig_worker_path, download_wasms_folder, num_workers, output_path)
if batches_size == 0:
time.sleep(5)
except Exception as e:
logger.error(f"Error in main loop: {e}")
time.sleep(5)