Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 200 additions & 1 deletion ami/main/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,108 @@ def update_calculated_fields(self, request: HttpRequest, queryset: QuerySet[Even
update_calculated_fields_for_events(qs=queryset)
self.message_user(request, f"Updated {queryset.count()} events.")

@admin.action(description="Run Occurrence Tracking on selected events")
def run_tracking_on_events(self, request: HttpRequest, queryset: QuerySet[Event]):
    """Queue one post-processing "tracking" Job per project for the selected events.

    Flow: first render an intermediate confirmation page whose form exposes the
    tracking tunables; on a confirmed POST, validate the form, group the selected
    events by project, and enqueue one Job per project. Events without a project
    are skipped with a warning. Returns a TemplateResponse when a page must be
    shown, otherwise None (standard admin-action contract).
    """
    from collections import defaultdict

    from django.contrib import messages
    from django.template.response import TemplateResponse

    from ami.ml.post_processing.admin_forms import TrackingActionForm

    # Superuser-only: queues background jobs and exposes tunables that change
    # determination scoring across an event. Project admins can request a run
    # via a superuser; widening the gate is a separate decision.
    if not request.user.is_superuser:
        self.message_user(request, "Only superusers can trigger tracking jobs.", level=messages.ERROR)
        return None

    def render_confirmation(form: TrackingActionForm) -> TemplateResponse:
        # Single source of truth for the confirmation-page context, used for
        # both the initial render and the re-render on validation errors.
        return TemplateResponse(
            request,
            "admin/main/tracking_confirmation.html",
            {
                **self.admin_site.each_context(request),
                "title": "Run Occurrence Tracking",
                "queryset": queryset,
                "scope_label": f"{queryset.count()} event(s)",
                "scope_summary": (
                    "One Job is enqueued per project. Each Job processes all "
                    "selected events from that project and is visible on the "
                    "Jobs admin page where you can watch its log stream."
                ),
                "form": form,
                "action_name": "run_tracking_on_events",
                "action_checkbox_name": admin.helpers.ACTION_CHECKBOX_NAME,
                "opts": self.model._meta,
            },
        )

    if request.POST.get("confirm"):
        form = TrackingActionForm(request.POST, events=queryset)
        if not form.is_valid():
            # Re-render with errors.
            return render_confirmation(form)

        config = form.to_config()
        by_project: dict[int, list[int]] = defaultdict(list)
        null_project_event_ids: list[int] = []
        # Order by pk so each Job's event_ids list is deterministic, matching
        # SourceImageCollectionAdmin.run_tracking.
        for ev in queryset.values("pk", "project_id").order_by("pk"):
            if ev["project_id"] is None:
                null_project_event_ids.append(ev["pk"])
                continue
            by_project[ev["project_id"]].append(ev["pk"])

        if null_project_event_ids:
            self.message_user(
                request,
                f"Skipped {len(null_project_event_ids)} event(s) without a project: "
                f"{null_project_event_ids}. Fix Event.project before tracking those.",
                level=messages.WARNING,
            )

        jobs = []
        for project_id, event_ids in by_project.items():
            job = Job.objects.create(
                name=f"Post-processing: Tracking on {len(event_ids)} event(s)",
                project_id=project_id,
                job_type_key="post_processing",
                params={
                    "task": "tracking",
                    "config": {**config, "event_ids": event_ids},
                },
            )
            job.enqueue()
            jobs.append(job.pk)

        self.message_user(
            request,
            f"Queued Tracking for {sum(len(v) for v in by_project.values())} event(s) "
            f"across {len(by_project)} project(s). Jobs: {jobs}",
        )
        return None

    # GET / first POST without confirm — render the intermediate page.
    return render_confirmation(TrackingActionForm(events=queryset))

list_filter = ("deployment", "project", "start")
# Note: a stale duplicate `actions = [update_calculated_fields]` assignment was
# removed; the list below is the single, authoritative set of admin actions.
actions = [update_calculated_fields, run_tracking_on_events]


@admin.register(SourceImage)
Expand Down Expand Up @@ -668,10 +768,109 @@ def run_small_size_filter(self, request: HttpRequest, queryset: QuerySet[SourceI

self.message_user(request, f"Queued Small Size Filter for {queryset.count()} capture set(s). Jobs: {jobs}")

@admin.action(description="Run Occurrence Tracking on selected capture sets")
def run_tracking(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]):
    """Queue one post-processing "tracking" Job per selected capture set.

    Flow: first render an intermediate confirmation page whose form exposes the
    tracking tunables; on a confirmed POST, validate the form and enqueue one
    Job per collection covering every event whose captures belong to that
    collection. Collections with no events are skipped with a warning.
    Returns a TemplateResponse when a page must be shown, otherwise None.
    """
    from django.contrib import messages
    from django.template.response import TemplateResponse

    from ami.main.models import Event
    from ami.ml.post_processing.admin_forms import TrackingActionForm

    # Superuser-only: queues background jobs and exposes tunables that change
    # determination scoring across an event. Mirrors EventAdmin.run_tracking_on_events.
    if not request.user.is_superuser:
        self.message_user(request, "Only superusers can trigger tracking jobs.", level=messages.ERROR)
        return None

    # Aggregate Event queryset across all selected collections; the form uses this
    # to scope the feature-extraction-algorithm dropdown.
    events_qs = Event.objects.filter(captures__collections__in=queryset).distinct()

    def render_confirmation(form: TrackingActionForm) -> TemplateResponse:
        # Single source of truth for the confirmation-page context, used for
        # both the initial render and the re-render on validation errors.
        return TemplateResponse(
            request,
            "admin/main/tracking_confirmation.html",
            {
                **self.admin_site.each_context(request),
                "title": "Run Occurrence Tracking",
                "queryset": queryset,
                "scope_label": f"{queryset.count()} capture set(s)",
                "scope_summary": (
                    "One Job is enqueued per capture set. Each Job tracks every "
                    "event whose images belong to the set and is visible on the "
                    "Jobs admin page where you can watch its log stream."
                ),
                "form": form,
                "action_name": "run_tracking",
                "action_checkbox_name": admin.helpers.ACTION_CHECKBOX_NAME,
                "opts": self.model._meta,
            },
        )

    if request.POST.get("confirm"):
        form = TrackingActionForm(request.POST, events=events_qs)
        if not form.is_valid():
            # Re-render with errors.
            return render_confirmation(form)

        config = form.to_config()
        jobs = []
        empty_collections: list[int] = []
        for collection in queryset:
            # pk-ordered for deterministic Job params.
            event_ids = list(
                Event.objects.filter(captures__collections=collection)
                .values_list("pk", flat=True)
                .distinct()
                .order_by("pk")
            )
            if not event_ids:
                empty_collections.append(collection.pk)
                continue
            job = Job.objects.create(
                name=f"Post-processing: Tracking on Capture Set {collection.pk}",
                project=collection.project,
                source_image_collection=collection,
                job_type_key="post_processing",
                params={
                    "task": "tracking",
                    "config": {**config, "event_ids": event_ids},
                },
            )
            job.enqueue()
            jobs.append(job.pk)

        if empty_collections:
            self.message_user(
                request,
                f"Skipped {len(empty_collections)} capture set(s) with no events: {empty_collections}.",
                level=messages.WARNING,
            )
        self.message_user(request, f"Queued Tracking for {len(jobs)} capture set(s). Jobs: {jobs}")
        return None

    # GET / first POST without confirm — render the intermediate page.
    return render_confirmation(TrackingActionForm(events=events_qs))

# Admin actions available on the SourceImageCollection changelist.
actions = [populate_collection, populate_collection_async, run_small_size_filter, run_tracking]

# Hide images many-to-many field from form. This would list all source images in the database.
Expand Down
16 changes: 16 additions & 0 deletions ami/main/migrations/0084_add_pgvector_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from django.db import migrations


class Migration(migrations.Migration):
    """Enable the pgvector extension so later migrations can add vector columns.

    NOTE(review): CREATE EXTENSION requires the pgvector package to be installed
    on the database server and sufficient privileges — confirm for each
    deployment target. IF NOT EXISTS makes the forward operation idempotent.
    """

    dependencies = [
        ("main", "0083_dedupe_taxalist_names"),
    ]

    operations = [
        migrations.RunSQL(
            sql="CREATE EXTENSION IF NOT EXISTS vector;",
            # No-op on reverse: the extension may be shared with other features/databases,
            # and dropping it can be restricted in some hosted environments.
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]
20 changes: 20 additions & 0 deletions ami/main/migrations/0085_classification_features_2048.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from django.db import migrations
import pgvector.django.vector


class Migration(migrations.Migration):
    """Add a nullable 2048-dimension embedding column to Classification.

    Depends on 0084, which creates the pgvector extension this VectorField needs.
    """

    dependencies = [
        ("main", "0084_add_pgvector_extension"),
    ]

    operations = [
        migrations.AddField(
            model_name="classification",
            name="features_2048",
            # Nullable so existing rows (and classifications produced without
            # embeddings) require no backfill.
            field=pgvector.django.vector.VectorField(
                dimensions=2048,
                null=True,
                help_text="Feature embedding from the model backbone",
            ),
        ),
    ]
23 changes: 23 additions & 0 deletions ami/main/migrations/0086_detection_next_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add Detection.next_detection, a self-referential link forming tracking chains."""

    dependencies = [
        ("main", "0085_classification_features_2048"),
    ]

    operations = [
        migrations.AddField(
            model_name="detection",
            name="next_detection",
            # OneToOne: each detection has at most one successor, and the
            # reverse accessor "previous_detection" gives at most one
            # predecessor. SET_NULL breaks the chain on delete rather than
            # cascading.
            field=models.OneToOneField(
                blank=True,
                help_text="The detection that follows this one in the tracking sequence.",
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="previous_detection",
                to="main.detection",
            ),
        ),
    ]
29 changes: 26 additions & 3 deletions ami/main/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from io import BytesIO
from typing import Final, final # noqa: F401

import pgvector.django
import PIL.Image
import pydantic
from django.apps import apps
Expand Down Expand Up @@ -2590,6 +2591,11 @@ class Classification(BaseModel):
null=True,
help_text="The probabilities the model, calibrated by the model maker, likely the softmax output",
)
features_2048 = pgvector.django.VectorField(
dimensions=2048,
null=True,
help_text="Feature embedding from the model backbone",
)
category_map = models.ForeignKey("ml.AlgorithmCategoryMap", on_delete=models.PROTECT, null=True)

algorithm = models.ForeignKey(
Expand Down Expand Up @@ -2784,6 +2790,15 @@ class Detection(BaseModel):

similarity_vector = models.JSONField(null=True, blank=True)

next_detection = models.OneToOneField(
"self",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="previous_detection",
help_text="The detection that follows this one in the tracking sequence.",
)

# For type hints
classifications: models.QuerySet["Classification"]
source_image_id: int
Expand Down Expand Up @@ -3363,9 +3378,17 @@ def update_occurrence_determination(
new_score = top_identification.score
elif not top_identification:
top_prediction = occurrence.best_prediction
if top_prediction and top_prediction.taxon and top_prediction.taxon != current_determination:
new_determination = top_prediction.taxon
new_score = top_prediction.score
if top_prediction and top_prediction.taxon:
if top_prediction.taxon != current_determination:
new_determination = top_prediction.taxon
new_score = top_prediction.score
elif top_prediction.score != occurrence.determination_score:
# Taxon unchanged but a higher-scoring classification has appeared
# for the same taxon (e.g. tracking merged a new detection into the
# chain whose top species classification scored higher than the
# keeper's). Refresh the score so determination_score reflects the
# best evidence available across the occurrence's detections.
new_score = top_prediction.score

if new_determination and new_determination != current_determination:
logger.debug(f"Changing det. of {occurrence} from {current_determination} to {new_determination}")
Expand Down
Loading
Loading