1use super::{AppState, assets};
4use crate::diff::{
5 DiffInputs, DiffResult, LoadDiffInputs, compute_diff, load_diff_file, stream_common_triples,
6};
7use axum::Json;
8use axum::Router;
9use axum::body::Body;
10use axum::extract::{Query, State};
11use axum::http::{StatusCode, header};
12use axum::response::{Html, IntoResponse, Response};
13use axum::routing::{get, post};
14use oxrdf::{Quad, Term};
15use serde::{Deserialize, Serialize};
16use std::path::PathBuf;
17use std::sync::Arc;
18use tokio_stream::StreamExt as _;
19use tokio_stream::wrappers::ReceiverStream;
20
21pub fn router(state: AppState) -> Router {
22 Router::new()
23 .route("/", get(root))
24 .route("/assets/*path", get(asset))
25 .route("/api/meta", get(meta))
26 .route("/api/rows", get(rows))
27 .route("/api/load", post(load))
28 .with_state(state)
29}
30
31async fn root() -> Response {
32 match assets::lookup("/assets/app/index.html") {
33 Some(a) => Html(std::str::from_utf8(a.bytes).unwrap_or("")).into_response(),
34 None => (StatusCode::NOT_FOUND, "missing").into_response(),
35 }
36}
37
38async fn asset(axum::extract::Path(path): axum::extract::Path<String>) -> Response {
39 let full = format!("/assets/{path}");
40 match assets::lookup(&full) {
41 Some(a) => {
42 let mut resp = Response::new(Body::from(a.bytes));
43 resp.headers_mut()
44 .insert(header::CONTENT_TYPE, a.mime.parse().unwrap());
45 resp
46 }
47 None => (StatusCode::NOT_FOUND, "not found").into_response(),
48 }
49}
50
51#[derive(Serialize)]
52struct StatsDto {
53 a_total: u64,
54 b_total: u64,
55 a_only: u64,
56 b_only: u64,
57 common: u64,
58 a_skipped_bnodes: u64,
59 b_skipped_bnodes: u64,
60}
61
62#[derive(Serialize)]
63struct MetaDto {
64 version: &'static str,
65 loaded: bool,
66 from_diff_file: bool,
67 graph_a: Option<String>,
68 graph_b: Option<String>,
69 stats: Option<StatsDto>,
70 prefixes: Vec<(String, String)>,
71}
72
73async fn meta(State(s): State<AppState>) -> Json<MetaDto> {
74 let guard = s.data.lock().await;
75 match guard.as_ref() {
76 None => Json(MetaDto {
77 version: env!("CARGO_PKG_VERSION"),
78 loaded: false,
79 from_diff_file: false,
80 graph_a: None,
81 graph_b: None,
82 stats: None,
83 prefixes: vec![],
84 }),
85 Some(d) => Json(MetaDto {
86 version: env!("CARGO_PKG_VERSION"),
87 loaded: true,
88 from_diff_file: d.source_a.is_none() && d.source_b.is_none(),
89 graph_a: Some(d.graph_a.as_str().to_string()),
90 graph_b: Some(d.graph_b.as_str().to_string()),
91 stats: Some(StatsDto {
92 a_total: d.stats.a_total,
93 b_total: d.stats.b_total,
94 a_only: d.stats.a_only,
95 b_only: d.stats.b_only,
96 common: d.stats.common,
97 a_skipped_bnodes: d.stats.a_skipped_bnodes,
98 b_skipped_bnodes: d.stats.b_skipped_bnodes,
99 }),
100 prefixes: d.prefixes.clone(),
101 }),
102 }
103}
104
105#[derive(Deserialize)]
106struct RowsQuery {
107 #[serde(default)]
108 include: Option<String>,
109}
110
111#[derive(Serialize)]
112struct ObjectDto<'a> {
113 t: &'a str,
114 v: &'a str,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 dt: Option<&'a str>,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 lng: Option<&'a str>,
119}
120
121#[derive(Serialize)]
122struct RowDto<'a> {
123 a: &'a str,
124 s: String,
125 p: &'a str,
126 o: ObjectDto<'a>,
127}
128
129fn write_row<W: std::io::Write>(w: &mut W, action: &str, q: &Quad) -> std::io::Result<()> {
130 let s = match &q.subject {
131 oxrdf::NamedOrBlankNode::NamedNode(n) => n.as_str().to_string(),
132 oxrdf::NamedOrBlankNode::BlankNode(b) => format!("_:{}", b.as_str()),
133 };
134 let p = q.predicate.as_str();
135 let obj = match &q.object {
136 Term::NamedNode(n) => ObjectDto {
137 t: "iri",
138 v: n.as_str(),
139 dt: None,
140 lng: None,
141 },
142 Term::BlankNode(b) => ObjectDto {
143 t: "bnode",
144 v: b.as_str(),
145 dt: None,
146 lng: None,
147 },
148 Term::Literal(l) => {
149 let lng = l.language();
150 let dt = if lng.is_some() {
151 None
152 } else {
153 Some(l.datatype().as_str())
154 };
155 ObjectDto {
156 t: "lit",
157 v: l.value(),
158 dt,
159 lng,
160 }
161 }
162 #[allow(unreachable_patterns)]
163 _ => ObjectDto {
164 t: "iri",
165 v: "",
166 dt: None,
167 lng: None,
168 },
169 };
170 let row = RowDto {
171 a: action,
172 s,
173 p,
174 o: obj,
175 };
176 serde_json::to_writer(&mut *w, &row)?;
177 w.write_all(b"\n")
178}
179
180pub fn render_diff_ndjson(data: &DiffResult) -> Vec<u8> {
182 let mut buf = Vec::with_capacity(256 * 1024);
183 for t in &data.b_only {
184 let _ = write_row(&mut buf, "+", t);
185 }
186 for t in &data.a_only {
187 let _ = write_row(&mut buf, "-", t);
188 }
189 buf
190}
191
192async fn rows(State(s): State<AppState>, Query(q): Query<RowsQuery>) -> Response {
193 let include = q.include.as_deref().unwrap_or("diff").to_string();
194 let data_arc = s.data.lock().await.clone();
195 let Some(data) = data_arc else {
196 return (StatusCode::CONFLICT, "no diff loaded").into_response();
197 };
198
199 let body: Body = match include.as_str() {
200 "diff" => {
201 let (tx, rx) = tokio::sync::mpsc::channel::<Vec<u8>>(128);
202 tokio::task::spawn_blocking(move || {
203 for t in &data.b_only {
204 let mut buf = Vec::with_capacity(256);
205 let _ = write_row(&mut buf, "+", t);
206 if tx.blocking_send(buf).is_err() {
207 return;
208 }
209 }
210 for t in &data.a_only {
211 let mut buf = Vec::with_capacity(256);
212 let _ = write_row(&mut buf, "-", t);
213 if tx.blocking_send(buf).is_err() {
214 return;
215 }
216 }
217 });
218 let stream = ReceiverStream::new(rx)
219 .map(|b| Result::<_, std::io::Error>::Ok(axum::body::Bytes::from(b)));
220 Body::from_stream(stream)
221 }
222 "common" => {
223 if data.source_a.is_none() || data.source_b.is_none() {
224 return (
225 StatusCode::CONFLICT,
226 "common triples unavailable: dataset was loaded from a diff file",
227 )
228 .into_response();
229 }
230 let res = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<u8>> {
231 let (Some(file_a), Some(file_b)) = (data.source_a.clone(), data.source_b.clone())
232 else {
233 return Ok(Vec::new());
234 };
235 let fmt_a = data.format_a;
236 let fmt_b = data.format_b;
237 let mut buf = Vec::with_capacity(256 * 1024);
238 stream_common_triples(&file_a, &file_b, fmt_a, fmt_b, |t| {
239 let q = Quad {
240 subject: t.subject.clone(),
241 predicate: t.predicate.clone(),
242 object: t.object.clone(),
243 graph_name: oxrdf::GraphName::DefaultGraph,
244 };
245 write_row(&mut buf, "=", &q).map_err(anyhow::Error::from)?;
246 Ok(())
247 })?;
248 Ok(buf)
249 })
250 .await;
251 match res {
252 Ok(Ok(b)) => Body::from(b),
253 Ok(Err(e)) => {
254 return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")).into_response();
255 }
256 Err(_) => {
257 return (StatusCode::INTERNAL_SERVER_ERROR, "task panic").into_response();
258 }
259 }
260 }
261 _ => return (StatusCode::BAD_REQUEST, "unknown include").into_response(),
262 };
263
264 let mut resp = Response::new(body);
265 resp.headers_mut().insert(
266 header::CONTENT_TYPE,
267 "application/x-ndjson".parse().unwrap(),
268 );
269 resp
270}
271
272#[derive(Deserialize)]
273struct LoadBody {
274 file_a: Option<PathBuf>,
275 file_b: Option<PathBuf>,
276 diff: Option<PathBuf>,
277 graph_a: Option<String>,
278 graph_b: Option<String>,
279 #[serde(default)]
280 ignore_blank_nodes: bool,
281}
282
283async fn load(State(s): State<AppState>, Json(body): Json<LoadBody>) -> Response {
284 let result: anyhow::Result<DiffResult> = if let Some(diff) = body.diff {
285 let inputs = LoadDiffInputs {
286 diff,
287 format: None,
288 graph_a: body.graph_a,
289 graph_b: body.graph_b,
290 };
291 match tokio::task::spawn_blocking(move || load_diff_file(&inputs)).await {
292 Ok(r) => r,
293 Err(e) => Err(anyhow::anyhow!("task panic: {e}")),
294 }
295 } else if let (Some(a), Some(b)) = (body.file_a, body.file_b) {
296 let inputs = DiffInputs {
297 file_a: a,
298 file_b: b,
299 format_a: None,
300 format_b: None,
301 graph_a: body.graph_a,
302 graph_b: body.graph_b,
303 ignore_blank_nodes: body.ignore_blank_nodes,
304 };
305 match tokio::task::spawn_blocking(move || compute_diff(&inputs)).await {
306 Ok(r) => r,
307 Err(e) => Err(anyhow::anyhow!("task panic: {e}")),
308 }
309 } else {
310 return (StatusCode::BAD_REQUEST, "provide file_a+file_b or diff").into_response();
311 };
312
313 match result {
314 Ok(mut d) => {
315 d.sort_rows();
316 *s.data.lock().await = Some(Arc::new(d));
317 (StatusCode::OK, "ok").into_response()
318 }
319 Err(e) => (StatusCode::BAD_REQUEST, format!("{e:#}")).into_response(),
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diff::{DiffInputs, compute_diff};
    use std::path::PathBuf;

    /// Absolute path of a file under `tests/fixtures`.
    fn fixtures(name: &str) -> PathBuf {
        let mut p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        p.push("tests");
        p.push("fixtures");
        p.push(name);
        p
    }

    #[test]
    fn ndjson_renders_one_line_per_triple() {
        let inputs = DiffInputs {
            file_a: fixtures("a.ttl"),
            file_b: fixtures("b.ttl"),
            format_a: None,
            format_b: None,
            graph_a: None,
            graph_b: None,
            ignore_blank_nodes: false,
        };
        let d = compute_diff(&inputs).unwrap();
        let bytes = render_diff_ndjson(&d);
        let s = std::str::from_utf8(&bytes).unwrap();
        let lines: Vec<&str> = s.lines().collect();
        assert_eq!(lines.len() as u64, d.stats.a_only + d.stats.b_only);
        for line in lines {
            let v: serde_json::Value = serde_json::from_str(line).unwrap();
            assert!(matches!(v["a"].as_str(), Some("+") | Some("-")));
            assert!(v["s"].is_string());
            assert!(v["p"].is_string());
            assert!(v["o"].is_object());
        }
    }

    /// Pins the per-term JSON encoding produced by `write_row`.
    #[test]
    fn write_row_encodes_subject_and_object_kinds() {
        use oxrdf::{BlankNode, GraphName, Literal, NamedNode};

        let render = |q: &oxrdf::Quad| -> serde_json::Value {
            let mut buf = Vec::new();
            write_row(&mut buf, "=", q).unwrap();
            assert_eq!(buf.last(), Some(&b'\n'));
            serde_json::from_slice(&buf).unwrap()
        };

        // Blank-node subject + language-tagged literal: `_:` prefix, `lng` but no `dt`.
        let lang = oxrdf::Quad::new(
            BlankNode::new_unchecked("b1"),
            NamedNode::new_unchecked("http://example.org/p"),
            Literal::new_language_tagged_literal_unchecked("hallo", "de"),
            GraphName::DefaultGraph,
        );
        let v = render(&lang);
        assert_eq!(v["a"], "=");
        assert_eq!(v["s"], "_:b1");
        assert_eq!(v["p"], "http://example.org/p");
        assert_eq!(v["o"]["t"], "lit");
        assert_eq!(v["o"]["v"], "hallo");
        assert_eq!(v["o"]["lng"], "de");
        assert!(v["o"].get("dt").is_none());

        // IRI subject + typed literal: `dt` but no `lng`.
        let typed = oxrdf::Quad::new(
            NamedNode::new_unchecked("http://example.org/s"),
            NamedNode::new_unchecked("http://example.org/p"),
            Literal::new_typed_literal(
                "42",
                NamedNode::new_unchecked("http://www.w3.org/2001/XMLSchema#integer"),
            ),
            GraphName::DefaultGraph,
        );
        let v = render(&typed);
        assert_eq!(v["s"], "http://example.org/s");
        assert_eq!(v["o"]["t"], "lit");
        assert_eq!(v["o"]["dt"], "http://www.w3.org/2001/XMLSchema#integer");
        assert!(v["o"].get("lng").is_none());

        // Blank-node object: bare id, no `_:` prefix.
        let bnode_obj = oxrdf::Quad::new(
            NamedNode::new_unchecked("http://example.org/s"),
            NamedNode::new_unchecked("http://example.org/p"),
            BlankNode::new_unchecked("o1"),
            GraphName::DefaultGraph,
        );
        let v = render(&bnode_obj);
        assert_eq!(v["o"]["t"], "bnode");
        assert_eq!(v["o"]["v"], "o1");
    }
}
361}