import csv
import io
from datetime import datetime, timedelta

import pandas as pd

# Column names assigned, in order, to the first six fields of every data row.
CANONICAL_COLS = ["idx", "speed", "std_dev", "energy", "power_factor", "time"]

# strptime patterns tried, in order, when recognising a time-of-day field.
TIME_FORMATS = ["%H:%M:%S.%f", "%H:%M:%S", "%H:%M:%S,%f"]


def parse_csv(stream) -> pd.DataFrame:
    """Extract the measurement rows from a CSV stream into a DataFrame.

    Lines that do not look like data rows (headers, preamble, blank lines)
    are skipped. A data row has at least six fields, a non-negative integer
    in the first field and a time of day in the sixth.

    Args:
        stream: Object whose ``read()`` returns the whole file as ``str`` or
            as UTF-8 encoded ``bytes`` (with or without a BOM).

    Returns:
        DataFrame with columns ``speed``, ``std_dev``, ``energy``,
        ``power_factor`` (numeric) and ``time`` (datetimes anchored on
        today's date), sorted by ``time`` with a fresh 0..n-1 index.

    Raises:
        ValueError: If fewer than two data rows are found, a numeric field
            cannot be parsed, or the time column cannot be parsed.
    """
    raw = stream.read()
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8-sig")
    # Strip BOM characters that may appear anywhere in the file
    raw = raw.replace("\ufeff", "")

    data_rows = []
    for line in raw.splitlines():
        fields = _split_line(line)
        if len(fields) >= 6 and _is_index(fields[0]) and _is_time(fields[5]):
            data_rows.append(fields[:6])

    if len(data_rows) < 2:
        raise ValueError(
            "Could not find valid data rows in the CSV. "
            "Expected rows with: integer index, 4 numeric values, and a time (HH:MM:SS)."
        )

    df = pd.DataFrame(data_rows, columns=CANONICAL_COLS)
    for col in ("speed", "std_dev", "energy", "power_factor"):
        df[col] = _parse_numeric(df[col])
    df["time"] = _parse_time_column(df["time"])

    # A stable sort keeps rows that share a timestamp in their file order.
    df = df.sort_values("time", kind="mergesort").reset_index(drop=True)
    return df[["speed", "std_dev", "energy", "power_factor", "time"]]


def _split_line(line: str) -> list:
    """Split one CSV line into stripped fields, respecting quoted fields.

    Returns an empty list when the reader yields no row (e.g. empty line).
    """
    reader = csv.reader(
        [line], quotechar='"', doublequote=True, skipinitialspace=True
    )
    for row in reader:
        # Only a single line is fed in, so the first row is the only row.
        return [field.strip() for field in row]
    return []


def _is_index(val: str) -> bool:
    """Return True if *val* parses as a non-negative integer (row index)."""
    try:
        return int(val.strip()) >= 0
    except (ValueError, AttributeError):
        return False


def _is_time(val: str) -> bool:
    """Return True if *val* matches any pattern in ``TIME_FORMATS``."""
    cleaned = val.strip()
    for fmt in TIME_FORMATS:
        try:
            datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
        return True
    return False


def _parse_numeric(col: pd.Series) -> pd.Series:
    """Parse a numeric column, accepting both '.' and ',' as decimal separator.

    Raises:
        ValueError: If any value is still non-numeric after treating ','
            as the decimal separator; the message lists the bad values.
    """
    result = pd.to_numeric(col, errors="coerce")
    if result.isna().any():
        # Retry the whole column with decimal commas rewritten as dots.
        result = pd.to_numeric(
            col.astype(str).str.replace(",", ".", regex=False),
            errors="coerce",
        )
    still_bad = result.isna()
    if still_bad.any():
        bad = col[still_bad].tolist()
        raise ValueError(f"Non-numeric values in column: {bad}")
    return result


def _parse_time_column(col: pd.Series) -> pd.Series:
    """Convert time-of-day strings into datetimes anchored on today's date.

    Rows are assumed to be in chronological order: each time a value falls
    before the preceding row's value, a midnight crossing is assumed and that
    row and every later row are moved one further day forward. The day offset
    accumulates, so recordings spanning several midnights stay monotonic.

    Args:
        col: Series of time strings (HH:MM:SS with optional fraction).

    Returns:
        Series of datetimes sharing ``col``'s index.

    Raises:
        ValueError: If the column cannot be parsed as a whole.
    """
    today = datetime.today().date()
    cleaned = col.astype(str).str.strip()

    parsed = None
    # Prefer a single explicit format that fits every row.
    for fmt in TIME_FORMATS:
        candidate = pd.to_datetime(cleaned, format=fmt, errors="coerce")
        if candidate.notna().all():
            parsed = candidate
            break
    if parsed is None:
        # Fall back to pandas' own format inference.
        candidate = pd.to_datetime(cleaned, errors="coerce")
        if candidate.notna().all():
            parsed = candidate
    if parsed is None:
        raise ValueError(
            "Could not parse time column. Expected format: HH:MM:SS or HH:MM:SS.fff"
        )

    one_day = timedelta(days=1)
    day_offset = timedelta(0)
    previous = None
    times = []
    for stamp in parsed:
        # Keep only the time of day; the parsed date part is a placeholder.
        current = datetime.combine(today, stamp.time()) + day_offset
        if previous is not None and current < previous:
            # Clock went backwards: crossed midnight since the previous row.
            day_offset += one_day
            current += one_day
        times.append(current)
        previous = current
    return pd.Series(times, index=col.index)