Files
ShooterHub/apps/tools/views.py
2026-04-02 11:24:30 +02:00

320 lines
13 KiB
Python

import datetime
import pandas as pd
from django.http import HttpResponse
from django.shortcuts import get_object_or_404
from rest_framework import parsers, status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from .analyzer.charts import render_group_charts, render_overview_chart
from .analyzer.grouper import detect_groups
from .analyzer.parser import parse_csv
from .analyzer.pdf_report import generate_pdf
from .analyzer.stats import compute_group_stats, compute_overall_stats
from .models import ChronographAnalysis, Shot, ShotGroup
from .permissions import IsOwnerOrUnclaimed
from .serializers import (
ChronographAnalysisDetailSerializer,
ChronographAnalysisListSerializer,
ShotGroupSerializer,
ShotGroupStandaloneSerializer,
ShotSerializer,
)
# ── Standalone ShotGroup ──────────────────────────────────────────────────────
class ShotGroupViewSet(viewsets.ModelViewSet):
"""
Top-level CRUD for ShotGroups that may or may not belong to an analysis.
GET/POST /api/groups/
GET/PATCH/DELETE /api/groups/{id}/
"""
permission_classes = [IsOwnerOrUnclaimed]
serializer_class = ShotGroupStandaloneSerializer
def get_queryset(self):
from django.db.models import Q
user = self.request.user
if not user.is_authenticated:
return ShotGroup.objects.none()
return (
ShotGroup.objects
.filter(
Q(analysis__user=user) |
Q(analysis__isnull=True, user=user)
)
.select_related('analysis', 'ammo_batch__recipe', 'ammo_batch__powder', 'ammo')
.prefetch_related('shots')
.distinct()
)
def perform_create(self, serializer):
serializer.save(user=self.request.user)
def _group_to_df(group):
"""Convert a ShotGroup ORM object to a DataFrame expected by the analyzer."""
shots = list(group.shots.order_by('shot_number'))
if not shots:
return pd.DataFrame(columns=['speed', 'time'])
rows = []
base = datetime.datetime(2000, 1, 1, 0, 0, 0)
for i, shot in enumerate(shots):
rows.append({
'speed': float(shot.velocity_fps),
'time': base + datetime.timedelta(seconds=i),
})
return pd.DataFrame(rows)
# ── ChronographAnalysis ───────────────────────────────────────────────────────
class ChronographAnalysisViewSet(viewsets.ModelViewSet):
"""
Manage chronograph analysis sessions and their nested shot groups / shots.
Standard CRUD on the analysis itself, plus nested actions:
GET/POST .../groups/ list / create shot groups
GET/PATCH/DELETE .../groups/{group_pk}/ manage a single group
GET/POST .../groups/{group_pk}/shots/ list / add individual shots
DELETE .../groups/{group_pk}/shots/{shot_pk}/ remove a single shot
"""
permission_classes = [IsOwnerOrUnclaimed]
pagination_class = None # sidebar needs the full list; pagination handled client-side
def get_queryset(self):
return (
ChronographAnalysis.objects
.prefetch_related('shot_groups__shots')
)
def get_serializer_class(self):
if self.action == 'list':
return ChronographAnalysisListSerializer
return ChronographAnalysisDetailSerializer
def perform_create(self, serializer):
user = self.request.user if self.request.user.is_authenticated else None
serializer.save(user=user)
# ── Shot groups ───────────────────────────────────────────────────────────
@action(detail=True, methods=['get', 'post'], url_path='groups')
def groups(self, request, pk=None):
analysis = self.get_object()
if request.method == 'GET':
qs = analysis.shot_groups.prefetch_related('shots').order_by('order', 'id')
serializer = ShotGroupSerializer(qs, many=True, context={'request': request})
return Response(serializer.data)
serializer = ShotGroupSerializer(
data=request.data,
context={'request': request, 'analysis': analysis},
)
serializer.is_valid(raise_exception=True)
group = serializer.save()
return Response(
ShotGroupSerializer(group, context={'request': request}).data,
status=status.HTTP_201_CREATED,
)
@action(
detail=True,
methods=['get', 'patch', 'delete'],
url_path=r'groups/(?P<group_pk>[^/.]+)',
)
def group_detail(self, request, pk=None, group_pk=None):
analysis = self.get_object()
group = get_object_or_404(ShotGroup, pk=group_pk, analysis=analysis)
if request.method == 'DELETE':
group.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
if request.method == 'GET':
serializer = ShotGroupSerializer(group, context={'request': request})
return Response(serializer.data)
# PATCH
serializer = ShotGroupSerializer(
group,
data=request.data,
partial=True,
context={'request': request, 'analysis': analysis},
)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data)
# ── Shots ─────────────────────────────────────────────────────────────────
@action(
detail=True,
methods=['get', 'post'],
url_path=r'groups/(?P<group_pk>[^/.]+)/shots',
)
def shots(self, request, pk=None, group_pk=None):
analysis = self.get_object()
group = get_object_or_404(ShotGroup, pk=group_pk, analysis=analysis)
if request.method == 'GET':
serializer = ShotSerializer(group.shots.all(), many=True, context={'request': request})
return Response(serializer.data)
serializer = ShotSerializer(data=request.data, context={'request': request})
serializer.is_valid(raise_exception=True)
shot = serializer.save(group=group)
return Response(
ShotSerializer(shot, context={'request': request}).data,
status=status.HTTP_201_CREATED,
)
@action(
detail=True,
methods=['delete'],
url_path=r'groups/(?P<group_pk>[^/.]+)/shots/(?P<shot_pk>[^/.]+)',
)
def shot_detail(self, request, pk=None, group_pk=None, shot_pk=None):
analysis = self.get_object()
group = get_object_or_404(ShotGroup, pk=group_pk, analysis=analysis)
shot = get_object_or_404(Shot, pk=shot_pk, group=group)
shot.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# ── CSV upload ─────────────────────────────────────────────────────────────
@action(
detail=False,
methods=['post'],
url_path='upload',
parser_classes=[parsers.MultiPartParser],
permission_classes=[IsOwnerOrUnclaimed],
)
def upload(self, request):
"""
Parse a CSV file and create a ChronographAnalysis with auto-detected groups.
Expected CSV columns: idx, speed, std_dev, energy, power_factor, time (HH:MM:SS)
"""
file = request.FILES.get('file')
if not file:
return Response({'detail': 'No file provided.'}, status=status.HTTP_400_BAD_REQUEST)
name = request.data.get('name', '').strip() or file.name.rsplit('.', 1)[0]
notes = request.data.get('notes', '')
date_str = request.data.get('date', '')
velocity_unit = request.data.get('velocity_unit', 'fps') # 'fps' or 'mps'
chrono_type = request.data.get('chrono_type', 'garmin_xero_c1_pro') # future: switch parser
SUPPORTED_CHRONO_TYPES = {'garmin_xero_c1_pro'}
if chrono_type not in SUPPORTED_CHRONO_TYPES:
return Response(
{'detail': f'Unsupported chronograph type: {chrono_type}'},
status=status.HTTP_400_BAD_REQUEST,
)
try:
df = parse_csv(file)
except ValueError as exc:
return Response({'detail': str(exc)}, status=status.HTTP_400_BAD_REQUEST)
groups = detect_groups(df)
# Determine session date
if date_str:
try:
session_date = datetime.date.fromisoformat(date_str)
except ValueError:
session_date = datetime.date.today()
else:
try:
session_date = df['time'].iloc[0].date()
except Exception:
session_date = datetime.date.today()
analysis = ChronographAnalysis.objects.create(
user=request.user if request.user.is_authenticated else None,
name=name,
date=session_date,
notes=notes,
)
for i, gdf in enumerate(groups):
group = ShotGroup.objects.create(
analysis=analysis,
label=f'Group {i + 1}',
order=i,
)
for _, row in gdf.iterrows():
speed = float(row['speed'])
if velocity_unit == 'mps':
speed = speed / 0.3048 # convert m/s → fps
Shot.objects.create(group=group, velocity_fps=speed)
return Response(
ChronographAnalysisListSerializer(analysis).data,
status=status.HTTP_201_CREATED,
)
# ── Charts ─────────────────────────────────────────────────────────────────
@action(detail=True, methods=['get'], url_path='charts')
def charts(self, request, pk=None):
"""Return base64-encoded PNG charts for an analysis."""
analysis = self.get_object()
groups_qs = analysis.shot_groups.prefetch_related('shots').order_by('order', 'id')
groups_dfs = [_group_to_df(g) for g in groups_qs]
groups_dfs = [gdf for gdf in groups_dfs if not gdf.empty]
if not groups_dfs:
return Response({'overview': None, 'groups': []})
group_stats = compute_group_stats(groups_dfs)
all_speeds = [s for gs in group_stats for s in [gs.get('min_speed'), gs.get('max_speed')] if s is not None]
y_min = min(all_speeds) if all_speeds else 0
y_max = max(all_speeds) if all_speeds else 1000
return Response({
'overview': render_overview_chart(group_stats),
'groups': render_group_charts(groups_dfs, y_min, y_max),
})
# ── PDF report ─────────────────────────────────────────────────────────────
@action(detail=True, methods=['get'], url_path='report.pdf')
def report_pdf(self, request, pk=None):
"""Generate and return a PDF analysis report."""
analysis = self.get_object()
groups_qs = analysis.shot_groups.prefetch_related('shots').order_by('order', 'id')
groups_dfs = [_group_to_df(g) for g in groups_qs]
groups_dfs = [gdf for gdf in groups_dfs if not gdf.empty]
if not groups_dfs:
return Response({'detail': 'No shot data available.'}, status=status.HTTP_404_NOT_FOUND)
all_df = pd.concat(groups_dfs, ignore_index=True)
overall_stats = compute_overall_stats(all_df)
group_stats = compute_group_stats(groups_dfs)
all_speeds = [s for gs in group_stats for s in [gs.get('min_speed'), gs.get('max_speed')] if s is not None]
y_min = min(all_speeds) if all_speeds else 0
y_max = max(all_speeds) if all_speeds else 1000
group_charts = render_group_charts(groups_dfs, y_min, y_max)
overview_chart = render_overview_chart(group_stats)
pdf_bytes = generate_pdf(overall_stats, group_stats, group_charts, overview_chart)
safe_name = analysis.name.replace(' ', '_').replace('/', '-')
return HttpResponse(
pdf_bytes,
content_type='application/pdf',
headers={'Content-Disposition': f'attachment; filename="{safe_name}.pdf"'},
)