Skip to main content

tp_lib_core/io/csv/
detections.rs

1//! CSV parser for punctual & linear detections (T009).
2//!
3//! See `specs/004-train-detections/contracts/detections-csv.md`.
4
5use std::collections::BTreeMap;
6use std::path::Path;
7
8use chrono::{DateTime, FixedOffset};
9
10use crate::detections::error::DetectionError;
11use crate::models::{
12    Detection, DetectionKind, GeographicLocation, LinearDetection, PunctualDetection,
13    TopologicalLocation,
14};
15
/// Column names the punctual parser reads itself. Every header that is not
/// listed here is handed to `collect_metadata` and ends up in the detection's
/// metadata map.
const PUNCTUAL_RESERVED: &[&str] = &[
    // Time of the detection.
    "timestamp",
    // Topological position columns.
    "netelement_id",
    "intrinsic",
    // Coordinate position columns.
    "lat",
    "lon",
    "crs",
    // Optional bookkeeping columns.
    "id",
    "source",
];
27
/// Column names the linear parser reads itself. Every header that is not
/// listed here is handed to `collect_metadata` and ends up in the detection's
/// metadata map.
const LINEAR_RESERVED: &[&str] = &[
    // Time interval of the detection.
    "t_from",
    "t_to",
    // Topological extent columns.
    "netelement_id",
    "start_intrinsic",
    "end_intrinsic",
    // Optional bookkeeping columns.
    "id",
    "source",
];
37
38/// Load detections from a CSV file.
39pub fn load(path: &Path, expected_kind: DetectionKind) -> Result<Vec<Detection>, DetectionError> {
40    let source_file = path.display().to_string();
41    let text = std::fs::read_to_string(path)?;
42    load_str(&text, &source_file, expected_kind)
43}
44
45/// In-memory variant of [`load`] that accepts the full CSV text. Required by
46/// the .NET bindings (FR-012, no temp files).
47pub fn load_str(
48    text: &str,
49    source_file: &str,
50    expected_kind: DetectionKind,
51) -> Result<Vec<Detection>, DetectionError> {
52    let mut rdr = csv::ReaderBuilder::new()
53        .has_headers(true)
54        .flexible(false)
55        .from_reader(text.as_bytes());
56
57    let headers: Vec<String> = rdr
58        .headers()
59        .map_err(|e| DetectionError::InvalidSchema(format!("failed to read CSV header: {e}")))?
60        .iter()
61        .map(|s| s.trim_start_matches('\u{feff}').to_string())
62        .collect();
63
64    match expected_kind {
65        DetectionKind::Punctual => parse_punctual(&mut rdr, &headers, source_file),
66        DetectionKind::Linear => parse_linear(&mut rdr, &headers, source_file),
67    }
68}
69
70fn require_columns(headers: &[String], required: &[&str]) -> Result<(), DetectionError> {
71    for col in required {
72        if !headers.iter().any(|h| h == col) {
73            return Err(DetectionError::InvalidSchema(format!(
74                "missing required column '{col}'"
75            )));
76        }
77    }
78    Ok(())
79}
80
81fn col<'a>(headers: &'a [String], record: &'a csv::StringRecord, name: &str) -> Option<&'a str> {
82    let idx = headers.iter().position(|h| h == name)?;
83    let v = record.get(idx)?;
84    let v = v.trim();
85    if v.is_empty() {
86        None
87    } else {
88        Some(v)
89    }
90}
91
92fn parse_timestamp(
93    s: &str,
94    source_file: &str,
95    source_row: usize,
96) -> Result<DateTime<FixedOffset>, DetectionError> {
97    crate::temporal::parse_timestamp_flexible_str(s).map_err(|e| DetectionError::InvalidTimestamp {
98        source_file: source_file.to_string(),
99        source_row,
100        message: format!("'{s}': {e}"),
101    })
102}
103
104fn parse_intrinsic(s: &str, source_file: &str, source_row: usize) -> Result<f64, DetectionError> {
105    let v: f64 = s.parse().map_err(|e| DetectionError::Parse {
106        source_file: source_file.to_string(),
107        source_row,
108        message: format!("invalid float '{s}': {e}"),
109    })?;
110    if !(0.0..=1.0).contains(&v) {
111        return Err(DetectionError::InvalidIntrinsic {
112            source_file: source_file.to_string(),
113            source_row,
114            value: v,
115        });
116    }
117    Ok(v)
118}
119
120fn parse_float(
121    s: &str,
122    source_file: &str,
123    source_row: usize,
124    field: &str,
125) -> Result<f64, DetectionError> {
126    s.parse::<f64>().map_err(|e| DetectionError::Parse {
127        source_file: source_file.to_string(),
128        source_row,
129        message: format!("invalid float for '{field}': '{s}': {e}"),
130    })
131}
132
133fn collect_metadata(
134    headers: &[String],
135    record: &csv::StringRecord,
136    reserved: &[&str],
137) -> BTreeMap<String, String> {
138    let mut map = BTreeMap::new();
139    for (idx, name) in headers.iter().enumerate() {
140        if reserved.iter().any(|r| r == name) {
141            continue;
142        }
143        if let Some(v) = record.get(idx) {
144            let v = v.trim();
145            if !v.is_empty() {
146                map.insert(name.clone(), v.to_string());
147            }
148        }
149    }
150    map
151}
152
153fn parse_punctual<R: std::io::Read>(
154    rdr: &mut csv::Reader<R>,
155    headers: &[String],
156    source_file: &str,
157) -> Result<Vec<Detection>, DetectionError> {
158    require_columns(headers, &["timestamp"])?;
159
160    let mut out = Vec::new();
161    for (row_idx, result) in rdr.records().enumerate() {
162        let record = result.map_err(|e| DetectionError::Parse {
163            source_file: source_file.to_string(),
164            source_row: row_idx + 2, // header is row 1
165            message: format!("CSV read error: {e}"),
166        })?;
167        let source_row = row_idx + 2;
168
169        let timestamp_str =
170            col(headers, &record, "timestamp").ok_or_else(|| DetectionError::InvalidTimestamp {
171                source_file: source_file.to_string(),
172                source_row,
173                message: "empty timestamp".to_string(),
174            })?;
175        let timestamp = parse_timestamp(timestamp_str, source_file, source_row)?;
176
177        let netelement_id = col(headers, &record, "netelement_id").map(str::to_string);
178        let lat = col(headers, &record, "lat");
179        let lon = col(headers, &record, "lon");
180        let crs = col(headers, &record, "crs");
181
182        let has_topo = netelement_id.is_some();
183        let has_coord = lat.is_some() || lon.is_some();
184
185        if has_topo && has_coord {
186            return Err(DetectionError::InvalidSchema(format!(
187                "row {source_row}: cannot specify both 'netelement_id' and 'lat'/'lon'"
188            )));
189        }
190        if !has_topo && !has_coord {
191            return Err(DetectionError::InvalidSchema(format!(
192                "row {source_row}: must specify either 'netelement_id' or 'lat'+'lon'+'crs'"
193            )));
194        }
195
196        let intrinsic_value = match col(headers, &record, "intrinsic") {
197            Some(s) => Some(parse_intrinsic(s, source_file, source_row)?),
198            None => None,
199        };
200
201        let location = netelement_id.as_ref().map(|ne_id| TopologicalLocation {
202            netelement_id: ne_id.clone(),
203            intrinsic: intrinsic_value.unwrap_or(0.5),
204        });
205
206        let coordinates = if has_coord {
207            let lat_s = lat.ok_or_else(|| {
208                DetectionError::InvalidSchema(format!("row {source_row}: missing 'lat'"))
209            })?;
210            let lon_s = lon.ok_or_else(|| {
211                DetectionError::InvalidSchema(format!("row {source_row}: missing 'lon'"))
212            })?;
213            let crs_s = crs.ok_or(DetectionError::MissingCrs {
214                source_file: source_file.to_string(),
215                source_row,
216            })?;
217            Some(GeographicLocation {
218                latitude: parse_float(lat_s, source_file, source_row, "lat")?,
219                longitude: parse_float(lon_s, source_file, source_row, "lon")?,
220                crs: crs_s.to_string(),
221            })
222        } else {
223            None
224        };
225
226        let id = col(headers, &record, "id").map(str::to_string);
227        let source = col(headers, &record, "source").map(str::to_string);
228        let metadata = collect_metadata(headers, &record, PUNCTUAL_RESERVED);
229
230        out.push(Detection::Punctual(PunctualDetection {
231            timestamp,
232            location,
233            coordinates,
234            intrinsic: intrinsic_value,
235            id,
236            source,
237            source_file: source_file.to_string(),
238            source_row,
239            metadata,
240        }));
241    }
242    Ok(out)
243}
244
245fn parse_linear<R: std::io::Read>(
246    rdr: &mut csv::Reader<R>,
247    headers: &[String],
248    source_file: &str,
249) -> Result<Vec<Detection>, DetectionError> {
250    require_columns(headers, &["t_from", "t_to", "netelement_id"])?;
251
252    let mut out = Vec::new();
253    for (row_idx, result) in rdr.records().enumerate() {
254        let record = result.map_err(|e| DetectionError::Parse {
255            source_file: source_file.to_string(),
256            source_row: row_idx + 2,
257            message: format!("CSV read error: {e}"),
258        })?;
259        let source_row = row_idx + 2;
260
261        let t_from_s =
262            col(headers, &record, "t_from").ok_or_else(|| DetectionError::InvalidTimestamp {
263                source_file: source_file.to_string(),
264                source_row,
265                message: "empty t_from".to_string(),
266            })?;
267        let t_to_s =
268            col(headers, &record, "t_to").ok_or_else(|| DetectionError::InvalidTimestamp {
269                source_file: source_file.to_string(),
270                source_row,
271                message: "empty t_to".to_string(),
272            })?;
273        let t_from = parse_timestamp(t_from_s, source_file, source_row)?;
274        let t_to = parse_timestamp(t_to_s, source_file, source_row)?;
275
276        let netelement_id = col(headers, &record, "netelement_id")
277            .ok_or_else(|| {
278                DetectionError::InvalidSchema(format!("row {source_row}: empty 'netelement_id'"))
279            })?
280            .to_string();
281
282        let start_intrinsic = match col(headers, &record, "start_intrinsic") {
283            Some(s) => parse_intrinsic(s, source_file, source_row)?,
284            None => 0.0,
285        };
286        let end_intrinsic = match col(headers, &record, "end_intrinsic") {
287            Some(s) => parse_intrinsic(s, source_file, source_row)?,
288            None => 1.0,
289        };
290
291        let id = col(headers, &record, "id").map(str::to_string);
292        let source = col(headers, &record, "source").map(str::to_string);
293        let metadata = collect_metadata(headers, &record, LINEAR_RESERVED);
294
295        out.push(Detection::Linear(LinearDetection {
296            t_from,
297            t_to,
298            netelement_id,
299            start_intrinsic,
300            end_intrinsic,
301            id,
302            source,
303            source_file: source_file.to_string(),
304            source_row,
305            metadata,
306        }));
307    }
308    Ok(out)
309}