Skip to main content

tp_lib_core/io/
rinf.rs

1//! ERA RINF SPARQL retrieval module (feature 006).
2//!
3//! Provides:
4//! - The [`SparqlClient`] trait so production code can hit the real endpoint
5//!   via [`UreqSparqlClient`] while tests inject deterministic fixtures.
6//! - A tiny inline WKT `LINESTRING` parser (avoids pulling a wkt crate).
7//! - Builders for the two SPARQL queries documented in
8//!   `specs/006-download-rinf-topology/research.md`.
9//! - Row-to-core-model mappers that produce [`Netelement`] /
10//!   [`NetRelation`] instances ready for downstream workflows.
11
12use std::time::Duration;
13
14use chrono::NaiveDate;
15use geo::{LineString, Point};
16use serde_json::Value;
17
18use crate::errors::ProjectionError;
19use crate::models::{
20    NetRelation, Netelement, RinfNavigability, RinfNetelementRow, RinfNetrelationRow,
21};
22
23/// Pluggable SPARQL transport — production uses ureq, tests use mocks.
24pub trait SparqlClient: Send + Sync {
25    /// Execute a SPARQL query and return parsed JSON (SPARQL-Results 1.1 shape).
26    fn query(&self, endpoint_url: &str, sparql: &str) -> Result<Value, ProjectionError>;
27}
28
29/// Default blocking SPARQL client backed by [`ureq`].
30pub struct UreqSparqlClient {
31    timeout: Duration,
32}
33
34impl Default for UreqSparqlClient {
35    fn default() -> Self {
36        Self {
37            timeout: Duration::from_secs(60),
38        }
39    }
40}
41
42impl UreqSparqlClient {
43    pub fn new(timeout: Duration) -> Self {
44        Self { timeout }
45    }
46}
47
48impl SparqlClient for UreqSparqlClient {
49    fn query(&self, endpoint_url: &str, sparql: &str) -> Result<Value, ProjectionError> {
50        let agent = ureq::AgentBuilder::new()
51            .timeout(self.timeout)
52            .user_agent("tp-lib/006-rinf-retrieval")
53            .build();
54        let response = agent
55            .post(endpoint_url)
56            .set("Accept", "application/sparql-results+json")
57            .set("Content-Type", "application/sparql-query")
58            .send_string(sparql)
59            .map_err(|e| ProjectionError::RinfRetrievalFailed(format!("HTTP error: {e}")))?;
60        let json: Value = response
61            .into_json()
62            .map_err(|e| ProjectionError::RinfRetrievalFailed(format!("JSON parse error: {e}")))?;
63        Ok(json)
64    }
65}
66
/// Build the netelements SPARQL query for a given closed WGS84 polygon WKT.
///
/// The query selects every `era:LinearElement` whose geometry intersects the
/// polygon and whose validity has begun on or before today and has either no
/// end date or an end date after today.
///
/// `polygon_wkt` is embedded inside a double-quoted `gsp:wktLiteral`. Any
/// backslash, double quote, CR or LF is escaped, so a malformed or hostile
/// polygon string cannot terminate the literal early, break the query, or
/// inject additional SPARQL.
pub fn build_netelements_query(polygon_wkt: &str) -> String {
    // SPARQL STRING_LITERAL2 forbids raw `"`, `\`, CR and LF; use their ECHAR
    // escapes instead. WKT itself never needs these characters.
    let mut polygon = String::with_capacity(polygon_wkt.len());
    for ch in polygon_wkt.chars() {
        match ch {
            '\\' => polygon.push_str("\\\\"),
            '"' => polygon.push_str("\\\""),
            '\n' => polygon.push_str("\\n"),
            '\r' => polygon.push_str("\\r"),
            other => polygon.push(other),
        }
    }
    format!(
        r#"PREFIX era: <http://data.europa.eu/949/>
PREFIX gsp: <http://www.opengis.net/ont/geosparql#>
PREFIX geof: <http://www.opengis.net/def/function/geosparql/>
PREFIX time: <http://www.w3.org/2006/time#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

SELECT ?netelement ?netelement_wkt ?valid_from_date ?valid_to_date
WHERE {{
  ?netelement a era:LinearElement ;
              era:validity/time:hasBeginning/time:inXSDDate ?valid_from_date ;
              gsp:hasGeometry/gsp:asWKT ?netelement_wkt .
  FILTER(geof:sfIntersects(
    ?netelement_wkt,
    "{polygon}"^^gsp:wktLiteral
  ))
    OPTIONAL {{
      ?netelement era:validity/time:hasEnd/time:inXSDDate ?valid_to_date .
      FILTER (xsd:date(now()) >= ?valid_to_date)
    }}
    FILTER (xsd:date(now()) >= ?valid_from_date && !BOUND(?valid_to_date))
}}"#,
        polygon = polygon
    )
}
94
/// Build the netrelations SPARQL query for a list of seed element IRIs.
///
/// The query returns every currently valid `era:NetRelation` whose
/// `era:elementA` or `era:elementB` is one of the seed elements.
///
/// The seed list is inlined as a `VALUES` block *inside* each `UNION` branch,
/// bound directly to the element variable of that branch. SPARQL evaluates
/// sub-groups bottom-up, so a `BIND(?outer_var AS ...)` at the start of a
/// sub-group cannot see a `VALUES` variable declared in the enclosing group.
/// Placing `VALUES` where it is used keeps the query correct on
/// standards-compliant engines. `SELECT DISTINCT` removes the duplicate row
/// that appears when a relation joins two seed elements and therefore matches
/// both branches.
pub fn build_netrelations_query(seed_iris: &[String]) -> String {
    let values = seed_iris
        .iter()
        .map(|iri| format!("<{iri}>"))
        .collect::<Vec<_>>()
        .join(" ");
    format!(
        r#"PREFIX era: <http://data.europa.eu/949/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX time: <http://www.w3.org/2006/time#>

SELECT DISTINCT ?netrelation ?netelementA ?netelementB ?isOnOriginOfElementA ?isOnOriginOfElementB ?navigability ?valid_from_date ?valid_to_date
WHERE {{
  {{
    VALUES ?netelementA {{ {values} }}
    ?netrelation a era:NetRelation ;
                 era:elementA ?netelementA ;
                 era:elementB ?netelementB ;
                 era:isOnOriginOfElementA ?isOnOriginOfElementA ;
                 era:isOnOriginOfElementB ?isOnOriginOfElementB ;
                 era:navigability ?navigability ;
                 era:validity/time:hasBeginning/time:inXSDDate ?valid_from_date .
    OPTIONAL {{
      ?netrelation era:validity/time:hasEnd/time:inXSDDate ?valid_to_date .
      FILTER (xsd:date(now()) >= ?valid_to_date)
    }}
    FILTER (xsd:date(now()) >= ?valid_from_date && !BOUND(?valid_to_date))
  }}
  UNION
  {{
    VALUES ?netelementB {{ {values} }}
    ?netrelation a era:NetRelation ;
                 era:elementA ?netelementA ;
                 era:elementB ?netelementB ;
                 era:isOnOriginOfElementA ?isOnOriginOfElementA ;
                 era:isOnOriginOfElementB ?isOnOriginOfElementB ;
                 era:navigability ?navigability ;
                 era:validity/time:hasBeginning/time:inXSDDate ?valid_from_date .
    OPTIONAL {{
      ?netrelation era:validity/time:hasEnd/time:inXSDDate ?valid_to_date .
      FILTER (xsd:date(now()) >= ?valid_to_date)
    }}
    FILTER (xsd:date(now()) >= ?valid_from_date && !BOUND(?valid_to_date))
  }}
}}"#
    )
}
144
145/// Parse a `LINESTRING(...)` WKT into a [`LineString<f64>`].
146///
147/// Inline parser to avoid pulling a wkt-crate dependency. Accepts upper or
148/// lower-case keyword and any whitespace between tokens. Does NOT handle Z/M.
149pub fn parse_wkt_linestring(wkt: &str) -> Result<LineString<f64>, ProjectionError> {
150    let trimmed = wkt.trim();
151    let upper = trimmed.to_ascii_uppercase();
152    let body = if let Some(rest) = upper.strip_prefix("LINESTRING") {
153        rest
154    } else {
155        return Err(ProjectionError::RinfIncompleteTopology(format!(
156            "WKT is not a LINESTRING: {trimmed}"
157        )));
158    };
159    // Use the original (un-uppercased) string for coordinate parsing — but
160    // since `body` was sliced from `upper`, recompute the same slice on the
161    // original. Easiest: just lowercase numbers are identical in either case.
162    let body = body.trim();
163    let inner = body
164        .strip_prefix('(')
165        .and_then(|s| s.strip_suffix(')'))
166        .ok_or_else(|| {
167            ProjectionError::RinfIncompleteTopology(format!(
168                "Malformed LINESTRING parentheses: {trimmed}"
169            ))
170        })?;
171    let mut coords: Vec<(f64, f64)> = Vec::new();
172    for pair in inner.split(',') {
173        let mut nums = pair.split_whitespace();
174        let lon = nums
175            .next()
176            .and_then(|s| s.parse::<f64>().ok())
177            .ok_or_else(|| {
178                ProjectionError::RinfIncompleteTopology(format!(
179                    "Missing or invalid longitude in WKT: {trimmed}"
180                ))
181            })?;
182        let lat = nums
183            .next()
184            .and_then(|s| s.parse::<f64>().ok())
185            .ok_or_else(|| {
186                ProjectionError::RinfIncompleteTopology(format!(
187                    "Missing or invalid latitude in WKT: {trimmed}"
188                ))
189            })?;
190        coords.push((lon, lat));
191    }
192    if coords.len() < 2 {
193        return Err(ProjectionError::RinfIncompleteTopology(format!(
194            "LINESTRING needs >=2 points: {trimmed}"
195        )));
196    }
197    Ok(LineString::from(coords))
198}
199
200/// Approximate length of a WGS84 LineString in meters (great-circle, equirectangular).
201pub fn linestring_length_meters(ls: &LineString<f64>) -> f64 {
202    let pts: Vec<Point<f64>> = ls.points().collect();
203    let mut total = 0.0;
204    for w in pts.windows(2) {
205        let (a, b) = (w[0], w[1]);
206        let lat_mid = (a.y() + b.y()) / 2.0;
207        let dx = (b.x() - a.x()) * 111_320.0 * lat_mid.to_radians().cos();
208        let dy = (b.y() - a.y()) * 111_320.0;
209        total += (dx * dx + dy * dy).sqrt();
210    }
211    total
212}
213
/// Extract the IRI tail to use as a stable id.
///
/// Returns the text after the last `/` or `#`. Trailing separators are ignored,
/// so `http://x/foo/` yields `foo` rather than an empty id. An input without
/// any separator is returned unchanged. If nothing non-empty remains, the whole
/// IRI is returned so the id is never empty for non-empty input.
fn iri_to_id(iri: &str) -> String {
    let tail = iri
        .trim_end_matches(['/', '#'])
        .rsplit(['/', '#'])
        .next()
        .unwrap_or_default();
    let id = if tail.is_empty() { iri } else { tail };
    id.to_string()
}
218
219fn binding_value<'a>(row: &'a Value, key: &str) -> Option<&'a str> {
220    row.get(key)?.get("value")?.as_str()
221}
222
/// Interpret the lexical form of a boolean binding.
///
/// Surrounding whitespace is ignored and letters are compared
/// case-insensitively. Only `true` and `1` are true; everything else
/// (including `false`, `0` and unrecognised text) is false.
fn parse_bool(s: &str) -> bool {
    let normalized = s.trim();
    normalized == "1" || normalized.eq_ignore_ascii_case("true")
}
226
227fn parse_navigability(iri_or_label: &str) -> RinfNavigability {
228    let tail = iri_to_id(iri_or_label).to_ascii_lowercase();
229    match tail.as_str() {
230        "both" => RinfNavigability::Both,
231        "ab" | "atob" | "anbi" => RinfNavigability::AB,
232        "ba" | "btoa" | "binba" => RinfNavigability::BA,
233        "none" | "non-navigable" => RinfNavigability::None,
234        _ => RinfNavigability::Both,
235    }
236}
237
238/// Parse the SPARQL-JSON response for the netelements query.
239pub fn parse_netelements_response(json: &Value) -> Result<Vec<RinfNetelementRow>, ProjectionError> {
240    let bindings = json
241        .get("results")
242        .and_then(|r| r.get("bindings"))
243        .and_then(|b| b.as_array())
244        .ok_or_else(|| {
245            ProjectionError::RinfRetrievalFailed(
246                "Netelements response missing results.bindings array".to_string(),
247            )
248        })?;
249
250    let mut out = Vec::with_capacity(bindings.len());
251    for row in bindings {
252        let iri = binding_value(row, "netelement").ok_or_else(|| {
253            ProjectionError::RinfRetrievalFailed("Missing ?netelement binding".to_string())
254        })?;
255        let wkt = binding_value(row, "netelement_wkt").ok_or_else(|| {
256            ProjectionError::RinfRetrievalFailed("Missing ?netelement_wkt binding".to_string())
257        })?;
258        let ls = parse_wkt_linestring(wkt)?;
259        let count = ls.coords().count();
260        let length = linestring_length_meters(&ls);
261        out.push(RinfNetelementRow {
262            netelement_iri: iri.to_string(),
263            netelement_id: iri_to_id(iri),
264            wkt: wkt.to_string(),
265            geometry_point_count: count,
266            length_meters: length,
267        });
268    }
269    Ok(out)
270}
271
272/// Parse the SPARQL-JSON response for the netrelations query.
273pub fn parse_netrelations_response(
274    json: &Value,
275) -> Result<Vec<RinfNetrelationRow>, ProjectionError> {
276    let bindings = json
277        .get("results")
278        .and_then(|r| r.get("bindings"))
279        .and_then(|b| b.as_array())
280        .ok_or_else(|| {
281            ProjectionError::RinfRetrievalFailed(
282                "Netrelations response missing results.bindings array".to_string(),
283            )
284        })?;
285    let today = chrono::Utc::now().date_naive();
286    let mut out = Vec::with_capacity(bindings.len());
287    for row in bindings {
288        let iri = binding_value(row, "netrelation").ok_or_else(|| {
289            ProjectionError::RinfRetrievalFailed("Missing ?netrelation binding".to_string())
290        })?;
291        let a = binding_value(row, "netelementA").ok_or_else(|| {
292            ProjectionError::RinfRetrievalFailed("Missing ?netelementA binding".to_string())
293        })?;
294        let b = binding_value(row, "netelementB").ok_or_else(|| {
295            ProjectionError::RinfRetrievalFailed("Missing ?netelementB binding".to_string())
296        })?;
297        let on_a = binding_value(row, "isOnOriginOfElementA")
298            .map(parse_bool)
299            .unwrap_or(false);
300        let on_b = binding_value(row, "isOnOriginOfElementB")
301            .map(parse_bool)
302            .unwrap_or(false);
303        let nav = binding_value(row, "navigability")
304            .map(parse_navigability)
305            .unwrap_or(RinfNavigability::Both);
306        let valid_on_date = binding_value(row, "valid_from_date")
307            .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
308            .unwrap_or(today);
309        out.push(RinfNetrelationRow {
310            netrelation_iri: iri.to_string(),
311            element_a_id: iri_to_id(a),
312            element_b_id: iri_to_id(b),
313            is_on_origin_of_element_a: on_a,
314            is_on_origin_of_element_b: on_b,
315            navigability: nav,
316            valid_on_date,
317        });
318    }
319    Ok(out)
320}
321
322/// Map parsed netelement rows to core [`Netelement`] structs.
323///
324/// Returns the netelements plus a parallel `(id, length_meters, point_count)`
325/// vector used by the validator to detect coarse geometries.
326#[allow(clippy::type_complexity)]
327pub fn map_netelements_to_core(
328    rows: &[RinfNetelementRow],
329) -> Result<(Vec<Netelement>, Vec<(String, f64, usize)>), ProjectionError> {
330    let mut nes = Vec::with_capacity(rows.len());
331    let mut lengths = Vec::with_capacity(rows.len());
332    for r in rows {
333        let ls = parse_wkt_linestring(&r.wkt)?;
334        let length = linestring_length_meters(&ls);
335        let count = ls.coords().count();
336        let ne = Netelement::new(r.netelement_id.clone(), ls, "EPSG:4326".to_string())?;
337        lengths.push((r.netelement_id.clone(), length, count));
338        nes.push(ne);
339    }
340    Ok((nes, lengths))
341}
342
343/// Map parsed netrelation rows to core [`NetRelation`] structs.
344///
345/// Drops rows whose endpoints don't reference loaded netelements.
346pub fn map_netrelations_to_core(
347    rows: &[RinfNetrelationRow],
348    netelements: &[Netelement],
349) -> Result<Vec<NetRelation>, ProjectionError> {
350    use std::collections::HashSet;
351    let known: HashSet<&str> = netelements.iter().map(|n| n.id.as_str()).collect();
352    let mut out = Vec::with_capacity(rows.len());
353    for r in rows {
354        if !known.contains(r.element_a_id.as_str()) || !known.contains(r.element_b_id.as_str()) {
355            continue;
356        }
357        if r.element_a_id == r.element_b_id {
358            continue;
359        }
360        let (fwd, bwd) = match r.navigability {
361            RinfNavigability::Both => (true, true),
362            RinfNavigability::AB => (true, false),
363            RinfNavigability::BA => (false, true),
364            RinfNavigability::None => (false, false),
365        };
366        let pos_a: u8 = if r.is_on_origin_of_element_a { 0 } else { 1 };
367        let pos_b: u8 = if r.is_on_origin_of_element_b { 0 } else { 1 };
368        let id = iri_to_id(&r.netrelation_iri);
369        let nr = NetRelation::new(
370            id,
371            r.element_a_id.clone(),
372            r.element_b_id.clone(),
373            pos_a,
374            pos_b,
375            fwd,
376            bwd,
377        )?;
378        out.push(nr);
379    }
380    Ok(out)
381}
382
383/// High-level helper: fetch + parse netelements for a search polygon.
384pub fn fetch_netelements(
385    client: &dyn SparqlClient,
386    endpoint_url: &str,
387    polygon_wkt: &str,
388) -> Result<Vec<RinfNetelementRow>, ProjectionError> {
389    let query = build_netelements_query(polygon_wkt);
390    let json = client.query(endpoint_url, &query)?;
391    parse_netelements_response(&json)
392}
393
394/// High-level helper: fetch + parse netrelations for the given seed IRIs.
395pub fn fetch_netrelations(
396    client: &dyn SparqlClient,
397    endpoint_url: &str,
398    seed_iris: &[String],
399) -> Result<Vec<RinfNetrelationRow>, ProjectionError> {
400    if seed_iris.is_empty() {
401        return Ok(Vec::new());
402    }
403    let query = build_netrelations_query(seed_iris);
404    let json = client.query(endpoint_url, &query)?;
405    parse_netrelations_response(&json)
406}
407
#[cfg(test)]
mod tests {
    //! Unit tests for the RINF retrieval helpers, using in-memory SPARQL
    //! fixtures instead of a live endpoint.

    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};

    const NE_A_IRI: &str = "http://example/linearElement/NE-A";
    const SHORT_WKT: &str = "LINESTRING(11.0 60.0, 11.1 60.1)";

    /// Wrap binding rows in the SPARQL-JSON `results.bindings` envelope.
    fn sparql_results(bindings: Value) -> Value {
        json!({ "results": { "bindings": bindings } })
    }

    /// Build a core netelement in EPSG:4326 from a WKT string.
    fn netelement(id: &str, wkt: &str) -> Netelement {
        let geometry = parse_wkt_linestring(wkt).unwrap();
        Netelement::new(id.to_string(), geometry, "EPSG:4326".to_string()).unwrap()
    }

    /// Build a relation row (A on origin, B not) dated today.
    fn relation_row(name: &str, a: &str, b: &str, nav: RinfNavigability) -> RinfNetrelationRow {
        RinfNetrelationRow {
            netrelation_iri: format!("http://example/netRelation/{name}"),
            element_a_id: a.to_string(),
            element_b_id: b.to_string(),
            is_on_origin_of_element_a: true,
            is_on_origin_of_element_b: false,
            navigability: nav,
            valid_on_date: chrono::Utc::now().date_naive(),
        }
    }

    #[test]
    fn wkt_linestring_parses_basic() {
        let line = parse_wkt_linestring(SHORT_WKT).unwrap();
        assert_eq!(line.coords().count(), 2);
    }

    #[test]
    fn wkt_linestring_rejects_single_point() {
        let result = parse_wkt_linestring("LINESTRING(11.0 60.0)");
        assert!(result.is_err());
    }

    #[test]
    fn iri_tail_is_used_as_id() {
        let id = iri_to_id("http://data.europa.eu/949/linearElement/SMOKE-A");
        assert_eq!(id, "SMOKE-A");
    }

    #[test]
    fn netelements_query_contains_polygon() {
        let query = build_netelements_query("POLYGON((1 2, 3 4))");
        for needle in ["POLYGON((1 2, 3 4))", "sfIntersects"] {
            assert!(query.contains(needle), "query is missing {needle:?}");
        }
    }

    #[test]
    fn wkt_linestring_rejects_non_linestring() {
        let message = parse_wkt_linestring("POINT(11.0 60.0)")
            .unwrap_err()
            .to_string();
        assert!(message.contains("not a LINESTRING"));
    }

    #[test]
    fn wkt_linestring_rejects_malformed_coordinates() {
        let message = parse_wkt_linestring("LINESTRING(11.0, 11.1 60.1)")
            .unwrap_err()
            .to_string();
        assert!(message.contains("longitude") || message.contains("latitude"));
    }

    #[test]
    fn parse_netelements_response_rejects_missing_bindings() {
        let message = parse_netelements_response(&json!({"results": {}}))
            .unwrap_err()
            .to_string();
        assert!(message.contains("results.bindings"));
    }

    #[test]
    fn parse_netelements_response_maps_rows() {
        let input = sparql_results(json!([{
            "netelement": {"value": NE_A_IRI},
            "netelement_wkt": {"value": SHORT_WKT}
        }]));

        let rows = parse_netelements_response(&input).unwrap();
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert_eq!(row.netelement_id, "NE-A");
        assert_eq!(row.geometry_point_count, 2);
        assert!(row.length_meters > 0.0);
    }

    #[test]
    fn parse_netrelations_response_uses_defaults() {
        let input = sparql_results(json!([{
            "netrelation": {"value": "http://example/netRelation/NR-1"},
            "netelementA": {"value": NE_A_IRI},
            "netelementB": {"value": "http://example/linearElement/NE-B"}
        }]));

        let rows = parse_netrelations_response(&input).unwrap();
        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert_eq!(row.element_a_id, "NE-A");
        assert_eq!(row.element_b_id, "NE-B");
        assert_eq!(row.navigability, RinfNavigability::Both);
        assert!(!row.is_on_origin_of_element_a);
        assert!(!row.is_on_origin_of_element_b);
    }

    #[test]
    fn map_netrelations_to_core_filters_unknown_and_self_loops() {
        let loaded = [
            netelement("NE-A", SHORT_WKT),
            netelement("NE-B", "LINESTRING(11.1 60.1, 11.2 60.2)"),
        ];
        let rows = [
            relation_row("NR-valid", "NE-A", "NE-B", RinfNavigability::AB),
            relation_row("NR-unknown", "NE-A", "NE-X", RinfNavigability::Both),
            relation_row("NR-self", "NE-A", "NE-A", RinfNavigability::Both),
        ];

        let mapped = map_netrelations_to_core(&rows, &loaded).unwrap();
        assert_eq!(mapped.len(), 1);
        let relation = &mapped[0];
        assert_eq!(relation.id, "NR-valid");
        assert!(relation.navigable_forward);
        assert!(!relation.navigable_backward);
    }

    /// Mock transport that returns a fixed payload and counts invocations.
    struct CountingClient {
        calls: AtomicUsize,
        payload: Value,
    }

    impl CountingClient {
        fn returning(payload: Value) -> Self {
            Self {
                calls: AtomicUsize::new(0),
                payload,
            }
        }

        fn call_count(&self) -> usize {
            self.calls.load(Ordering::SeqCst)
        }
    }

    impl SparqlClient for CountingClient {
        fn query(&self, _endpoint_url: &str, _sparql: &str) -> Result<Value, ProjectionError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(self.payload.clone())
        }
    }

    #[test]
    fn fetch_netrelations_with_empty_seeds_short_circuits() {
        let client = CountingClient::returning(sparql_results(json!([])));

        let out = fetch_netrelations(&client, "https://example.invalid", &[]).unwrap();
        assert!(out.is_empty());
        assert_eq!(client.call_count(), 0);
    }

    #[test]
    fn fetch_netelements_executes_query_and_parses() {
        let client = CountingClient::returning(sparql_results(json!([{
            "netelement": {"value": NE_A_IRI},
            "netelement_wkt": {"value": SHORT_WKT}
        }])));

        let out = fetch_netelements(
            &client,
            "https://example.invalid",
            "POLYGON((0 0,1 0,1 1,0 1,0 0))",
        )
        .unwrap();
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].netelement_id, "NE-A");
        assert_eq!(client.call_count(), 1);
    }
}