From 2264647e19c905cffc026dcf89d1e0ef6ecc80f6 Mon Sep 17 00:00:00 2001 From: Iago Bonnici <iago.bonnici@umontpellier.fr> Date: Mon, 10 Mar 2025 12:33:53 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Redirect=20undetermined=20samples?= =?UTF-8?q?=20to=20files.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/dmox.rs | 40 ++++++++++++++++++-- src/config.rs | 4 +- src/dispatcher.rs | 8 ++-- src/lib.rs | 95 +++++++++++++++++++++++++++++------------------ src/writer.rs | 6 +-- 5 files changed, 106 insertions(+), 47 deletions(-) diff --git a/src/bin/dmox.rs b/src/bin/dmox.rs index 8597590..6da01f5 100644 --- a/src/bin/dmox.rs +++ b/src/bin/dmox.rs @@ -1,11 +1,11 @@ //! Main entry point into the CLI. -use std::{fs, path::PathBuf, process, thread::sleep, time::Duration}; +use std::{ffi::OsStr, fs, path::PathBuf, process, thread::sleep, time::Duration}; use clap::Parser; use colored::Colorize; use dmox::{ - config::{Distance, MAX_MODULE_SIZE}, + config::{Distance, UndeterminedPath, MAX_MODULE_SIZE}, Config, }; use snafu::{ensure, ResultExt, Snafu}; @@ -64,6 +64,12 @@ struct Cli { #[arg(long)] samples: PathBuf, + /// Optional output filename stub to collect reads with undetermined sample. + /// For example `--undetermined nomatch` will produce + /// `./nomatch.R1.fq.gz` and `./nomatch.R2.fq.gz`. + #[arg(long)] + undetermined: Option<PathBuf>, + /// Raise to output BX codes as part of demultiplexed fastq identifiers. #[arg( long, @@ -153,6 +159,7 @@ fn main() { } } +#[allow(clippy::too_many_lines)] // Numerous single-line fields. fn run() -> Result<(), Error> { // Parse CLI args. let Cli { @@ -168,6 +175,7 @@ fn run() -> Result<(), Error> { max_distance, schema, samples, + undetermined, bx, rx, qx, @@ -211,9 +219,23 @@ fn run() -> Result<(), Error> { .get() }; - // Prepare output folder. + // Prepare output files/folders. fs::create_dir_all(&samples).with_context(|_| OutputFolderErr { path: &samples })?; + let undetermined = undetermined + .map(|input| -> Result<_, Error> { + Ok(( + input.file_name().map(OsStr::to_os_string).ok_or_else(|| { + ConfigErr { + mess: format!("not a filename stub: {}", input.display()), + } + .build() + })?, + input, + )) + }) + .transpose()?; + let config = Config { module_size, n_modules, @@ -229,6 +251,18 @@ fn run() -> Result<(), Error> { path.push(format!("{id}.{infix}.fq.gz")); path }), + undetermined: undetermined.map(|(stub, base)| { + // (not exactly sure why type inference needs help here, + // but seems related to https://users.rust-lang.org/t/type-inference-fails-for-collection-of-trait-objects/85920) + let u: Box<UndeterminedPath> = Box::new(move |infix| { + let mut stub = stub.clone(); + stub.push("."); + stub.push(infix); + stub.push(".fq.gz"); + base.with_file_name(stub) + }); + u + }), output_bx: bx, output_rx: rx, output_qx: qx, diff --git a/src/config.rs b/src/config.rs index 9a9e0ea..4addf90 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,6 +17,7 @@ pub struct Config { pub schema: PathBuf, pub barcodes_table_file: PathBuf, pub sample: Box<SamplePath>, + pub undetermined: Option<Box<UndeterminedPath>>, pub output_bx: bool, pub output_rx: bool, pub output_qx: bool, @@ -33,7 +34,8 @@ pub enum Distance { Levenshtein, } -type SamplePath = dyn Fn(&str, &str) -> PathBuf; // { (sample id, infix) -> path } +pub type SamplePath = dyn Fn(&str, &str) -> PathBuf; // { (sample id, infix) -> path } +pub type UndeterminedPath = dyn Fn(&str) -> PathBuf; // { infix -> path } // Useful for allocation-free levenshtein distance. pub const MAX_MODULE_SIZE: usize = 6; diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 9a0c8eb..2f186df 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -16,20 +16,20 @@ use std::{ use priority_queue::PriorityQueue; use snafu::Snafu; -use crate::{writer, ContextualizedWriter}; +use crate::{writer, ContextualizedWriter, SampleCode}; /// { sample code ↦ ([R1, R2], number of blocks ready to output) } -pub(crate) type BufferMap = HashMap<usize, ([Vec<u8>; 2], usize)>; +pub(crate) type BufferMap = HashMap<SampleCode, ([Vec<u8>; 2], usize)>; pub(crate) struct Inner<W: Write> { /// Writer threads to dispatch jobs to. pub(crate) workers: Vec<writer::Outer<W>>, /// Writers to sample files to zip/write buffers to. - pub(crate) all_writers: HashMap<usize, [ContextualizedWriter<W>; 2]>, + pub(crate) all_writers: HashMap<SampleCode, [ContextualizedWriter<W>; 2]>, /// Buffers filled by the main thread with the data to be written. pub(crate) buffers: Arc<Mutex<BufferMap>>, /// { Sample code ↦ number of fastq blocks ready for writing } - pub(crate) queue: Arc<Mutex<PriorityQueue<usize, usize>>>, + pub(crate) queue: Arc<Mutex<PriorityQueue<SampleCode, usize>>>, /// Receive writer thread outputs over this channel. pub(crate) writers_output: Receiver<writer::Output<W>>, /// Receive termination signal over this channel. diff --git a/src/lib.rs b/src/lib.rs index 1280265..c0c042a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,9 @@ pub(crate) const MODULE_LETTERS: &[u8; 4] = b"ABCD"; /// One barcode is 4 modules (watch goofy letters ordering!). type Barcode = [usize; 4]; // [A, C, B, D] +/// Sample identifier may be undetermined. +type SampleCode = Option<usize>; + #[allow(clippy::too_many_lines)] // Ongoing experiment. pub fn demultiplex(cf: &Config) -> Result<(), Error> { let tic = Instant::now(); @@ -141,13 +144,20 @@ pub fn demultiplex(cf: &Config) -> Result<(), Error> { let mut counters = IndexMap::new(); //---------------------------------------------------------------------------------------------- - // There are two outputs files per sample, + // There are two outputs files per sample + the undetermined files, // and compressing/writing seems to be the CPU bottleneck here. // For this reason: prepare one writer for every output file, // dedicated to compress/write output in its own file, // then spawn as many threads as required to parallelize these. - println!("Prepare sampler writers."); + println!("Prepare samples writers."); + println!( + " {}", + (cf.sample)("<sample_id>", "{I1,I2,R1,R2}") + .display() + .to_string() + .blue() + ); // Buffers are filled by this main thread, then consumed by writer threads. let mut output_buffers = dispatcher::BufferMap::new(); @@ -160,30 +170,42 @@ pub fn demultiplex(cf: &Config) -> Result<(), Error> { // Monitor to get a sense whether this optimisation is useful. let mut out_allocs_created = 0; + let new_output_writer = |path: PathBuf| -> Result<_, Error> { + // Create writer with zip | disk IO stream. + let file = io::create_file(&path)?; + let writer = BufWriter::with_capacity(cf.writers_capacity, file); + let gz = GzEncoder::new(writer, Compression::new(cf.output_compression_level)); + let writer = ContextualizedWriter::new(path, gz); + Ok(writer) + }; + let mut new_output_counter = || { + ( + OUTPUT_INFIXES.map(|_| { + out_allocs_created += 1; + Vec::new() + }), + 0, + ) + }; for (&code, sample_id) in &samples.map { all_writers.insert( - code, + Some(code), #[allow(unstable_name_collisions)] // Purposedly downgrading from nightly. - OUTPUT_INFIXES.try_map(|infix| -> Result<_, Error> { - // Create writer with zip | disk IO stream. - let path = (cf.sample)(&ful(sample_id), infix); - let file = io::create_file(&path)?; - let writer = BufWriter::with_capacity(cf.writers_capacity, file); - let gz = GzEncoder::new(writer, Compression::new(cf.output_compression_level)); - let writer = ContextualizedWriter::new(path, gz); - Ok(writer) - })?, + OUTPUT_INFIXES + .try_map(|infix| new_output_writer((cf.sample)(&ful(sample_id), infix)))?, ); - output_buffers.insert( - code, - ( - OUTPUT_INFIXES.map(|_| { - out_allocs_created += 1; - Vec::new() - }), - 0, - ), + output_buffers.insert(Some(code), new_output_counter()); + } + // + One for undetermined samples. + if let Some(undetermined) = &cf.undetermined { + println!("Open undetermined output files."); + println!(" {}", undetermined("{R1,R2}").display().to_string().blue()); + all_writers.insert( + None, + #[allow(unstable_name_collisions)] // Purposedly downgrading from nightly. + OUTPUT_INFIXES.try_map(|infix| new_output_writer(undetermined(infix)))?, ); + output_buffers.insert(None, new_output_counter()); } //---------------------------------------------------------------------------------------------- @@ -376,23 +398,27 @@ pub fn demultiplex(cf: &Config) -> Result<(), Error> { //------------------------------------------------------------------------------------------ // Construct output fastq blocks. - let [[i1, i2], [r1, r2]] = if let Some(code) = sample_code { - // Select correct sample to write to. - ensure!( - samples.map.contains_key(&code), - NoSampleErr { - letter: samples.letter, - code, - context: context(i1.1.into()), - } - ); + let [[i1, i2], [r1, r2]] = if let (None, None) = (&cf.undetermined, sample_code) { + // Unclear sample code and no dedicated output file: skip. + [[i1, i2], [r1, r2]].map(|[(a, _), (b, _)]| [a, b]) + } else { + if let Some(code) = sample_code { + ensure!( + samples.map.contains_key(&code), + NoSampleErr { + letter: samples.letter, + code, + context: context(i1.1.into()), + } + ); + } // Then for both reads.. let [[i1, i2], r1r2] = [[i1, i2], [r1, r2]].map(|[(a, _), (b, _)]| [a, b]); // Pick the right output buffers. let mut buffers_lock = output_buffers.lock().expect("poisoned buffers?"); let mut queue_lock = queue.lock().expect("poisoned queue?"); - let (buffers, n_blocks) = buffers_lock.get_mut(&code).unwrap(); + let (buffers, n_blocks) = buffers_lock.get_mut(&sample_code).unwrap(); // Push fastq blocks inside. for (r, buffer) in r1r2.iter().zip(buffers) { macro_rules! w { @@ -430,13 +456,10 @@ pub fn demultiplex(cf: &Config) -> Result<(), Error> { } *n_blocks += 1; // Tell the dispatcher it's ready. - queue_lock.change_priority(&code, *n_blocks); + queue_lock.change_priority(&sample_code, *n_blocks); drop(queue_lock); drop(buffers_lock); [[i1, i2], r1r2] - } else { - // Unclear sample code: can't determine which file to write to: skip. - [[i1, i2], [r1, r2]].map(|[(a, _), (b, _)]| [a, b]) }; //------------------------------------------------------------------------------------------ diff --git a/src/writer.rs b/src/writer.rs index 4580c2d..262eb5a 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -13,13 +13,13 @@ use std::{ use snafu::{ResultExt, Snafu}; -use crate::{io::Context, Block, ContextualizedWriter}; +use crate::{io::Context, Block, ContextualizedWriter, SampleCode}; /// Messages received. pub(crate) enum Input<W: Write> { Write { /// Which sample files to output to. - sample_code: usize, + sample_code: SampleCode, /// Formatted [R1, R2] blocks, ready to be zipped. buffers: [Vec<u8>; 2], /// The corresponding writers to push the buffers within. @@ -38,7 +38,7 @@ pub(crate) enum Output<W: Write> { /// Thread id, useful for the main thread to mark this one ready for a new job. id: usize, /// Sample files just written, useful for the main thread to attribute the writers. - sample_code: usize, + sample_code: SampleCode, /// Consumed, cleared buffers, ready for recycling. buffers: [Vec<u8>; 2], /// These writers finished the job, ready for redispatching. -- GitLab