First commit of Claude's rework in Django + vanilla JS frontend
This commit is contained in:
0
apps/tools/analyzer/__init__.py
Normal file
0
apps/tools/analyzer/__init__.py
Normal file
82
apps/tools/analyzer/charts.py
Normal file
82
apps/tools/analyzer/charts.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import io
|
||||
import base64
|
||||
import matplotlib
|
||||
matplotlib.use("Agg")
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.dates as mdates
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def render_group_charts(groups: list, y_min: float, y_max: float) -> list:
    """Render one speed-over-time PNG chart per group.

    All charts share the same y-axis limits (y_min/y_max plus 5% padding)
    so groups can be compared visually side by side.

    Returns a list of base64-encoded PNG strings, one per group.
    """
    pad_fraction = 0.05
    span = y_max - y_min
    # A perfectly flat series would give a zero-height axis; pad by +/-1 instead.
    pad = span * pad_fraction if span != 0 else 1.0

    encoded = []
    for idx, group in enumerate(groups):
        fig, ax = plt.subplots(figsize=(9, 4))

        ax.plot(group["time"], group["speed"], marker="o", linewidth=1.5,
                markersize=5, color="#1f77b4")

        # Shared limits across all group charts for comparability.
        ax.set_ylim(y_min - pad, y_max + pad)

        ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
        fig.autofmt_xdate(rotation=30)

        ax.set_title(f"Group {idx + 1} — {len(group)} shot(s)")
        ax.set_xlabel("Time of Day")
        ax.set_ylabel("Speed")
        ax.grid(True, alpha=0.3)
        fig.tight_layout()

        buffer = io.BytesIO()
        fig.savefig(buffer, format="png", dpi=100)
        plt.close(fig)  # free the figure immediately to cap memory use
        buffer.seek(0)
        encoded.append(base64.b64encode(buffer.read()).decode("utf-8"))

    return encoded
|
||||
|
||||
|
||||
def render_overview_chart(group_stats: list) -> str:
    """Dual-axis line chart: avg speed and avg std dev per group.

    Left axis tracks mean speed (solid), right axis the std dev (dashed),
    one point per group. Returns the chart as a base64-encoded PNG string.
    """
    xs = [stat["group_index"] for stat in group_stats]
    mean_speeds = [stat["mean_speed"] for stat in group_stats]
    # Single-shot groups have no std dev; chart them as 0.0.
    std_devs = [0.0 if stat["std_speed"] is None else stat["std_speed"]
                for stat in group_stats]

    fig, ax_speed = plt.subplots(figsize=(7, 3))

    speed_color = "#1f77b4"
    std_color = "#d62728"

    ax_speed.plot(xs, mean_speeds, marker="o", linewidth=1.8, markersize=5,
                  color=speed_color, label="Avg speed")
    ax_speed.set_xlabel("Group")
    ax_speed.set_ylabel("Avg speed", color=speed_color)
    ax_speed.tick_params(axis="y", labelcolor=speed_color)
    # One tick per group index keeps the x-axis integer-labelled.
    ax_speed.set_xticks(xs)

    ax_std = ax_speed.twinx()
    ax_std.plot(xs, std_devs, marker="s", linewidth=1.8, markersize=5,
                color=std_color, linestyle="--", label="Avg std dev")
    ax_std.set_ylabel("Avg std dev", color=std_color)
    ax_std.tick_params(axis="y", labelcolor=std_color)

    # Merge both axes' legend entries into a single box on the primary axis.
    handles_a, labels_a = ax_speed.get_legend_handles_labels()
    handles_b, labels_b = ax_std.get_legend_handles_labels()
    ax_speed.legend(handles_a + handles_b, labels_a + labels_b,
                    fontsize=8, loc="upper right")

    ax_speed.grid(True, alpha=0.3)
    fig.tight_layout()

    buffer = io.BytesIO()
    fig.savefig(buffer, format="png", dpi=100)
    plt.close(fig)
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode("utf-8")
|
||||
60
apps/tools/analyzer/grouper.py
Normal file
60
apps/tools/analyzer/grouper.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from datetime import timedelta
|
||||
import pandas as pd
|
||||
|
||||
OUTLIER_FACTOR = 5
|
||||
|
||||
|
||||
def detect_groups(df: pd.DataFrame, outlier_factor: float = OUTLIER_FACTOR,
|
||||
manual_splits: list | None = None,
|
||||
forced_splits: list | None = None) -> list:
|
||||
"""Split shots into groups.
|
||||
|
||||
forced_splits: when provided, ONLY these split positions are used — auto-detection
|
||||
is bypassed entirely. Use this for user-defined groupings from the visual editor.
|
||||
|
||||
manual_splits: added on top of auto-detected splits (when forced_splits is None).
|
||||
Both auto+manual mechanisms are merged and deduplicated.
|
||||
"""
|
||||
if len(df) <= 1:
|
||||
return [df]
|
||||
|
||||
def _build_groups(all_splits):
|
||||
if not all_splits:
|
||||
return [df]
|
||||
groups = []
|
||||
prev = 0
|
||||
for pos in all_splits:
|
||||
group = df.iloc[prev:pos]
|
||||
if len(group) > 0:
|
||||
groups.append(group.reset_index(drop=True))
|
||||
prev = pos
|
||||
last = df.iloc[prev:]
|
||||
if len(last) > 0:
|
||||
groups.append(last.reset_index(drop=True))
|
||||
return groups
|
||||
|
||||
# Forced mode: user controls exact split positions, no auto-detection
|
||||
if forced_splits is not None:
|
||||
valid = sorted(s for s in forced_splits if 0 < s < len(df))
|
||||
return _build_groups(valid)
|
||||
|
||||
times = df["time"]
|
||||
diffs = times.diff().dropna()
|
||||
|
||||
if diffs.empty:
|
||||
return [df]
|
||||
|
||||
median_gap = diffs.median()
|
||||
|
||||
# Auto-detect splits based on time gaps
|
||||
auto_splits: set[int] = set()
|
||||
if median_gap != timedelta(0):
|
||||
threshold = outlier_factor * median_gap
|
||||
for idx, gap in diffs.items():
|
||||
if gap > threshold:
|
||||
pos = df.index.get_loc(idx)
|
||||
auto_splits.add(pos)
|
||||
|
||||
# Merge with manual splits (filter to valid range)
|
||||
extra = set(manual_splits) if manual_splits else set()
|
||||
return _build_groups(sorted(auto_splits | extra))
|
||||
107
apps/tools/analyzer/parser.py
Normal file
107
apps/tools/analyzer/parser.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import csv
|
||||
import io
|
||||
import pandas as pd
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# Column names assigned to the six captured fields of each detected data row.
CANONICAL_COLS = ["idx", "speed", "std_dev", "energy", "power_factor", "time"]
# Accepted time-of-day formats; covers both '.' and ',' fractional-second separators.
TIME_FORMATS = ["%H:%M:%S.%f", "%H:%M:%S", "%H:%M:%S,%f"]
|
||||
|
||||
|
||||
def parse_csv(stream) -> pd.DataFrame:
    """Parse an uploaded CSV stream into a normalized shot DataFrame.

    Data rows are detected heuristically: at least six fields, the first an
    integer index and the sixth a time of day. Headers, metadata and blank
    lines are skipped automatically.

    Returns a DataFrame with speed/std_dev/energy/power_factor/time columns,
    sorted by time. Raises ValueError when fewer than two data rows exist.
    """
    content = stream.read()
    if isinstance(content, bytes):
        content = content.decode("utf-8-sig")
    # Strip BOM characters that may appear anywhere in the file
    content = content.replace("\ufeff", "")

    rows = [
        fields[:6]
        for fields in (_split_line(line) for line in content.splitlines())
        if len(fields) >= 6 and _is_index(fields[0]) and _is_time(fields[5])
    ]

    if len(rows) < 2:
        raise ValueError(
            "Could not find valid data rows in the CSV. "
            "Expected rows with: integer index, 4 numeric values, and a time (HH:MM:SS)."
        )

    df = pd.DataFrame(rows, columns=CANONICAL_COLS)

    for col in ("speed", "std_dev", "energy", "power_factor"):
        df[col] = _parse_numeric(df[col])

    df["time"] = _parse_time_column(df["time"])
    # Rows may arrive unordered; grouping downstream assumes ascending time.
    df = df.sort_values("time").reset_index(drop=True)
    return df[["speed", "std_dev", "energy", "power_factor", "time"]]
|
||||
|
||||
|
||||
def _split_line(line: str) -> list:
|
||||
"""Parse one CSV line, respecting quoted fields."""
|
||||
for row in csv.reader([line], quotechar='"', doublequote=True, skipinitialspace=True):
|
||||
return [f.strip() for f in row]
|
||||
return []
|
||||
|
||||
|
||||
def _is_index(val: str) -> bool:
|
||||
"""True if the value is a non-negative integer (auto-increment row index)."""
|
||||
try:
|
||||
return int(val.strip()) >= 0
|
||||
except (ValueError, AttributeError):
|
||||
return False
|
||||
|
||||
|
||||
def _is_time(val: str) -> bool:
    """True if the value parses as HH:MM:SS or HH:MM:SS.fff."""
    def matches(fmt: str) -> bool:
        # strptime signals a mismatch by raising, not returning.
        try:
            datetime.strptime(text, fmt)
            return True
        except ValueError:
            return False

    text = val.strip()
    return any(matches(fmt) for fmt in TIME_FORMATS)
|
||||
|
||||
|
||||
def _parse_numeric(col: pd.Series) -> pd.Series:
|
||||
"""Parse a numeric column, accepting both '.' and ',' as decimal separator."""
|
||||
result = pd.to_numeric(col, errors="coerce")
|
||||
if result.isna().any():
|
||||
result = pd.to_numeric(
|
||||
col.astype(str).str.replace(",", ".", regex=False),
|
||||
errors="coerce",
|
||||
)
|
||||
if result.isna().any():
|
||||
bad = col[result.isna()].tolist()
|
||||
raise ValueError(f"Non-numeric values in column: {bad}")
|
||||
return result
|
||||
|
||||
|
||||
def _parse_time_column(col: pd.Series) -> pd.Series:
    """Parse a time-of-day column into full datetimes anchored on today's date.

    Tries each format in TIME_FORMATS (a format is accepted only if it parses
    EVERY value), then falls back to pandas' flexible parser. Rows only carry
    a time of day, so a backwards jump in the sequence is interpreted as a
    midnight rollover and shifted forward by one day.

    Raises ValueError when no strategy parses all values.
    """
    today = datetime.today().date()
    cleaned = col.astype(str).str.strip()

    # Strict pass: first format that parses the whole column wins.
    parsed = None
    for fmt in TIME_FORMATS:
        candidate = pd.to_datetime(cleaned, format=fmt, errors="coerce")
        if candidate.notna().all():
            parsed = candidate
            break

    # Lenient fallback for nonstandard but still recognizable time strings.
    if parsed is None:
        candidate = pd.to_datetime(cleaned, errors="coerce")
        if candidate.notna().all():
            parsed = candidate

    if parsed is None:
        raise ValueError(
            "Could not parse time column. Expected format: HH:MM:SS or HH:MM:SS.fff"
        )

    # Anchor every time on today's date so timedelta arithmetic works downstream.
    parsed = parsed.apply(lambda t: datetime.combine(today, t.time()))

    # Midnight rollover: a time earlier than its predecessor means the session
    # crossed midnight. Each row is compared against the already-adjusted
    # predecessor, so the +1 day shift cascades to all subsequent rows.
    times = parsed.tolist()
    for i in range(1, len(times)):
        if times[i] < times[i - 1]:
            times[i] += timedelta(days=1)
    return pd.Series(times, index=col.index)
|
||||
95
apps/tools/analyzer/pdf_report.py
Normal file
95
apps/tools/analyzer/pdf_report.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import base64
|
||||
import io
|
||||
from datetime import datetime
|
||||
|
||||
from fpdf import FPDF
|
||||
|
||||
# Layout constants for the key/value stats tables (FPDF default units, mm).
_COL_LABEL = 80  # width of the label column
_COL_VALUE = 50  # width of the value column
_ROW_H = 7  # height of one table row
|
||||
|
||||
|
||||
def generate_pdf(overall: dict, group_stats: list, charts: list, overview_chart: str) -> bytes:
    """Build the full analysis report PDF and return it as raw bytes.

    Layout: title block, overall statistics + overview chart, then one
    section per group (stats table + that group's chart).
    """
    report = FPDF()
    report.set_auto_page_break(auto=True, margin=15)

    report.add_page()
    _title_block(report)
    _overall_section(report, overall, overview_chart)

    # Pair each group's stats with its rendered chart, in order.
    for group_stat, encoded_chart in zip(group_stats, charts):
        _group_section(report, group_stat, encoded_chart)

    return bytes(report.output())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _title_block(pdf: FPDF):
    """Centered report title followed by a generation timestamp."""
    pdf.set_font("Helvetica", "B", 18)
    pdf.cell(0, 12, "Ballistic Analysis Report", new_x="LMARGIN", new_y="NEXT", align="C")

    pdf.set_font("Helvetica", "", 9)
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M')
    pdf.cell(0, 5, f"Generated: {stamp}", new_x="LMARGIN", new_y="NEXT", align="C")
    pdf.ln(8)
|
||||
|
||||
|
||||
def _overall_section(pdf: FPDF, overall: dict, overview_chart: str):
    """Overall statistics table followed by the cross-group overview chart."""
    _section_heading(pdf, "Overall Statistics")

    std = overall["std_speed"]
    rows = [
        ("Total shots", str(overall["count"])),
        ("Min speed", f"{overall['min_speed']:.4f}"),
        ("Max speed", f"{overall['max_speed']:.4f}"),
        ("Mean speed", f"{overall['mean_speed']:.4f}"),
        # std is undefined for a single shot; show a placeholder instead.
        ("Std dev (speed)", "n/a" if std is None else f"{std:.4f}"),
    ]
    _table(pdf, rows)

    # Charts arrive base64-encoded; decode and embed, capped at 140mm wide.
    chart_png = base64.b64decode(overview_chart)
    pdf.image(io.BytesIO(chart_png), x=pdf.l_margin, w=min(140, pdf.epw))
    pdf.ln(4)
|
||||
|
||||
|
||||
def _group_section(pdf: FPDF, stat: dict, chart_b64: str):
    """Per-group heading, stats table, and speed chart (page-break aware)."""
    pdf.ln(4)
    _section_heading(
        pdf,
        f"Group {stat['group_index']} - "
        f"{stat['time_start']} to {stat['time_end']} "
        f"({stat['count']} shot(s))",
    )

    std = stat["std_speed"]
    rows = [
        ("Min speed", f"{stat['min_speed']:.4f}"),
        ("Max speed", f"{stat['max_speed']:.4f}"),
        ("Mean speed", f"{stat['mean_speed']:.4f}"),
        # Single-shot groups have no std dev.
        ("Std dev (speed)", "n/a" if std is None else f"{std:.4f}"),
    ]
    _table(pdf, rows)

    chart_png = base64.b64decode(chart_b64)
    # Check remaining page space; add new page if chart won't fit
    # (75 is a rough estimate of the rendered chart height in mm).
    if pdf.get_y() + 75 > pdf.page_break_trigger:
        pdf.add_page()
    pdf.image(io.BytesIO(chart_png), x=pdf.l_margin, w=pdf.epw)
    pdf.ln(4)
|
||||
|
||||
|
||||
def _section_heading(pdf: FPDF, text: str):
    """Render *text* as a bold heading on a light-blue band."""
    pdf.set_fill_color(230, 236, 255)
    pdf.set_font("Helvetica", "B", 12)
    pdf.cell(0, 8, text, new_x="LMARGIN", new_y="NEXT", fill=True)
    pdf.ln(2)
|
||||
|
||||
|
||||
def _table(pdf: FPDF, rows: list):
    """Two-column key/value table with zebra striping.

    rows: list of (label, value) string pairs. Labels render in regular
    weight, values in bold; every other row gets a light background.
    """
    for i, (label, value) in enumerate(rows):
        fill = i % 2 == 0
        # Plain if/else instead of a conditional expression used purely for
        # its side effects (original misused a ternary as a statement).
        if fill:
            pdf.set_fill_color(248, 249, 252)
        else:
            pdf.set_fill_color(255, 255, 255)
        pdf.set_font("Helvetica", "", 10)
        pdf.cell(_COL_LABEL, _ROW_H, label, border=0, fill=fill)
        pdf.set_font("Helvetica", "B", 10)
        pdf.cell(_COL_VALUE, _ROW_H, value, border=0, fill=fill, new_x="LMARGIN", new_y="NEXT")
    pdf.ln(3)
|
||||
30
apps/tools/analyzer/stats.py
Normal file
30
apps/tools/analyzer/stats.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def compute_overall_stats(df: pd.DataFrame) -> dict:
    """Summary statistics over the speed column of the whole session.

    std_speed is the sample standard deviation (ddof=1); it is None when
    there are fewer than two shots — matching compute_group_stats and the
    `is not None` checks in pdf_report (previously a single-shot session
    leaked NaN, which formatted as "nan" in the report).
    """
    s = df["speed"]
    return {
        "min_speed": s.min(),
        "max_speed": s.max(),
        "mean_speed": s.mean(),
        # ddof=1 needs at least 2 samples; pandas returns NaN otherwise.
        "std_speed": s.std(ddof=1) if len(df) > 1 else None,
        "count": len(df),
    }
|
||||
|
||||
|
||||
def compute_group_stats(groups: list) -> list:
    """Per-group summary statistics.

    Returns one dict per group with 1-based group_index, shot count,
    min/max/mean speed, sample std dev (None for single-shot groups),
    and the first/last shot times formatted HH:MM:SS.
    """
    stats = []
    for index, group in enumerate(groups, start=1):
        speeds = group["speed"]
        stats.append({
            "group_index": index,
            "count": len(group),
            "min_speed": speeds.min(),
            "max_speed": speeds.max(),
            "mean_speed": speeds.mean(),
            # Sample std dev needs >= 2 shots; report None otherwise.
            "std_speed": speeds.std(ddof=1) if len(group) > 1 else None,
            "time_start": group["time"].min().strftime("%H:%M:%S"),
            "time_end": group["time"].max().strftime("%H:%M:%S"),
        })
    return stats
|
||||
Reference in New Issue
Block a user