From 6a90ded33691decb5bd168c952a7f61627e9bfc6 Mon Sep 17 00:00:00 2001 From: Weird Constructor Date: Sun, 17 Jul 2022 13:06:56 +0200 Subject: [PATCH] Fix race condition between GraphMessage and QuickMessage --- src/log.rs | 2 +- src/nodes/mod.rs | 25 +++++++++++++--------- src/nodes/node_conf.rs | 26 ++++++++++------------- src/nodes/node_exec.rs | 48 ++++++++++++++++-------------------------- src/nodes/node_prog.rs | 19 +++++++++++++++++ 5 files changed, 64 insertions(+), 56 deletions(-) diff --git a/src/log.rs b/src/log.rs index 95b86a3..8f5bfd7 100644 --- a/src/log.rs +++ b/src/log.rs @@ -9,7 +9,7 @@ use std::cell::RefCell; use std::sync::{Arc, Mutex}; lazy_static! { - static ref LOG_RECV: Arc> = { Arc::new(Mutex::new(LogReceiver::new())) }; + static ref LOG_RECV: Arc> = Arc::new(Mutex::new(LogReceiver::new())); } thread_local! { diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs index 9726334..cd450c5 100644 --- a/src/nodes/mod.rs +++ b/src/nodes/mod.rs @@ -40,20 +40,25 @@ pub(crate) enum DropMsg { Atom { atom: SAtom }, } -/// Big messages for updating the NodeExecutor thread. +/// Messages for updating the NodeExecutor thread. /// Usually used for shoveling NodeProg and Nodes to and from -/// the NodeExecutor thread. +/// the NodeExecutor thread. And also parameter updates of course. #[derive(Debug)] pub enum GraphMessage { - NewNode { index: u8, node: Node }, - NewProg { prog: NodeProg, copy_old_out: bool }, - Clear { prog: NodeProg }, -} + NewNode { + index: u8, + node: Node, + }, + NewProg { + prog: NodeProg, + copy_old_out: bool, + }, + Clear { + prog: NodeProg, + }, -/// Messages for small updates between the NodeExecutor thread -/// and the NodeConfigurator. -#[derive(Debug)] -pub enum QuickMessage { + // XXX: Parameter updates used to be separate from the graph update, but this + // became a race condition and I had to revert this premature optimization. AtomUpdate { at_idx: usize, value: SAtom, diff --git a/src/nodes/node_conf.rs b/src/nodes/node_conf.rs index 890a2d9..9f83598 100644 --- a/src/nodes/node_conf.rs +++ b/src/nodes/node_conf.rs @@ -3,7 +3,7 @@ // See README.md and COPYING for details. use super::{ - FeedbackFilter, GraphMessage, NodeOp, NodeProg, QuickMessage, MAX_ALLOCATED_NODES, + FeedbackFilter, GraphMessage, NodeOp, NodeProg, MAX_ALLOCATED_NODES, MAX_AVAIL_TRACKERS, MAX_INPUTS, UNUSED_MONITOR_IDX, }; use crate::dsp::tracker::{PatternData, Tracker}; @@ -37,7 +37,7 @@ pub struct NodeInstance { /// A mapping array, to map from input index of the node /// to the modulator index. Because not every input has an /// associated modulator. - /// This is used later to send [QuickMessage::ModamtUpdate]. + /// This is used later to send [GraphMessage::ModamtUpdate]. /// The input index into this array is the index returned from /// routines like [NodeId::inp_param]. in2mod_map: [Option; MAX_INPUTS], @@ -215,8 +215,6 @@ pub(crate) struct SharedNodeConf { pub(crate) node_ctx_values: Vec>, /// For updating the NodeExecutor with graph updates. pub(crate) graph_update_prod: Producer, - /// For quick updates like UI paramter changes. - pub(crate) quick_update_prod: Producer, /// For receiving monitor data from the backend thread. pub(crate) monitor: Monitor, /// Handles deallocation of dead nodes from the backend. @@ -229,11 +227,9 @@ use super::node_exec::SharedNodeExec; impl SharedNodeConf { pub(crate) fn new() -> (Self, SharedNodeExec) { let rb_graph = RingBuffer::new(MAX_ALLOCATED_NODES * 2); - let rb_quick = RingBuffer::new(MAX_ALLOCATED_NODES * 8); let rb_drop = RingBuffer::new(MAX_ALLOCATED_NODES * 2); let (rb_graph_prod, rb_graph_con) = rb_graph.split(); - let (rb_quick_prod, rb_quick_con) = rb_quick.split(); let (rb_drop_prod, rb_drop_con) = rb_drop.split(); let drop_thread = DropThread::new(rb_drop_con); @@ -252,14 +248,12 @@ impl SharedNodeConf { Self { node_ctx_values, graph_update_prod: rb_graph_prod, - quick_update_prod: rb_quick_prod, monitor, drop_thread, }, SharedNodeExec { node_ctx_values: exec_node_ctx_vals, graph_update_con: rb_graph_con, - quick_update_con: rb_quick_con, graph_drop_prod: rb_drop_prod, monitor_backend, }, @@ -384,8 +378,8 @@ impl NodeConfigurator { if let Some(mod_idx) = mod_idx { let _ = self .shared - .quick_update_prod - .push(QuickMessage::ModamtUpdate { mod_idx, modamt }); + .graph_update_prod + .push(GraphMessage::ModamtUpdate { mod_idx, modamt }); } false @@ -438,10 +432,11 @@ impl NodeConfigurator { nparam.value = at.clone(); let at_idx = nparam.at_idx; + println!("SEND ATOM UPDATE: {}, {:?}", at_idx, at); let _ = self .shared - .quick_update_prod - .push(QuickMessage::AtomUpdate { at_idx, value: at }); + .graph_update_prod + .push(GraphMessage::AtomUpdate { at_idx, value: at }); } } else { self.param_values.insert(param, at.f()); @@ -453,8 +448,8 @@ impl NodeConfigurator { let input_idx = nparam.input_idx; let _ = self .shared - .quick_update_prod - .push(QuickMessage::ParamUpdate { input_idx, value }); + .graph_update_prod + .push(GraphMessage::ParamUpdate { input_idx, value }); } } } @@ -645,7 +640,7 @@ impl NodeConfigurator { i += 1; } - let _ = self.shared.quick_update_prod.push(QuickMessage::SetMonitor { bufs }); + let _ = self.shared.graph_update_prod.push(GraphMessage::SetMonitor { bufs }); } } @@ -965,6 +960,7 @@ impl NodeConfigurator { self.output_fb_cons = prog.take_feedback_consumer(); + println!("NEW PROG id={}", prog.unique_id); let _ = self.shared.graph_update_prod.push(GraphMessage::NewProg { prog, copy_old_out }); } diff --git a/src/nodes/node_exec.rs b/src/nodes/node_exec.rs index 3f58a10..069ffb2 100644 --- a/src/nodes/node_exec.rs +++ b/src/nodes/node_exec.rs @@ -3,7 +3,7 @@ // See README.md and COPYING for details. use super::{ - DropMsg, GraphMessage, NodeProg, QuickMessage, FB_DELAY_TIME_US, MAX_ALLOCATED_NODES, + DropMsg, GraphMessage, NodeProg, FB_DELAY_TIME_US, MAX_ALLOCATED_NODES, MAX_FB_DELAY_SIZE, MAX_SMOOTHERS, UNUSED_MONITOR_IDX, }; use crate::dsp::{Node, NodeContext, NodeId, MAX_BLOCK_SIZE}; @@ -77,8 +77,6 @@ pub(crate) struct SharedNodeExec { pub(crate) node_ctx_values: Vec>, /// For receiving Node and NodeProg updates pub(crate) graph_update_con: Consumer, - /// For quick updates like UI paramter changes. - pub(crate) quick_update_con: Consumer, /// For receiving deleted/overwritten nodes from the backend thread. pub(crate) graph_drop_prod: Producer, /// For sending feedback to the frontend thread. @@ -275,7 +273,7 @@ impl NodeExecutor { // is always sent with the new program and "should" // be up to date, even if we have a slight possible race // condition between GraphMessage::NewProg - // and QuickMessage::AtomUpdate. + // and GraphMessage::AtomUpdate. // First overwrite by the current input parameters, // to make sure _all_ inputs have a proper value @@ -322,6 +320,21 @@ impl NodeExecutor { ); }); } + GraphMessage::AtomUpdate { at_idx, value } => { + let prog = &mut self.prog; + let garbage = std::mem::replace(&mut prog.atoms[at_idx], value); + + let _ = self.shared.graph_drop_prod.push(DropMsg::Atom { atom: garbage }); + } + GraphMessage::ParamUpdate { input_idx, value } => { + self.set_param(input_idx, value); + } + GraphMessage::ModamtUpdate { mod_idx, modamt } => { + self.set_modamt(mod_idx, modamt); + } + GraphMessage::SetMonitor { bufs } => { + self.monitor_signal_cur_inp_indices = bufs; + } } } } @@ -406,31 +419,6 @@ impl NodeExecutor { } } - #[inline] - pub fn process_param_updates(&mut self, nframes: usize) { - while let Some(upd) = self.shared.quick_update_con.pop() { - match upd { - QuickMessage::AtomUpdate { at_idx, value } => { - let prog = &mut self.prog; - let garbage = std::mem::replace(&mut prog.atoms[at_idx], value); - - let _ = self.shared.graph_drop_prod.push(DropMsg::Atom { atom: garbage }); - } - QuickMessage::ParamUpdate { input_idx, value } => { - self.set_param(input_idx, value); - } - QuickMessage::ModamtUpdate { mod_idx, modamt } => { - self.set_modamt(mod_idx, modamt); - } - QuickMessage::SetMonitor { bufs } => { - self.monitor_signal_cur_inp_indices = bufs; - } - } - } - - self.process_smoothers(nframes); - } - #[inline] pub fn process(&mut self, ctx: &mut T) { // let tb = std::time::Instant::now(); @@ -442,7 +430,7 @@ impl NodeExecutor { }); } - self.process_param_updates(ctx.nframes()); + self.process_smoothers(ctx.nframes()); let nodes = &mut self.nodes; let ctx_vals = &mut self.shared.node_ctx_values; diff --git a/src/nodes/node_prog.rs b/src/nodes/node_prog.rs index 30fe34c..10b04ae 100644 --- a/src/nodes/node_prog.rs +++ b/src/nodes/node_prog.rs @@ -3,8 +3,13 @@ // See README.md and COPYING for details. use crate::dsp::{ProcBuf, SAtom}; +use std::cell::RefCell; use triple_buffer::{Input, Output, TripleBuffer}; +thread_local! { + pub static NODE_PROG_ID_COUNTER: RefCell = RefCell::new(1); +} + #[derive(Debug, Clone)] pub struct ModOp { amount: f32, @@ -189,6 +194,10 @@ pub struct NodeProg { /// Temporary hold for the producer for the `out_feedback`: pub out_fb_cons: Option>>, + + /// A unique ID assigned to the node prog. Mostly for debugging purposes. + /// You should only read this field. + pub unique_id: usize, } impl Drop for NodeProg { @@ -203,6 +212,14 @@ impl Drop for NodeProg { } } +fn new_node_prog_id() -> usize { + NODE_PROG_ID_COUNTER.with(|cnt| { + let unique_id = *cnt.borrow(); + *cnt.borrow_mut() += 1; + unique_id + }) +} + impl NodeProg { pub fn empty() -> Self { let out_fb = vec![]; @@ -219,6 +236,7 @@ impl NodeProg { out_feedback: input_fb, out_fb_cons: Some(output_fb), locked_buffers: false, + unique_id: new_node_prog_id(), } } @@ -253,6 +271,7 @@ impl NodeProg { out_feedback: input_fb, out_fb_cons: Some(output_fb), locked_buffers: false, + unique_id: new_node_prog_id(), } }