diff --git a/apps/api/src/modules/training-external/services/training-external.service.ts b/apps/api/src/modules/training-external/services/training-external.service.ts index 26cfcd92..6697b236 100644 --- a/apps/api/src/modules/training-external/services/training-external.service.ts +++ b/apps/api/src/modules/training-external/services/training-external.service.ts @@ -323,7 +323,7 @@ export class TrainingExternalService { // the N1-style "custom attachment" path. const instancePolicy: protos.google.cloud.batch.v1.AllocationPolicy.IInstancePolicy = { machineType: mlBatchMachineType, - bootDisk: { sizeGb: String(mlBatchBootDiskGb) }, + bootDisk: { sizeGb: String(mlBatchBootDiskGb), type: "pd-ssd" }, provisioningModel: "SPOT", }; if (mlBatchGpuType && mlBatchGpuCount > 0) { @@ -381,7 +381,7 @@ export class TrainingExternalService { memoryMib: mlBatchTaskMemoryMib, }, maxRunDuration: { seconds: String(mlBatchMaxRunSeconds) }, - maxRetryCount: 3, + maxRetryCount: 10, lifecyclePolicies: [ { action: protos.google.cloud.batch.v1.LifecyclePolicy.Action.RETRY_TASK, diff --git a/apps/ml-yolo/Dockerfile b/apps/ml-yolo/Dockerfile index 0eb23a6f..36d49b54 100644 --- a/apps/ml-yolo/Dockerfile +++ b/apps/ml-yolo/Dockerfile @@ -24,6 +24,16 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN pip install --no-cache-dir --upgrade pip COPY requirements.txt . +# The main env MUST install the CUDA build of torch (the PyPI default, +# which pulls the ~2.5GB nvidia-* wheels). Do NOT add the +# download.pytorch.org/whl/cpu index here: `2.8.0+cpu` satisfies the +# `torch==2.8.0` pin and pip prefers it, but +cpu wheels are compiled +# without CUDA support entirely — torch.cuda.is_available() is then +# false no matter what the host provides, and Ultralytics silently +# trains on CPU. Cloud Batch's installGpuDrivers only supplies the +# kernel driver + libcuda.so; the CUDA user-space libs (cuBLAS, cuDNN, +# NCCL, ...) ship exclusively in these wheels. The tools venv below is +# different: export runs on CPU by design, so +cpu is correct there. RUN pip install --no-cache-dir --timeout=120 --retries=5 -r requirements.txt # luxonis/tools — used at export time to convert Ultralytics .pt to a @@ -57,7 +67,11 @@ RUN python3 -m venv /opt/tools-venv && \ git checkout edbe7da1a7f75833a71d65caf1028036faa81061 && \ git submodule update --init --recursive && \ PIP_CONSTRAINT=constraints.txt /opt/tools-venv/bin/pip install --no-cache-dir --timeout=120 --retries=5 . && \ - rm -rf /tmp/luxonis-tools + rm -rf /tmp/luxonis-tools && \ + # Purge heavy build dependencies after compilation and git clones are done + apt-get purge -y build-essential cmake git && \ + apt-get autoremove -y && \ + rm -rf /var/lib/apt/lists/* ENV ROBOPIPE_TOOLS_BIN=/opt/tools-venv/bin/tools @@ -130,30 +144,36 @@ ENV IN_DOCKER=1 ENV ROBOPIPE_MODELCONVERTER_BIN=/opt/modelconverter-venv/bin/modelconverter ENV ROBOPIPE_SNPE_ROOT=/opt/snpe -# Pre-download all yolo11 detection + segmentation pretrained weights into the -# image. Ultralytics' YOLO(name) constructor calls safe_download() which has a -# known silent-failure path: if a retry deletes a partial file and the loop -# exits without raising, attempt_download_asset() returns a Path to a -# non-existent file and the next torch.load() raises FileNotFoundError. By -# baking the weights at /app (the WORKDIR), YOLO()'s first existence check -# (`Path(file).exists()` against cwd) hits and skips the download entirely. -# Pin to v8.3.0 — the release tag that hosts the yolo11 family weights. +# Pre-download all yolo11 detection, segmentation, and classification pretrained +# weights into the image. Ultralytics' YOLO(name) constructor calls +# safe_download() which has a known silent-failure path: if a retry deletes a +# partial file and the loop exits without raising, attempt_download_asset() +# returns a Path to a non-existent file and the next torch.load() raises +# FileNotFoundError. By baking the weights at /app (the WORKDIR), YOLO()'s first +# existence check (`Path(file).exists()` against cwd) hits and skips the +# download entirely. Pin to v8.3.0 — the release tag that hosts the yolo11 family. RUN cd /app && for v in \ yolo11n yolo11s yolo11m yolo11l yolo11x \ yolo11n-seg yolo11s-seg yolo11m-seg yolo11l-seg yolo11x-seg \ + yolo11n-cls yolo11s-cls yolo11m-cls yolo11l-cls yolo11x-cls \ ; do \ curl -fL --retry 3 -o ${v}.pt \ https://github.com/ultralytics/assets/releases/download/v8.3.0/${v}.pt; \ done +# Bake Ultralytics fonts into the image. Ultralytics downloads Arial.ttf and +# Arial.Unicode.ttf at runtime if missing from $YOLO_CONFIG_DIR. By baking them +# into a persistent /opt/ultralytics path, we save ~3-5s of start-up latency. +ENV YOLO_CONFIG_DIR=/opt/ultralytics +RUN mkdir -p ${YOLO_CONFIG_DIR} && \ + curl -fL --retry 3 -o ${YOLO_CONFIG_DIR}/Arial.ttf https://ultralytics.com/assets/Arial.ttf && \ + curl -fL --retry 3 -o ${YOLO_CONFIG_DIR}/Arial.Unicode.ttf https://ultralytics.com/assets/Arial.Unicode.ttf + COPY app/ ./app/ RUN mkdir -p /app/export /app/dataset ENV PYTHONUNBUFFERED=1 -# Cloud Batch containers don't always have a writable $HOME; pin Ultralytics' -# settings dir to /tmp to avoid the "not writable" warning on every run. -ENV YOLO_CONFIG_DIR=/tmp/Ultralytics # Default to Cloud Batch job mode. Docker-compose overrides CMD to run the # FastAPI server for local HTTP testing. diff --git a/apps/ml-yolo/app/ml/dataset.py b/apps/ml-yolo/app/ml/dataset.py index ad266477..995421b8 100644 --- a/apps/ml-yolo/app/ml/dataset.py +++ b/apps/ml-yolo/app/ml/dataset.py @@ -1,9 +1,11 @@ +import hashlib import math import random import requests import shutil import yaml import os +from concurrent.futures import ThreadPoolExecutor from ..models.dataset_config import DatasetConfig from ..models.image import Image @@ -23,7 +25,7 @@ def copy_image(image: Image, dest: str): image_path = image.file_url if image_path.startswith("http://") or image_path.startswith("https://"): - response = requests.get(image_path, stream=True) + response = requests.get(image_path, stream=True, timeout=30) if response.status_code == 200: filename = os.path.basename(image_path) dest_path = os.path.join(dest, filename) @@ -57,9 +59,24 @@ def prepare_dataset_config(config: DatasetConfig, dir: str): yaml.dump({"names": get_label_mapping(config)}, f) -def iterate_datasets(images: list[Image], config: DatasetConfig): - shuffled_images = list(images) - random.shuffle(shuffled_images) +def split_seed(seed_source: str) -> int: + """Derive a deterministic RNG seed from a stable string (the model id). + + sha256, not the built-in hash() — hash() is salted per process + (PYTHONHASHSEED), so it would produce a different seed on every VM and + defeat the purpose. + """ + return int.from_bytes(hashlib.sha256(seed_source.encode()).digest()[:8], "big") + + +def iterate_datasets(images: list[Image], config: DatasetConfig, seed: int): + # The split must be identical across Cloud Batch task attempts: a Spot + # retry resumes from a checkpoint, and a re-drawn split would leak + # already-trained images into val/test. Sort by file_url first so the + # result is also independent of payload ordering, then shuffle with a + # seeded RNG isolated from the global `random` state. + shuffled_images = sorted(images, key=lambda image: image.file_url) + random.Random(seed).shuffle(shuffled_images) train_split, val_split, _ = (s / 100.0 for s in config.dataset_split) total_images = len(images) train_end = int(total_images * train_split) @@ -86,7 +103,11 @@ def prepare_classification_directory( def prepare_dataset( - dir: str, images: list[Image], config: DatasetConfig, task_type: ModelType + dir: str, + images: list[Image], + config: DatasetConfig, + task_type: ModelType, + seed: int, ): global VAL_DIR dir = f"{dir}/{DATASET_DIR}" @@ -102,7 +123,8 @@ def prepare_dataset( prepare_dirs(label_dir) prepare_dataset_config(config, dir) - for image, curr_dir in iterate_datasets(images, config): + def _download_task(args): + image, curr_dir = args if task_type == ModelType.CLASSIFICATION: label_name = label_mapping[image.labels[0].label.label_number] copy_image(image, f"{dir}/{curr_dir}/{label_name}") @@ -113,3 +135,11 @@ def prepare_dataset( copy_image(image, f"{image_dir}/{curr_dir}") with open(f"{label_dir}/{curr_dir}/{label_filename}", "w") as f: f.write("\n".join(image.labels_str(task_type))) + + tasks = list(iterate_datasets(images, config, seed)) + max_workers = min(32, len(tasks) or 1) + print(f"[ml-yolo] Downloading {len(tasks)} images using {max_workers} workers...") + with ThreadPoolExecutor(max_workers=max_workers) as executor: + list(executor.map(_download_task, tasks)) + print(f"[ml-yolo] Dataset preparation complete.") + diff --git a/apps/ml-yolo/app/ml/preprocess.py b/apps/ml-yolo/app/ml/preprocess.py index 3ae27ea5..655209f8 100644 --- a/apps/ml-yolo/app/ml/preprocess.py +++ b/apps/ml-yolo/app/ml/preprocess.py @@ -9,6 +9,7 @@ import os from pathlib import Path +from concurrent.futures import ThreadPoolExecutor import albumentations as A import cv2 @@ -109,38 +110,69 @@ def _write_polygon_labels( f.write(f"{int(cls_id)} {points_str}\n") +def _process_file_classification(filepath, label_dir, filename, pipeline, keep_originals): + image = cv2.imread(filepath) + if image is None: + return 0 + + result = pipeline(image=image) + ext = Path(filename).suffix.lower() + + if keep_originals: + stem = Path(filename).stem + out_path = os.path.join(label_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}") + cv2.imwrite(out_path, result["image"]) + else: + cv2.imwrite(filepath, result["image"]) + return 1 + + def _process_classification( split_dir: str, pipeline: A.Compose, keep_originals: bool, ) -> int: """Apply the full preprocessing pipeline to all images in a classification split.""" - count = 0 + tasks = [] for label_dir_name in os.listdir(split_dir): label_dir = os.path.join(split_dir, label_dir_name) if not os.path.isdir(label_dir): continue - for filename in list(os.listdir(label_dir)): + for filename in os.listdir(label_dir): filepath = os.path.join(label_dir, filename) ext = Path(filename).suffix.lower() if ext not in IMAGE_EXTENSIONS: continue + tasks.append((filepath, label_dir, filename, pipeline, keep_originals)) - image = cv2.imread(filepath) - if image is None: - continue + with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor: + results = executor.map(lambda args: _process_file_classification(*args), tasks) + return sum(results) - result = pipeline(image=image) - if keep_originals: - stem = Path(filename).stem - out_path = os.path.join(label_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}") - cv2.imwrite(out_path, result["image"]) - else: - cv2.imwrite(filepath, result["image"]) - count += 1 +def _process_file_detection(filepath, image_split_dir, label_split_dir, filename, compose, keep_originals): + image = cv2.imread(filepath) + if image is None: + return 0 + + stem = Path(filename).stem + ext = Path(filename).suffix.lower() + label_path = os.path.join(label_split_dir, f"{stem}.txt") + bboxes, class_ids = _parse_yolo_labels(label_path) - return count + result = compose(image=image, bboxes=bboxes, class_labels=class_ids) + out_bboxes = [list(b) for b in result["bboxes"]] + out_class_ids = result["class_labels"] + + if keep_originals: + out_img_path = os.path.join(image_split_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}") + out_label_path = os.path.join(label_split_dir, f"{stem}{PREPROCESS_SUFFIX}.txt") + cv2.imwrite(out_img_path, result["image"]) + _write_yolo_labels(out_label_path, out_bboxes, out_class_ids) + else: + cv2.imwrite(filepath, result["image"]) + _write_yolo_labels(label_path, out_bboxes, out_class_ids) + return 1 def _process_detection( @@ -150,7 +182,6 @@ def _process_detection( keep_originals: bool, ) -> int: """Apply the full preprocessing pipeline to all images and YOLO labels.""" - count = 0 compose = A.Compose( pipeline_transforms, bbox_params=A.BboxParams( @@ -160,39 +191,64 @@ def _process_detection( ), ) - for filename in list(os.listdir(image_split_dir)): + tasks = [] + for filename in os.listdir(image_split_dir): filepath = os.path.join(image_split_dir, filename) ext = Path(filename).suffix.lower() if ext not in IMAGE_EXTENSIONS: continue - stem = Path(filename).stem - - image = cv2.imread(filepath) - if image is None: - continue + tasks.append((filepath, image_split_dir, label_split_dir, filename, compose, keep_originals)) - label_path = os.path.join(label_split_dir, f"{stem}.txt") - bboxes, class_ids = _parse_yolo_labels(label_path) + with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor: + results = executor.map(lambda args: _process_file_detection(*args), tasks) + return sum(results) - result = compose(image=image, bboxes=bboxes, class_labels=class_ids) - out_bboxes = [list(b) for b in result["bboxes"]] - out_class_ids = result["class_labels"] - if keep_originals: - out_img_path = os.path.join( - image_split_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}" - ) - out_label_path = os.path.join( - label_split_dir, f"{stem}{PREPROCESS_SUFFIX}.txt" - ) - cv2.imwrite(out_img_path, result["image"]) - _write_yolo_labels(out_label_path, out_bboxes, out_class_ids) - else: - cv2.imwrite(filepath, result["image"]) - _write_yolo_labels(label_path, out_bboxes, out_class_ids) - count += 1 +def _process_file_segmentation(filepath, image_split_dir, label_split_dir, filename, compose, keep_originals): + image = cv2.imread(filepath) + if image is None: + return 0 - return count + stem = Path(filename).stem + ext = Path(filename).suffix.lower() + img_h, img_w = image.shape[:2] + label_path = os.path.join(label_split_dir, f"{stem}.txt") + polygons, class_ids = _parse_polygon_labels(label_path) + + # Flatten polygon points into keypoints (pixel coords) + keypoints = [] + poly_map = [] # (polygon_idx, point_count) + for poly_idx, points in enumerate(polygons): + poly_map.append((poly_idx, len(points))) + for x_norm, y_norm in points: + keypoints.append((x_norm * img_w, y_norm * img_h)) + + result = compose(image=image, keypoints=keypoints) + out_image = result["image"] + out_keypoints = result["keypoints"] + new_h, new_w = out_image.shape[:2] + + # Reconstruct polygons from transformed keypoints + out_polygons = [] + kp_idx = 0 + for _, point_count in poly_map: + poly_points = [] + for _ in range(point_count): + if kp_idx < len(out_keypoints): + px, py = out_keypoints[kp_idx] + poly_points.append((px / new_w, py / new_h)) + kp_idx += 1 + out_polygons.append(poly_points) + + if keep_originals: + out_img_path = os.path.join(image_split_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}") + out_label_path = os.path.join(label_split_dir, f"{stem}{PREPROCESS_SUFFIX}.txt") + cv2.imwrite(out_img_path, out_image) + _write_polygon_labels(out_label_path, out_polygons, class_ids) + else: + cv2.imwrite(filepath, out_image) + _write_polygon_labels(label_path, out_polygons, class_ids) + return 1 def _process_segmentation( @@ -202,68 +258,22 @@ def _process_segmentation( keep_originals: bool, ) -> int: """Apply the full preprocessing pipeline to all images and polygon labels.""" - count = 0 + compose = A.Compose( + pipeline_transforms, + keypoint_params=A.KeypointParams(format="xy", remove_invisible=False), + ) - for filename in list(os.listdir(image_split_dir)): + tasks = [] + for filename in os.listdir(image_split_dir): filepath = os.path.join(image_split_dir, filename) ext = Path(filename).suffix.lower() if ext not in IMAGE_EXTENSIONS: continue - stem = Path(filename).stem - - image = cv2.imread(filepath) - if image is None: - continue + tasks.append((filepath, image_split_dir, label_split_dir, filename, compose, keep_originals)) - img_h, img_w = image.shape[:2] - label_path = os.path.join(label_split_dir, f"{stem}.txt") - polygons, class_ids = _parse_polygon_labels(label_path) - - # Flatten polygon points into keypoints (pixel coords) - keypoints = [] - poly_map = [] # (polygon_idx, point_count) - for poly_idx, points in enumerate(polygons): - poly_map.append((poly_idx, len(points))) - for x_norm, y_norm in points: - keypoints.append((x_norm * img_w, y_norm * img_h)) - - compose = A.Compose( - pipeline_transforms, - keypoint_params=A.KeypointParams(format="xy", remove_invisible=False), - ) - - result = compose(image=image, keypoints=keypoints) - out_image = result["image"] - out_keypoints = result["keypoints"] - new_h, new_w = out_image.shape[:2] - - # Reconstruct polygons from transformed keypoints - out_polygons = [] - kp_idx = 0 - for _, point_count in poly_map: - poly_points = [] - for _ in range(point_count): - if kp_idx < len(out_keypoints): - px, py = out_keypoints[kp_idx] - poly_points.append((px / new_w, py / new_h)) - kp_idx += 1 - out_polygons.append(poly_points) - - if keep_originals: - out_img_path = os.path.join( - image_split_dir, f"{stem}{PREPROCESS_SUFFIX}{ext}" - ) - out_label_path = os.path.join( - label_split_dir, f"{stem}{PREPROCESS_SUFFIX}.txt" - ) - cv2.imwrite(out_img_path, out_image) - _write_polygon_labels(out_label_path, out_polygons, class_ids) - else: - cv2.imwrite(filepath, out_image) - _write_polygon_labels(label_path, out_polygons, class_ids) - count += 1 - - return count + with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor: + results = executor.map(lambda args: _process_file_segmentation(*args), tasks) + return sum(results) def _run_pipeline( diff --git a/apps/ml-yolo/app/ml/train_model.py b/apps/ml-yolo/app/ml/train_model.py index 4a95d2c3..f70a1720 100644 --- a/apps/ml-yolo/app/ml/train_model.py +++ b/apps/ml-yolo/app/ml/train_model.py @@ -27,6 +27,7 @@ TRAIN_DIR, VAL_DIR, prepare_dataset, + split_seed, ) from .model_conversion import convert_model from .modelconverter_conversion import ( @@ -211,6 +212,10 @@ def run_training(config: ModelConfig) -> None: config.data, config.training_config.dataset_config, config.type, + # Seeded by model id so a preempted Spot task re-creates the + # exact same train/val/test split before resuming from the + # checkpoint (see iterate_datasets). + seed=split_seed(str(config.id)), ) # 2) Optional deterministic preprocessings. @@ -246,7 +251,17 @@ def run_training(config: ModelConfig) -> None: variant = get_model_variant(config) if has_checkpoint: print(f"[ml-yolo] Resuming from checkpoint: {checkpoint_path}") - model = YOLO(checkpoint_path) + try: + model = YOLO(checkpoint_path) + except Exception as e: + # A torn/corrupt checkpoint in GCS must not crash-loop the + # Batch task — fall back to a fresh start. + print( + f"[ml-yolo] Checkpoint unusable ({e}); " + f"starting fresh from {variant}" + ) + has_checkpoint = False + model = YOLO(variant) else: print(f"[ml-yolo] Loading Ultralytics model: {variant}") model = YOLO(variant) @@ -265,7 +280,9 @@ def run_training(config: ModelConfig) -> None: if config.checkpoint_config: from .ultralytics_callbacks import CheckpointCallback checkpoint_cb = CheckpointCallback(config.checkpoint_config.put_url) - model.add_callback("on_fit_epoch_end", checkpoint_cb.on_fit_epoch_end) + # on_model_save fires after the trainer finishes writing + # last.pt; hooking on_fit_epoch_end would race the write. + model.add_callback("on_model_save", checkpoint_cb.on_model_save) # 5) Train. project_dir = os.path.join(workdir, "runs") @@ -291,6 +308,18 @@ def run_training(config: ModelConfig) -> None: # Archive." tools-produced archives carry the heads through. save_dir = Path(model.trainer.save_dir) best_pt = save_dir / "weights" / "best.pt" + if not best_pt.exists(): + # Resumed runs restore best_fitness (the number) from the + # checkpoint, but best.pt (the file) lived on the preempted VM. + # Ultralytics only rewrites best.pt when a post-resume epoch + # reaches that restored fitness, so it may never appear. The + # final-epoch weights are the best artifact on this VM. + last_pt = save_dir / "weights" / "last.pt" + print( + f"[ml-yolo] best.pt missing (resumed run where pre-preemption " + f"best was never beaten); falling back to {last_pt}" + ) + best_pt = last_pt print(f"[ml-yolo] Exporting via luxonis/tools from {best_pt}") onnx_path, archive_path = _export_via_tools( best_pt, _resolve_imgsz(config), workdir diff --git a/apps/ml-yolo/app/ml/ultralytics_callbacks.py b/apps/ml-yolo/app/ml/ultralytics_callbacks.py index 3562f52e..00d98f71 100644 --- a/apps/ml-yolo/app/ml/ultralytics_callbacks.py +++ b/apps/ml-yolo/app/ml/ultralytics_callbacks.py @@ -6,6 +6,7 @@ """ import math +import shutil from pathlib import Path import threading from typing import Any, Optional @@ -34,21 +35,54 @@ def _upload_checkpoint_worker(put_url: str, file_path: Path) -> None: class CheckpointCallback: - """Callback to upload last.pt to GCS after each epoch.""" + """Upload last.pt to GCS after each checkpoint save. + + Hooked on on_model_save, NOT on_fit_epoch_end: the trainer writes + last.pt between those two callbacks, so reading it at on_fit_epoch_end + races the write and can upload a torn file — which then poisons the + Spot resume path (torch.load fails, the task crashes, Batch retries + into the same corrupt checkpoint). + + last.pt is stable during on_model_save and untouched until the next + epoch's save, so we snapshot it with a cheap local copy and upload the + snapshot from a background thread — training never blocks on GCS. + Single-flight: if the previous upload is still running, this epoch is + skipped; the next save uploads a fresher checkpoint anyway. The lock + also guarantees the snapshot file is never overwritten mid-upload. + """ def __init__(self, put_url: str): self.put_url = put_url + self._upload_lock = threading.Lock() - def on_fit_epoch_end(self, trainer: Any) -> None: + def on_model_save(self, trainer: Any) -> None: last_pt = Path(trainer.save_dir) / "weights" / "last.pt" - # Run upload in background thread to not block training. + if not last_pt.exists(): + return + if not self._upload_lock.acquire(blocking=False): + print("[ml-yolo] Previous checkpoint upload still in flight, skipping") + return + snapshot = last_pt.with_name("last_upload_snapshot.pt") + try: + shutil.copyfile(last_pt, snapshot) + except Exception as e: + self._upload_lock.release() + print(f"[ml-yolo] Failed to snapshot checkpoint: {e}") + return thread = threading.Thread( - target=_upload_checkpoint_worker, - args=(self.put_url, last_pt), + target=self._upload_and_release, + args=(snapshot,), daemon=True, ) thread.start() + def _upload_and_release(self, snapshot: Path) -> None: + try: + _upload_checkpoint_worker(self.put_url, snapshot) + finally: + snapshot.unlink(missing_ok=True) + self._upload_lock.release() + def _coerce_float(value: Any) -> Optional[float]: if value is None: diff --git a/apps/ml-yolo/app/ml/ultralytics_config.py b/apps/ml-yolo/app/ml/ultralytics_config.py index 21fda86c..317ca353 100644 --- a/apps/ml-yolo/app/ml/ultralytics_config.py +++ b/apps/ml-yolo/app/ml/ultralytics_config.py @@ -9,7 +9,7 @@ lr0, lrf, momentum, weight_decay, warmup_epochs, close_mosaic, box, cls, dfl, hsv_h, hsv_s, hsv_v, degrees, translate, scale, shear, perspective, flipud, fliplr, mosaic, mixup, copy_paste, optimizer, cos_lr, patience, - imgsz, workers, device, amp + imgsz, workers, device, amp, batch Plus two ml-yolo-specific keys stripped before passthrough: backend — consumed by the API dispatcher model_variant — pretrained weights filename (e.g. yolo11m.pt) @@ -90,16 +90,27 @@ def build_train_kwargs( custom.pop("backend", None) custom.pop("model_variant", None) + # The API doesn't send batch_size today, so without this the Pydantic + # default (8) would always apply — wasting most of an A100. A float in + # (0, 1) tells Ultralytics to auto-size the batch to that fraction of + # CUDA memory; on CPU-only runs (local Docker smoke tests) it ignores + # the fraction and falls back to its default batch of 16. An explicit + # batch_size in the payload still wins, as does a `batch` key in + # custom_hyperparams via the merge below. + batch = tc.batch_size if "batch_size" in tc.model_fields_set else 0.7 + kwargs: dict[str, Any] = { "data": data_path, "epochs": tc.epochs, - "batch": tc.batch_size, + "batch": batch, "project": project_dir, "name": str(config.id), "exist_ok": True, "verbose": True, + "cache": True, # Cache images in RAM for much faster epoch times } - # custom_hyperparams wins over defaults but not over the dispatch kwargs above. + # custom_hyperparams is merged last, so user-supplied keys (e.g. `batch`, + # `imgsz`) override the dispatch defaults above. for key, value in custom.items(): kwargs[key] = value return kwargs diff --git a/apps/ml/Dockerfile b/apps/ml/Dockerfile index f97d9215..6cf4621e 100644 --- a/apps/ml/Dockerfile +++ b/apps/ml/Dockerfile @@ -20,12 +20,20 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Copy requirements and install Python dependencies COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +# Install main requirements using the CPU-only PyTorch index. +# This prevents pip from pulling massive nvidia-* CUDA wheels. +RUN pip install --no-cache-dir \ + --extra-index-url https://download.pytorch.org/whl/cpu \ + -r requirements.txt # Apply patches to installed dependencies COPY patches/ ./patches/ COPY postinstall.sh . -RUN bash postinstall.sh +RUN bash postinstall.sh && \ + # Purge heavy build dependencies after compilation and patching + apt-get purge -y build-essential gcc g++ cmake git && \ + apt-get autoremove -y && \ + rm -rf /var/lib/apt/lists/* # Copy application code COPY app/ ./app/ diff --git a/apps/ml/app/ml/dataset.py b/apps/ml/app/ml/dataset.py index ad266477..c78db4c9 100644 --- a/apps/ml/app/ml/dataset.py +++ b/apps/ml/app/ml/dataset.py @@ -4,6 +4,7 @@ import shutil import yaml import os +from concurrent.futures import ThreadPoolExecutor from ..models.dataset_config import DatasetConfig from ..models.image import Image @@ -23,7 +24,7 @@ def copy_image(image: Image, dest: str): image_path = image.file_url if image_path.startswith("http://") or image_path.startswith("https://"): - response = requests.get(image_path, stream=True) + response = requests.get(image_path, stream=True, timeout=30) if response.status_code == 200: filename = os.path.basename(image_path) dest_path = os.path.join(dest, filename) @@ -102,7 +103,8 @@ def prepare_dataset( prepare_dirs(label_dir) prepare_dataset_config(config, dir) - for image, curr_dir in iterate_datasets(images, config): + def _download_task(args): + image, curr_dir = args if task_type == ModelType.CLASSIFICATION: label_name = label_mapping[image.labels[0].label.label_number] copy_image(image, f"{dir}/{curr_dir}/{label_name}") @@ -113,3 +115,11 @@ def prepare_dataset( copy_image(image, f"{image_dir}/{curr_dir}") with open(f"{label_dir}/{curr_dir}/{label_filename}", "w") as f: f.write("\n".join(image.labels_str(task_type))) + + tasks = list(iterate_datasets(images, config)) + max_workers = min(32, len(tasks) or 1) + print(f"[ml] Downloading {len(tasks)} images using {max_workers} workers...") + with ThreadPoolExecutor(max_workers=max_workers) as executor: + list(executor.map(_download_task, tasks)) + print(f"[ml] Dataset preparation complete.") +