Fix race condition between GraphMessage and QuickMessage

Weird Constructor 2022-07-17 13:06:56 +02:00
parent c5c26bdc3e
commit 6a90ded336
5 changed files with 64 additions and 56 deletions

View file

@@ -9,7 +9,7 @@ use std::cell::RefCell;
 use std::sync::{Arc, Mutex};

 lazy_static! {
-    static ref LOG_RECV: Arc<Mutex<LogReceiver>> = { Arc::new(Mutex::new(LogReceiver::new())) };
+    static ref LOG_RECV: Arc<Mutex<LogReceiver>> = Arc::new(Mutex::new(LogReceiver::new()));
 }

 thread_local! {

View file

@@ -40,20 +40,25 @@ pub(crate) enum DropMsg {
     Atom { atom: SAtom },
 }

-/// Big messages for updating the NodeExecutor thread.
+/// Messages for updating the NodeExecutor thread.
 /// Usually used for shoveling NodeProg and Nodes to and from
-/// the NodeExecutor thread.
+/// the NodeExecutor thread. And also parameter updates of course.
 #[derive(Debug)]
 pub enum GraphMessage {
-    NewNode { index: u8, node: Node },
-    NewProg { prog: NodeProg, copy_old_out: bool },
-    Clear { prog: NodeProg },
-}
-
-/// Messages for small updates between the NodeExecutor thread
-/// and the NodeConfigurator.
-#[derive(Debug)]
-pub enum QuickMessage {
+    NewNode {
+        index: u8,
+        node: Node,
+    },
+    NewProg {
+        prog: NodeProg,
+        copy_old_out: bool,
+    },
+    Clear {
+        prog: NodeProg,
+    },
+
+    // XXX: Parameter updates used to be separate from the graph update, but this
+    // became a race condition and I had to revert this premature optimization.
     AtomUpdate {
         at_idx: usize,
         value: SAtom,

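What this hunk amounts to: program uploads and parameter/atom updates used to travel on two separate queues, so a QuickMessage::AtomUpdate aimed at a freshly uploaded NodeProg could be drained on the other side before the GraphMessage::NewProg it depends on. Folding all of them into the single GraphMessage queue restores FIFO ordering between them. Below is a minimal standalone sketch of the failure mode being prevented; it is not code from this repository: it uses std::sync::mpsc in place of the ring buffers, and the Msg enum and atom vectors are invented for illustration.

use std::sync::mpsc;

// Hypothetical stand-in for the merged GraphMessage enum.
enum Msg {
    NewProg { atoms: Vec<f32> },              // like GraphMessage::NewProg
    AtomUpdate { at_idx: usize, value: f32 }, // like GraphMessage::AtomUpdate
}

fn main() {
    let (tx, rx) = mpsc::channel::<Msg>();

    // Frontend: upload a bigger program, then update an atom slot
    // that only exists in that new program.
    tx.send(Msg::NewProg { atoms: vec![0.0; 8] }).unwrap();
    tx.send(Msg::AtomUpdate { at_idx: 7, value: 1.0 }).unwrap();

    // Backend: the old program has only 4 atom slots. Because both
    // messages travel through one FIFO, NewProg is guaranteed to be
    // applied before the AtomUpdate that targets it.
    let mut atoms: Vec<f32> = vec![0.0; 4];
    while let Ok(msg) = rx.try_recv() {
        match msg {
            Msg::NewProg { atoms: new_atoms } => atoms = new_atoms,
            Msg::AtomUpdate { at_idx, value } => atoms[at_idx] = value,
        }
    }
    assert_eq!(atoms[7], 1.0);
}

With two channels instead of one, nothing stops the consumer from draining the update channel first and indexing slot 7 of the old four-slot program, which is exactly the class of race the commit title describes.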
View file

@@ -3,7 +3,7 @@
 // See README.md and COPYING for details.

 use super::{
-    FeedbackFilter, GraphMessage, NodeOp, NodeProg, QuickMessage, MAX_ALLOCATED_NODES,
+    FeedbackFilter, GraphMessage, NodeOp, NodeProg, MAX_ALLOCATED_NODES,
     MAX_AVAIL_TRACKERS, MAX_INPUTS, UNUSED_MONITOR_IDX,
 };
 use crate::dsp::tracker::{PatternData, Tracker};
@@ -37,7 +37,7 @@ pub struct NodeInstance {
     /// A mapping array, to map from input index of the node
     /// to the modulator index. Because not every input has an
     /// associated modulator.
-    /// This is used later to send [QuickMessage::ModamtUpdate].
+    /// This is used later to send [GraphMessage::ModamtUpdate].
     /// The input index into this array is the index returned from
     /// routines like [NodeId::inp_param].
     in2mod_map: [Option<usize>; MAX_INPUTS],
@@ -215,8 +215,6 @@ pub(crate) struct SharedNodeConf {
     pub(crate) node_ctx_values: Vec<Arc<AtomicFloat>>,
     /// For updating the NodeExecutor with graph updates.
     pub(crate) graph_update_prod: Producer<GraphMessage>,
-    /// For quick updates like UI paramter changes.
-    pub(crate) quick_update_prod: Producer<QuickMessage>,
     /// For receiving monitor data from the backend thread.
     pub(crate) monitor: Monitor,
     /// Handles deallocation of dead nodes from the backend.
@@ -229,11 +227,9 @@ use super::node_exec::SharedNodeExec;
 impl SharedNodeConf {
     pub(crate) fn new() -> (Self, SharedNodeExec) {
         let rb_graph = RingBuffer::new(MAX_ALLOCATED_NODES * 2);
-        let rb_quick = RingBuffer::new(MAX_ALLOCATED_NODES * 8);
         let rb_drop = RingBuffer::new(MAX_ALLOCATED_NODES * 2);

         let (rb_graph_prod, rb_graph_con) = rb_graph.split();
-        let (rb_quick_prod, rb_quick_con) = rb_quick.split();
         let (rb_drop_prod, rb_drop_con) = rb_drop.split();

         let drop_thread = DropThread::new(rb_drop_con);
@@ -252,14 +248,12 @@ impl SharedNodeConf {
             Self {
                 node_ctx_values,
                 graph_update_prod: rb_graph_prod,
-                quick_update_prod: rb_quick_prod,
                 monitor,
                 drop_thread,
             },
             SharedNodeExec {
                 node_ctx_values: exec_node_ctx_vals,
                 graph_update_con: rb_graph_con,
-                quick_update_con: rb_quick_con,
                 graph_drop_prod: rb_drop_prod,
                 monitor_backend,
             },
@@ -384,8 +378,8 @@ impl NodeConfigurator {
         if let Some(mod_idx) = mod_idx {
             let _ = self
                 .shared
-                .quick_update_prod
-                .push(QuickMessage::ModamtUpdate { mod_idx, modamt });
+                .graph_update_prod
+                .push(GraphMessage::ModamtUpdate { mod_idx, modamt });
         }

         false
@@ -438,10 +432,11 @@ impl NodeConfigurator {
                 nparam.value = at.clone();
                 let at_idx = nparam.at_idx;

+                println!("SEND ATOM UPDATE: {}, {:?}", at_idx, at);
                 let _ = self
                     .shared
-                    .quick_update_prod
-                    .push(QuickMessage::AtomUpdate { at_idx, value: at });
+                    .graph_update_prod
+                    .push(GraphMessage::AtomUpdate { at_idx, value: at });
             }
         } else {
             self.param_values.insert(param, at.f());
@@ -453,8 +448,8 @@ impl NodeConfigurator {
             let input_idx = nparam.input_idx;
             let _ = self
                 .shared
-                .quick_update_prod
-                .push(QuickMessage::ParamUpdate { input_idx, value });
+                .graph_update_prod
+                .push(GraphMessage::ParamUpdate { input_idx, value });
         }
     }
 }
@@ -645,7 +640,7 @@ impl NodeConfigurator {
             i += 1;
         }

-        let _ = self.shared.quick_update_prod.push(QuickMessage::SetMonitor { bufs });
+        let _ = self.shared.graph_update_prod.push(GraphMessage::SetMonitor { bufs });
     }
 }
@@ -965,6 +960,7 @@ impl NodeConfigurator {
         self.output_fb_cons = prog.take_feedback_consumer();

+        println!("NEW PROG id={}", prog.unique_id);
         let _ = self.shared.graph_update_prod.push(GraphMessage::NewProg { prog, copy_old_out });
     }

View file

@@ -3,7 +3,7 @@
 // See README.md and COPYING for details.

 use super::{
-    DropMsg, GraphMessage, NodeProg, QuickMessage, FB_DELAY_TIME_US, MAX_ALLOCATED_NODES,
+    DropMsg, GraphMessage, NodeProg, FB_DELAY_TIME_US, MAX_ALLOCATED_NODES,
     MAX_FB_DELAY_SIZE, MAX_SMOOTHERS, UNUSED_MONITOR_IDX,
 };
 use crate::dsp::{Node, NodeContext, NodeId, MAX_BLOCK_SIZE};
@@ -77,8 +77,6 @@ pub(crate) struct SharedNodeExec {
     pub(crate) node_ctx_values: Vec<Arc<AtomicFloat>>,
     /// For receiving Node and NodeProg updates
     pub(crate) graph_update_con: Consumer<GraphMessage>,
-    /// For quick updates like UI paramter changes.
-    pub(crate) quick_update_con: Consumer<QuickMessage>,
     /// For receiving deleted/overwritten nodes from the backend thread.
     pub(crate) graph_drop_prod: Producer<DropMsg>,
     /// For sending feedback to the frontend thread.
@@ -275,7 +273,7 @@ impl NodeExecutor {
                 // is always sent with the new program and "should"
                 // be up to date, even if we have a slight possible race
                 // condition between GraphMessage::NewProg
-                // and QuickMessage::AtomUpdate.
+                // and GraphMessage::AtomUpdate.

                 // First overwrite by the current input parameters,
                 // to make sure _all_ inputs have a proper value
@@ -322,6 +320,21 @@ impl NodeExecutor {
                     );
                 });
             }
+            GraphMessage::AtomUpdate { at_idx, value } => {
+                let prog = &mut self.prog;
+                let garbage = std::mem::replace(&mut prog.atoms[at_idx], value);
+                let _ = self.shared.graph_drop_prod.push(DropMsg::Atom { atom: garbage });
+            }
+            GraphMessage::ParamUpdate { input_idx, value } => {
+                self.set_param(input_idx, value);
+            }
+            GraphMessage::ModamtUpdate { mod_idx, modamt } => {
+                self.set_modamt(mod_idx, modamt);
+            }
+            GraphMessage::SetMonitor { bufs } => {
+                self.monitor_signal_cur_inp_indices = bufs;
+            }
         }
     }
 }
@@ -406,31 +419,6 @@ impl NodeExecutor {
         }
     }

-    #[inline]
-    pub fn process_param_updates(&mut self, nframes: usize) {
-        while let Some(upd) = self.shared.quick_update_con.pop() {
-            match upd {
-                QuickMessage::AtomUpdate { at_idx, value } => {
-                    let prog = &mut self.prog;
-                    let garbage = std::mem::replace(&mut prog.atoms[at_idx], value);
-                    let _ = self.shared.graph_drop_prod.push(DropMsg::Atom { atom: garbage });
-                }
-                QuickMessage::ParamUpdate { input_idx, value } => {
-                    self.set_param(input_idx, value);
-                }
-                QuickMessage::ModamtUpdate { mod_idx, modamt } => {
-                    self.set_modamt(mod_idx, modamt);
-                }
-                QuickMessage::SetMonitor { bufs } => {
-                    self.monitor_signal_cur_inp_indices = bufs;
-                }
-            }
-        }
-
-        self.process_smoothers(nframes);
-    }
-
     #[inline]
     pub fn process<T: NodeAudioContext>(&mut self, ctx: &mut T) {
         // let tb = std::time::Instant::now();
@@ -442,7 +430,7 @@ impl NodeExecutor {
             });
         }

-        self.process_param_updates(ctx.nframes());
+        self.process_smoothers(ctx.nframes());

         let nodes = &mut self.nodes;
         let ctx_vals = &mut self.shared.node_ctx_values;

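With process_param_updates() gone, the executor drains exactly one queue and then steps the smoothers once per block. A condensed standalone sketch of that consumer pattern, assuming the same RingBuffer/split/push/pop API the diff itself uses (as in the ringbuf crate of that era); the Update enum and the println! bodies are illustrative stand-ins, not this repository's types:

use ringbuf::RingBuffer;

// Illustrative stand-in for the merged GraphMessage.
enum Update {
    NewProg(u32),
    ParamUpdate { input_idx: usize, value: f32 },
}

fn main() {
    // One queue instead of the former rb_graph + rb_quick pair.
    let rb = RingBuffer::new(64);
    let (mut prod, mut cons) = rb.split();

    // Frontend thread side: program swaps and parameter tweaks now
    // share a single FIFO.
    let _ = prod.push(Update::NewProg(1));
    let _ = prod.push(Update::ParamUpdate { input_idx: 3, value: 0.5 });

    // Audio thread side, once per block: pop until empty, apply in
    // order, so a ParamUpdate can never overtake the NewProg it
    // belongs to.
    while let Some(upd) = cons.pop() {
        match upd {
            Update::NewProg(id) => println!("install prog {}", id),
            Update::ParamUpdate { input_idx, value } => {
                println!("set input {} = {}", input_idx, value)
            }
        }
    }
}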
View file

@@ -3,8 +3,13 @@
 // See README.md and COPYING for details.

 use crate::dsp::{ProcBuf, SAtom};
+use std::cell::RefCell;
 use triple_buffer::{Input, Output, TripleBuffer};

+thread_local! {
+    pub static NODE_PROG_ID_COUNTER: RefCell<usize> = RefCell::new(1);
+}
+
 #[derive(Debug, Clone)]
 pub struct ModOp {
     amount: f32,
@@ -189,6 +194,10 @@ pub struct NodeProg {
     /// Temporary hold for the producer for the `out_feedback`:
     pub out_fb_cons: Option<Output<Vec<f32>>>,
+
+    /// A unique ID assigned to the node prog. Mostly for debugging purposes.
+    /// You should only read this field.
+    pub unique_id: usize,
 }

 impl Drop for NodeProg {
@@ -203,6 +212,14 @@ impl Drop for NodeProg {
     }
 }

+fn new_node_prog_id() -> usize {
+    NODE_PROG_ID_COUNTER.with(|cnt| {
+        let unique_id = *cnt.borrow();
+        *cnt.borrow_mut() += 1;
+        unique_id
+    })
+}
+
 impl NodeProg {
     pub fn empty() -> Self {
         let out_fb = vec![];
@@ -219,6 +236,7 @@ impl NodeProg {
             out_feedback: input_fb,
             out_fb_cons: Some(output_fb),
             locked_buffers: false,
+            unique_id: new_node_prog_id(),
         }
     }
@@ -253,6 +271,7 @@ impl NodeProg {
             out_feedback: input_fb,
             out_fb_cons: Some(output_fb),
             locked_buffers: false,
+            unique_id: new_node_prog_id(),
         }
     }
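The unique_id plumbing exists to make the debug print "NEW PROG id={}" in the configurator meaningful: the log shows which NodeProg the executor received relative to the parameter updates around it. A plain thread_local counter suffices, presumably because NodeProgs are only ever constructed on the frontend thread. A standalone demo of the same pattern (COUNTER and next_id are illustrative names, not items from this codebase):

use std::cell::RefCell;

thread_local! {
    // Same pattern as NODE_PROG_ID_COUNTER above: one counter per thread.
    static COUNTER: RefCell<usize> = RefCell::new(1);
}

fn next_id() -> usize {
    COUNTER.with(|cnt| {
        let id = *cnt.borrow();
        *cnt.borrow_mut() += 1;
        id
    })
}

fn main() {
    // Ids increase monotonically per constructing thread, which is all
    // the debugging use case needs.
    let (a, b) = (next_id(), next_id());
    assert_eq!(a, 1);
    assert!(b > a);
}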