// Copyright (c) 2021 Weird Constructor // This file is a part of HexoDSP. Released under GPL-3.0-or-later. // See README.md and COPYING for details. use crate::dsp::MAX_BLOCK_SIZE; use ringbuf::{Consumer, Producer, RingBuffer}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use crate::util::PerfTimer; /// 3 inputs, 3 outputs of signal monitors. pub const MON_SIG_CNT: usize = 6; /// Just some base to determine the monitor buffer sizes. const IMAGINARY_MAX_SAMPLE_RATE: usize = 48000; /// The number of minmax samples to hold. pub const MONITOR_MINMAX_SAMPLES: usize = 160; /// The length in seconds of the MONITOR_MINMAX_SAMPLES const MONITOR_MINMAX_LEN_S: usize = 3; /// The sleep time of the thread that receives monitoring data /// from the backend/audio thread. /// It should be within the time of a frame of the UI thread for /// smooth updates. The maximum is thus about 16ms. /// The processing of the audio buffer is somewhere in the us /// area. const MONITOR_PROC_THREAD_INTERVAL_MS: u64 = 10; // TODO / FIXME: We should recalculate this on the basis of the // real actual sample rate, otherwise the monitor scope // is going to be too fast. /// The number of audio samples over which to calculate /// one min/max sample. Typically something around 750. const MONITOR_INPUT_LEN_PER_SAMPLE: usize = (MONITOR_MINMAX_LEN_S * IMAGINARY_MAX_SAMPLE_RATE) / MONITOR_MINMAX_SAMPLES; /// Maximum number of monitor buffers to hold in the backend. /// Typically there are only 16-32ms of monitor content floating /// around, as the monitor processing thread regularily /// processes the monitor. const MONITOR_BUF_COUNT: usize = // 2 for safety margin 2 * (IMAGINARY_MAX_SAMPLE_RATE / MAX_BLOCK_SIZE); pub struct MonitorBackend { rb_mon_prod: Producer, rb_recycle_con: Consumer, /// Holds enough monitor buffers to hold about 1-2 seconds /// of data. The [MonitorBuf] boxes are written in the /// backend and then sent via [MonitorBackend::rb_mon_prod] to the frontend. /// The frontend then sends the used [MonitorBufPtr] back /// via quick_update_con. unused_monitor_buffers: Vec, } impl MonitorBackend { /// Checks if there are any used monitor buffers to be /// collected. pub fn check_recycle(&mut self) { while let Some(buf) = self.rb_recycle_con.pop() { self.unused_monitor_buffers.push(buf); } } /// Hands out an unused [MonitorBuf] for filling and /// sending to the [MonitorProcessor] thread. pub fn get_unused_mon_buf(&mut self) -> Option { self.unused_monitor_buffers.pop() } /// A helper function for writing tests. /// Returns the number of [MonitorBuf] we can hand out /// until there are none anymore. pub fn count_unused_mon_bufs(&self) -> usize { self.unused_monitor_buffers.len() } /// Sends a [MonitorBuf] to the [MonitorProcessor]. pub fn send_mon_buf(&mut self, buf: MonitorBufPtr) { match self.rb_mon_prod.push(buf) { Ok(_) => (), Err(buf) => self.unused_monitor_buffers.push(buf), } } } /// Implements the logic for min/maxing a single signal channel/line. pub struct MonitorMinMax { /// Index of the signal in the [MonitorBuf] sig_idx: usize, /// A ring buffer of min/max samples, written to by `buf_write_ptr`. buf: [(f32, f32); MONITOR_MINMAX_SAMPLES], /// The pointer/index into `buf` to the next update to write. buf_write_ptr: usize, /// Holds the currently accumulated min/max values and the length /// of so far processed audio rate samples. Once MONITOR_INPUT_LEN_PER_SAMPLE /// is reached, this will be written into `buf`. cur_min_max: (f32, f32, usize), } impl MonitorMinMax { pub fn new(sig_idx: usize) -> Self { Self { sig_idx, buf: [(0.0, 0.0); MONITOR_MINMAX_SAMPLES], buf_write_ptr: 0, cur_min_max: (100.0, -100.0, 0), } } /// Processes a monitoring buffer received from the Backend. /// It returns `true` when a new data point was calculated. pub fn process(&mut self, mon_buf: &mut MonitorBufPtr) -> bool { let mut new_data = false; while let Some(sample) = mon_buf.next_sample_for_signal(self.sig_idx) { self.cur_min_max.0 = self.cur_min_max.0.min(sample); self.cur_min_max.1 = self.cur_min_max.1.max(sample); self.cur_min_max.2 += 1; if self.cur_min_max.2 >= MONITOR_INPUT_LEN_PER_SAMPLE { self.buf[self.buf_write_ptr] = (self.cur_min_max.0, self.cur_min_max.1); new_data = true; self.buf_write_ptr = (self.buf_write_ptr + 1) % self.buf.len(); self.cur_min_max.0 = 100.0; self.cur_min_max.1 = -100.0; self.cur_min_max.2 = 0; } } new_data } } /// Represents a bunch of min/max samples. /// Usually copied from the MonitorProcessor thread /// to the frontend if required. #[derive(Debug, Clone, Copy)] pub struct MinMaxMonitorSamples { samples: [(f32, f32); MONITOR_MINMAX_SAMPLES], buf_ptr: usize, } impl MinMaxMonitorSamples { pub fn new() -> Self { Self { samples: [(0.0, 0.0); MONITOR_MINMAX_SAMPLES], buf_ptr: 0 } } fn copy_from(&mut self, min_max_slice: (usize, &[(f32, f32)])) { self.samples.copy_from_slice(min_max_slice.1); self.buf_ptr = min_max_slice.0; } fn copy_to(&self, sms: &mut MinMaxMonitorSamples) { sms.buf_ptr = self.buf_ptr; sms.samples.copy_from_slice(&self.samples[..]); } /// Gets the sample at the offset relative to the start of the min_max_slice. pub fn at(&self, offs: usize) -> &(f32, f32) { let idx = (self.buf_ptr + offs) % self.samples.len(); &self.samples[idx] } pub fn len(&self) -> usize { MONITOR_MINMAX_SAMPLES } pub fn is_empty(&self) -> bool { false } } impl Default for MinMaxMonitorSamples { fn default() -> Self { Self::new() } } impl std::ops::Index for MinMaxMonitorSamples { type Output = (f32, f32); fn index(&self, idx: usize) -> &Self::Output { &self.at(idx) } } /// The actual frontend API for the MonitorProcessor. /// We start an extra thread for handling monitored signals from the /// MonitorBackend, because we can't guarantee that the UI thread /// is actually started or working. Also because we want to be independent /// of whether a UI is started at all. /// /// Just call [Monitor::get_minmax_monitor_samples] and you will always get /// the most current data. pub struct Monitor { terminate_proc: Arc, proc_thread: Option>, new_data: Arc, monitor_samples: Arc>, monitor_samples_copy: [MinMaxMonitorSamples; MON_SIG_CNT], } impl Monitor { pub fn new( rb_mon_con: Consumer, rb_recycle_prod: Producer, ) -> Self { let terminate_proc = Arc::new(AtomicBool::new(false)); let th_terminate = terminate_proc.clone(); let monitor_samples = Arc::new(Mutex::new([MinMaxMonitorSamples::new(); MON_SIG_CNT])); let th_mon_samples = monitor_samples.clone(); let new_data = Arc::new(AtomicBool::new(false)); let th_new_data = new_data.clone(); let th = std::thread::spawn(move || { let mut proc = MonitorProcessor::new(rb_mon_con, rb_recycle_prod); loop { if th_terminate.load(std::sync::atomic::Ordering::Relaxed) { return; } // let ta = std::time::Instant::now(); proc.process(); // let t0 = std::time::Instant::now().duration_since(ta); if proc.check_new_data() { let mut ms = th_mon_samples.lock().expect("Unpoisoned Lock for monitor_samples"); for i in 0..MON_SIG_CNT { ms[i].copy_from(proc.minmax_slice_for_signal(i)); } th_new_data.store(true, std::sync::atomic::Ordering::Relaxed); } // let ta = std::time::Instant::now().duration_since(ta); // println!("txx Elapsed: {:?} | {:?}", t0, ta); std::thread::sleep(std::time::Duration::from_millis( MONITOR_PROC_THREAD_INTERVAL_MS, )); } }); Self { proc_thread: Some(th), terminate_proc, monitor_samples, monitor_samples_copy: [MinMaxMonitorSamples::new(); MON_SIG_CNT], new_data, } } pub fn get_minmax_monitor_samples(&mut self, idx: usize) -> &MinMaxMonitorSamples { // TODO / FIXME: We should be using a triple buffer here // for access to the set of MinMaxMonitorSamples. But I was // too lazy and think we can bear with a slightly sluggish // UI. Anyways, if we get a sluggish UI, we have to look here. let mut pt = PerfTimer::new("MMMSamp").off(); if self.new_data.load(std::sync::atomic::Ordering::Relaxed) { let ms = self.monitor_samples.lock().expect("Unpoisoned Lock for monitor_samples"); pt.print("XXX"); for i in 0..MON_SIG_CNT { ms[i].copy_to(&mut self.monitor_samples_copy[i]); } self.new_data.store(false, std::sync::atomic::Ordering::Relaxed); pt.print("YYY"); } &self.monitor_samples_copy[idx] } } impl Drop for Monitor { fn drop(&mut self) { self.terminate_proc.store(true, std::sync::atomic::Ordering::Relaxed); let _ = self.proc_thread.take().unwrap().join(); } } /// Coordinates the processing of incoming MonitorBufs. pub struct MonitorProcessor { rb_mon_con: Consumer, rb_recycle_prod: Producer, new_data: bool, procs: Vec, } impl MonitorProcessor { pub fn new( rb_mon_con: Consumer, rb_recycle_prod: Producer, ) -> Self { let mut procs = vec![]; for i in 0..MON_SIG_CNT { procs.push(MonitorMinMax::new(i)); } Self { rb_mon_con, rb_recycle_prod, procs, new_data: false } } /// Helper function for tests, to access the current state of /// the min/max buffers. pub fn minmax_slice_for_signal(&self, idx: usize) -> (usize, &[(f32, f32)]) { let buf_ptr = self.procs[idx].buf_write_ptr; (buf_ptr, &self.procs[idx].buf[..]) } /// Internal helper function for `process`. fn process_mon_buf(&mut self, mon_buf: &mut MonitorBufPtr) { for proc in self.procs.iter_mut() { if proc.process(mon_buf) { self.new_data = true; } } } /// Processes all queued [MonitorBuf] instances and sends /// then back to the [MonitorBackend] thread after /// used for recycling. pub fn process(&mut self) { while let Some(mut buf) = self.rb_mon_con.pop() { self.process_mon_buf(&mut buf); buf.reset(); let _ = self.rb_recycle_prod.push(buf); } } /// Returns true, when a new data point was received. /// Resets the internal flag until the next time new data is received. pub fn check_new_data(&mut self) -> bool { let new_data = self.new_data; self.new_data = false; new_data } } /// Creates a pair of interconnected MonitorBackend and MonitorProcessor /// instances, to be sent to different threads. pub fn new_monitor_processor() -> (MonitorBackend, Monitor) { let rb_monitor = RingBuffer::new(MONITOR_BUF_COUNT); let rb_recycle = RingBuffer::new(MONITOR_BUF_COUNT); let (rb_mon_prod, rb_mon_con) = rb_monitor.split(); let (rb_recycle_prod, rb_recycle_con) = rb_recycle.split(); let mut unused_monitor_buffers = Vec::with_capacity(MONITOR_BUF_COUNT); for _ in 0..MONITOR_BUF_COUNT { unused_monitor_buffers.push(MonitorBuf::alloc()); } let backend = MonitorBackend { rb_mon_prod, rb_recycle_con, unused_monitor_buffers }; let frontend = Monitor::new(rb_mon_con, rb_recycle_prod); (backend, frontend) } /// This structure holds the output of the 6 cell inputs and outputs /// that is currently being monitored by the frontend. pub struct MonitorBuf { /// Holds the data of the signals. Each signal has it's /// own length. The lengths of the individual elements is /// reflected in the `len` attribute. sig_blocks: [f32; MON_SIG_CNT * MAX_BLOCK_SIZE], /// Holds the lengths of the individual signal data blocks in `sig_blocks`. len: [usize; MON_SIG_CNT], /// Holds the lengths of the individual signal data blocks in `sig_blocks`. read_idx: [usize; MON_SIG_CNT], } /// A trait that represents any kind of monitorable sources /// that provides at least MAX_BLOCK_SIZE samples. pub trait MonitorSource { fn copy_to(&self, len: usize, slice: &mut [f32]); } impl MonitorSource for &[f32] { fn copy_to(&self, len: usize, slice: &mut [f32]) { slice.copy_from_slice(&self[0..len]) } } impl MonitorBuf { /// Allocates a monitor buffer that holds up to 6 signals. pub fn alloc() -> MonitorBufPtr { Box::new(Self { sig_blocks: [0.0; MON_SIG_CNT * MAX_BLOCK_SIZE], len: [0; MON_SIG_CNT], read_idx: [0; MON_SIG_CNT], }) } pub fn reset(&mut self) { self.len = [0; MON_SIG_CNT]; self.read_idx = [0; MON_SIG_CNT]; } #[inline(always)] pub fn next_sample_for_signal(&mut self, idx: usize) -> Option { let rd_idx = self.read_idx[idx]; if rd_idx >= self.len[idx] { return None; } self.read_idx[idx] = rd_idx + 1; let sb_idx = idx * MAX_BLOCK_SIZE; Some(self.sig_blocks[sb_idx + rd_idx]) } pub fn feed(&mut self, idx: usize, len: usize, data: T) where T: MonitorSource, { let sb_idx = idx * MAX_BLOCK_SIZE; data.copy_to(len, &mut self.sig_blocks[sb_idx..(sb_idx + len)]); self.len[idx] = len; } } /// Pointer type for the [MonitorBuf] pub type MonitorBufPtr = Box; #[cfg(test)] mod tests { use super::*; fn send_n_monitor_bufs(backend: &mut MonitorBackend, first: f32, last: f32, count: usize) { for _ in 0..count { let mut mon = backend.get_unused_mon_buf().unwrap(); let mut samples: Vec = vec![]; for _ in 0..MAX_BLOCK_SIZE { samples.push(0.0); } samples[0] = first; samples[MAX_BLOCK_SIZE - 1] = last; mon.feed(0, MAX_BLOCK_SIZE, &samples[..]); backend.send_mon_buf(mon); } } fn wait_for_monitor_process() { // FIXME: This could in theory do some spin waiting for // the new_data flag! std::thread::sleep(std::time::Duration::from_millis(3 * MONITOR_PROC_THREAD_INTERVAL_MS)); } #[test] fn check_monitor_proc() { let (mut backend, mut frontend) = new_monitor_processor(); let count1 = (MONITOR_INPUT_LEN_PER_SAMPLE / MAX_BLOCK_SIZE) + 1; let count2 = 2 * ((MONITOR_INPUT_LEN_PER_SAMPLE / MAX_BLOCK_SIZE) + 1); send_n_monitor_bufs(&mut backend, -0.9, 0.8, count1); send_n_monitor_bufs(&mut backend, -0.7, 0.6, count2); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 1], (-0.7, 0.6)); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 2], (-0.7, 0.8)); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 3], (-0.9, 0.8)); assert_eq!(backend.count_unused_mon_bufs(), MONITOR_BUF_COUNT - count1 - count2); backend.check_recycle(); assert_eq!(backend.count_unused_mon_bufs(), MONITOR_BUF_COUNT); } #[test] fn check_monitor_partial() { let (mut backend, mut frontend) = new_monitor_processor(); let count1 = MONITOR_INPUT_LEN_PER_SAMPLE / MAX_BLOCK_SIZE; send_n_monitor_bufs(&mut backend, -0.9, 0.8, count1); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 1], (0.0, 0.0)); send_n_monitor_bufs(&mut backend, -0.9, 0.8, 1); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 1], (-0.9, 0.8)); } #[test] fn check_monitor_fragment() { let (mut backend, mut frontend) = new_monitor_processor(); let count1 = MONITOR_INPUT_LEN_PER_SAMPLE / MAX_BLOCK_SIZE; let rest = MONITOR_INPUT_LEN_PER_SAMPLE - count1 * MAX_BLOCK_SIZE; send_n_monitor_bufs(&mut backend, -0.9, 0.8, count1); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[0], (0.0, 0.0)); let mut mon = backend.get_unused_mon_buf().unwrap(); let mut samples: Vec = vec![]; let part1_len = rest - 1; for _ in 0..part1_len { samples.push(0.0); } samples[0] = -0.9; samples[part1_len - 1] = -0.95; mon.feed(0, part1_len, &samples[..]); backend.send_mon_buf(mon); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 1], (0.0, 0.0)); let mut mon = backend.get_unused_mon_buf().unwrap(); mon.feed(0, 1, &[0.86][..]); backend.send_mon_buf(mon); wait_for_monitor_process(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!(sl[MONITOR_MINMAX_SAMPLES - 1], (-0.95, 0.86)); } #[test] fn check_monitor_wrap_buf() { let (mut backend, mut frontend) = new_monitor_processor(); let count1 = (MONITOR_INPUT_LEN_PER_SAMPLE / MAX_BLOCK_SIZE) + 1; for i in 0..MONITOR_MINMAX_SAMPLES { let v = i as f32 / MONITOR_MINMAX_SAMPLES as f32; send_n_monitor_bufs(&mut backend, -0.9, v, count1); // Give the MonitorProcessor some time to work on the buffers. std::thread::sleep(std::time::Duration::from_millis(5)); backend.check_recycle(); } wait_for_monitor_process(); backend.check_recycle(); let sl = frontend.get_minmax_monitor_samples(0); assert_eq!((sl[MONITOR_MINMAX_SAMPLES - 1].1 * 10000.0).floor() as u32, 9937); assert_eq!(backend.count_unused_mon_bufs(), MONITOR_BUF_COUNT); } }