diff --git a/apps/api/src/modules/training-external/schema/training-external.schema.ts b/apps/api/src/modules/training-external/schema/training-external.schema.ts index 4f9d178f..bb13e061 100644 --- a/apps/api/src/modules/training-external/schema/training-external.schema.ts +++ b/apps/api/src/modules/training-external/schema/training-external.schema.ts @@ -131,10 +131,16 @@ export const trainingOutputUploadSchema = z.object({ object_path: z.string(), }); +export const trainingCheckpointConfigSchema = z.object({ + put_url: z.string(), + get_url: z.string(), +}); + const basePayload = z.object({ id: z.number(), training_config: trainingConfigSchema, output_config: trainingOutputUploadSchema.array(), + checkpoint_config: trainingCheckpointConfigSchema.optional(), }); const createDataSchema = (labelSchema: T) => diff --git a/apps/api/src/modules/training-external/services/training-external.service.ts b/apps/api/src/modules/training-external/services/training-external.service.ts index 5b4f53f5..26cfcd92 100644 --- a/apps/api/src/modules/training-external/services/training-external.service.ts +++ b/apps/api/src/modules/training-external/services/training-external.service.ts @@ -324,6 +324,7 @@ export class TrainingExternalService { const instancePolicy: protos.google.cloud.batch.v1.AllocationPolicy.IInstancePolicy = { machineType: mlBatchMachineType, bootDisk: { sizeGb: String(mlBatchBootDiskGb) }, + provisioningModel: "SPOT", }; if (mlBatchGpuType && mlBatchGpuCount > 0) { instancePolicy.accelerators = [ @@ -380,6 +381,15 @@ export class TrainingExternalService { memoryMib: mlBatchTaskMemoryMib, }, maxRunDuration: { seconds: String(mlBatchMaxRunSeconds) }, + maxRetryCount: 3, + lifecyclePolicies: [ + { + action: protos.google.cloud.batch.v1.LifecyclePolicy.Action.RETRY_TASK, + actionCondition: { + exitCodes: [50001], + }, + }, + ], }, }, ], @@ -464,16 +474,40 @@ export class TrainingExternalService { } }) + const isUltralytics = model.backend === ModelBackendEnum.ULTRALYTICS; + + const checkpointObjectPath = `${model.projectId}/model/${model.id}/checkpoint_last.pt`; + const [checkpointPutUrl, checkpointGetUrl] = isUltralytics + ? await Promise.all([ + this.assetsService.generateSignedUploadUrl( + checkpointObjectPath, + "application/octet-stream", + UPLOAD_URL_TTL_MS, + ), + this.assetsService.generateSignedDownloadUrl( + checkpointObjectPath, + UPLOAD_URL_TTL_MS, + ), + ]) + : [undefined, undefined]; + const basePayload: TrainingBasePayload = { id: model.id, output_config: outputUploads, + ...(checkpointPutUrl && + checkpointGetUrl && { + checkpoint_config: { + put_url: checkpointPutUrl, + get_url: checkpointGetUrl, + }, + }), training_config: { output_types: model.outputTypes, epochs: model.epochs, // ml-yolo consumes this to flip HubAI's quantization_mode between // FP16_STANDARD and INT8_STANDARD. Omit for Luxonis — its Pydantic // model rejects unknown keys (extra="forbid"). - ...(model.backend === ModelBackendEnum.ULTRALYTICS && { + ...(isUltralytics && { quantization: model.quantization, }), dataset_config: { diff --git a/apps/ml-yolo/app/ml/train_model.py b/apps/ml-yolo/app/ml/train_model.py index 0e979a56..4a95d2c3 100644 --- a/apps/ml-yolo/app/ml/train_model.py +++ b/apps/ml-yolo/app/ml/train_model.py @@ -194,233 +194,270 @@ def run_training(config: ModelConfig) -> None: api_key = cfg.api_key callbacks: Optional[WebhookCallbacks] = None + # Use a fixed workdir path inside the container. Ultralytics bakes the + # absolute path to data.yaml into checkpoints; a fixed path ensures + # those internal references stay valid if a Spot instance is preempted + # and the task restarts on a new VM. + workdir = "/tmp/robopipe-ml-yolo-workdir" + if os.path.exists(workdir): + import shutil + shutil.rmtree(workdir, ignore_errors=True) + os.makedirs(workdir, exist_ok=True) + try: - with tempfile.TemporaryDirectory() as workdir: - # 1) Prepare YOLO-format dataset on disk (shared with luxonis pipeline). - prepare_dataset( + # 1) Prepare YOLO-format dataset on disk (shared with luxonis pipeline). + prepare_dataset( + workdir, + config.data, + config.training_config.dataset_config, + config.type, + ) + + # 2) Optional deterministic preprocessings. + if config.training_config.dataset_config.preprocessings: + preprocess_dataset( workdir, - config.data, - config.training_config.dataset_config, + config.training_config.dataset_config.preprocessings, config.type, + _resolve_imgsz(config), ) - # 2) Optional deterministic preprocessings. - if config.training_config.dataset_config.preprocessings: - preprocess_dataset( - workdir, - config.training_config.dataset_config.preprocessings, - config.type, - _resolve_imgsz(config), - ) - - # 3) Write Ultralytics data.yaml (or resolve classification root). - data_path = build_data_yaml(config, workdir) - - # 4) Load pretrained weights + wire callbacks. - variant = get_model_variant(config) + # 3) Write Ultralytics data.yaml (or resolve classification root). + data_path = build_data_yaml(config, workdir) + + # 4) Load pretrained weights + wire callbacks. + checkpoint_path = os.path.join(workdir, "checkpoint_last.pt") + has_checkpoint = False + if config.checkpoint_config: + try: + print(f"[ml-yolo] Checking for existing checkpoint...") + r = requests.get(config.checkpoint_config.get_url, stream=True, timeout=30) + if r.status_code == 200: + with open(checkpoint_path, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + print(f"[ml-yolo] Downloaded checkpoint to {checkpoint_path}") + has_checkpoint = True + else: + print(f"[ml-yolo] No existing checkpoint found (status {r.status_code})") + except Exception as e: + print(f"[ml-yolo] Error downloading checkpoint: {e}") + + variant = get_model_variant(config) + if has_checkpoint: + print(f"[ml-yolo] Resuming from checkpoint: {checkpoint_path}") + model = YOLO(checkpoint_path) + else: print(f"[ml-yolo] Loading Ultralytics model: {variant}") model = YOLO(variant) - if webhook_url is not None: - callbacks = WebhookCallbacks( - webhook_url=f"{webhook_url}/progress/{config.id}", - api_key=api_key, - model_id=config.id, - model_type=config.type, - label_ids=config.training_config.dataset_config.label_ids, - ) - model.add_callback("on_fit_epoch_end", callbacks.on_fit_epoch_end) - model.add_callback("on_train_end", callbacks.on_train_end) - - # 5) Train. - project_dir = os.path.join(workdir, "runs") - train_kwargs = build_train_kwargs(config, data_path, project_dir) - print(f"[ml-yolo] train kwargs: {train_kwargs}") - model.train(**train_kwargs) - - # 6) Export best weights via luxonis/tools (subprocess, separate - # venv) → ONNX with multi-output naming the camera-side parsers - # expect, plus an NN archive with proper `heads` metadata. We - # feed the *archive* (not raw ONNX) to HubAI below so the heads - # block survives RVC4 compilation; tools' graph surgery is what - # makes detection, segmentation, pose, and OBB all work without - # us reimplementing the head layout per task type. - # - # The previous path (`best.export(format="onnx", simplify=False, - # opset=18, dynamic=False)`) produced a single-output Ultralytics - # ONNX (`output0`, shape `(1, 4+nc, num_anchors)`), which HubAI - # would compile but emit an archive without heads — segmentation - # then crashed on the camera with "No heads defined in the NN - # Archive." tools-produced archives carry the heads through. - save_dir = Path(model.trainer.save_dir) - best_pt = save_dir / "weights" / "best.pt" - print(f"[ml-yolo] Exporting via luxonis/tools from {best_pt}") - onnx_path, archive_path = _export_via_tools( - best_pt, _resolve_imgsz(config), workdir + if webhook_url is not None: + callbacks = WebhookCallbacks( + webhook_url=f"{webhook_url}/progress/{config.id}", + api_key=api_key, + model_id=config.id, + model_type=config.type, + label_ids=config.training_config.dataset_config.label_ids, ) + model.add_callback("on_fit_epoch_end", callbacks.on_fit_epoch_end) + model.add_callback("on_train_end", callbacks.on_train_end) + + if config.checkpoint_config: + from .ultralytics_callbacks import CheckpointCallback + checkpoint_cb = CheckpointCallback(config.checkpoint_config.put_url) + model.add_callback("on_fit_epoch_end", checkpoint_cb.on_fit_epoch_end) + + # 5) Train. + project_dir = os.path.join(workdir, "runs") + train_kwargs = build_train_kwargs(config, data_path, project_dir) + if has_checkpoint: + train_kwargs["resume"] = True + print(f"[ml-yolo] train kwargs: {train_kwargs}") + model.train(**train_kwargs) + + # 6) Export best weights via luxonis/tools (subprocess, separate + # venv) → ONNX with multi-output naming the camera-side parsers + # expect, plus an NN archive with proper `heads` metadata. We + # feed the *archive* (not raw ONNX) to HubAI below so the heads + # block survives RVC4 compilation; tools' graph surgery is what + # makes detection, segmentation, pose, and OBB all work without + # us reimplementing the head layout per task type. + # + # The previous path (`best.export(format="onnx", simplify=False, + # opset=18, dynamic=False)`) produced a single-output Ultralytics + # ONNX (`output0`, shape `(1, 4+nc, num_anchors)`), which HubAI + # would compile but emit an archive without heads — segmentation + # then crashed on the camera with "No heads defined in the NN + # Archive." tools-produced archives carry the heads through. + save_dir = Path(model.trainer.save_dir) + best_pt = save_dir / "weights" / "best.pt" + print(f"[ml-yolo] Exporting via luxonis/tools from {best_pt}") + onnx_path, archive_path = _export_via_tools( + best_pt, _resolve_imgsz(config), workdir + ) - if webhook_url is None: - print("[ml-yolo] No webhook_url configured, skipping uploads") - return - - # 7) Upload RAW + run HubAI conversions for each requested output. - output_types = config.training_config.output_types - uploads_by_type: dict[ModelOutputType, OutputUpload] = { - u.type: u for u in config.output_config - } - - def upload_for(t: ModelOutputType) -> OutputUpload: - upload = uploads_by_type.get(t) - if upload is None: - raise RuntimeError( - f"No signed upload URL for output type {t.value}" - ) - return upload - - completed: list[OutputUpload] = [] - if ModelOutputType.RAW in output_types: - raw_upload = upload_for(ModelOutputType.RAW) - _upload_to_signed_url(raw_upload, onnx_path) - completed.append(raw_upload) - - non_raw = [t for t in output_types if t != ModelOutputType.RAW] - if non_raw: - _post_progress( - webhook_url, - api_key, - config.id, - {"progress": {"type": "converting"}}, + if webhook_url is None: + print("[ml-yolo] No webhook_url configured, skipping uploads") + return + + # 7) Upload RAW + run HubAI conversions for each requested output. + output_types = config.training_config.output_types + uploads_by_type: dict[ModelOutputType, OutputUpload] = { + u.type: u for u in config.output_config + } + + def upload_for(t: ModelOutputType) -> OutputUpload: + upload = uploads_by_type.get(t) + if upload is None: + raise RuntimeError( + f"No signed upload URL for output type {t.value}" ) + return upload - # Routing: RVC4+INT8 goes through the offline luxonis/modelconverter - # path so we can feed it a real calibration set sampled from the - # training images. Everything else (FP16, RVC2/RVC3 of any - # precision) goes through HubAI — modelconverter could do RVC2/RVC3 - # too but HubAI is fine for those and we'd just be re-implementing - # the OpenVINO chain locally for no win. - is_int8 = config.training_config.quantization == "INT8" + completed: list[OutputUpload] = [] + if ModelOutputType.RAW in output_types: + raw_upload = upload_for(ModelOutputType.RAW) + _upload_to_signed_url(raw_upload, onnx_path) + completed.append(raw_upload) - if is_int8: - quantization_mode = "INT8_INT16_MIXED" - else: - quantization_mode = "FP16_STANDARD" - # HubAI fallback domain — used for RVC2/RVC3 INT8 only (where - # modelconverter isn't on the path). RVC4+INT8 supplies its own - # local calibration dir, so this value is ignored there. - hubai_quantization_data = "GENERAL" if is_int8 else None - - # Sample a calibration set up-front when we know we'll need it, - # so the cost is paid once even if multiple targets request it. - # - # Priority: test → val → train. The test split is held out from - # training, so it's the closest proxy to inference distribution - # and gives the quantizer the best signal for activation ranges. - # We fall back to val and then train only if test alone doesn't - # reach max_images (small datasets, or skewed splits). - calib_dir: str | None = None - if is_int8 and ModelOutputType.RVC4 in non_raw: - calib_dir = os.path.join(workdir, "calib_images") - # Layouts differ by task type — see prepare_dataset: - # classification: /dataset//