HexoDSP/src/nodes/node_exec.rs
2021-05-18 05:11:19 +02:00

433 lines
16 KiB
Rust

// Copyright (c) 2021 Weird Constructor <weirdconstructor@gmail.com>
// This is a part of HexoDSP. Released under (A)GPLv3 or any later.
// See README.md and COPYING for details.
use super::{
GraphMessage, QuickMessage, DropMsg, NodeProg,
UNUSED_MONITOR_IDX, MAX_ALLOCATED_NODES, MAX_SMOOTHERS
};
use crate::dsp::{NodeId, Node};
use crate::util::{Smoother, AtomicFloat};
use crate::monitor::{MonitorBackend, MON_SIG_CNT};
use ringbuf::{Producer, Consumer};
use std::sync::Arc;
/// Holds the complete allocation of nodes and
/// the program. New Nodes or the program is
/// not newly allocated in the audio backend, but it is
/// copied from the input ring buffer.
/// If this turns out to be too slow, we might
/// have to push buffers of the program around.
///
pub struct NodeExecutor {
/// Contains the nodes and their state.
/// Is loaded from the input ring buffer when a corresponding
/// message arrives.
///
/// In case the previous node contained something that needs
/// deallocation, the nodes are replaced and the contents
/// is sent back using the free-ringbuffer.
pub(crate) nodes: Vec<Node>,
/// Contains the stand-by smoothing operators for incoming parameter changes.
pub(crate) smoothers: Vec<(usize, Smoother)>,
/// Contains target parameter values after a smoother finished,
/// these will refresh the input buffers:
pub(crate) target_refresh: Vec<(usize, f32)>,
/// Contains the to be executed nodes and output operations.
/// Is copied from the input ringbuffer when a corresponding
/// message arrives.
pub(crate) prog: NodeProg,
/// Holds the input vector indices which are to be monitored by the frontend.
pub(crate) monitor_signal_cur_inp_indices: [usize; MON_SIG_CNT],
/// The sample rate
pub(crate) sample_rate: f32,
/// The connection with the [crate::nodes::NodeConfigurator].
shared: SharedNodeExec,
}
/// Contains anything that connects the [NodeExecutor] with the frontend part.
pub(crate) struct SharedNodeExec {
/// Holds two context values interleaved.
/// The first for each node is the LED value and the second is a
/// phase value. The LED will be displayed in the hex matrix, while the
/// phase might be used to display an envelope's play position.
pub(crate) node_ctx_values: Vec<Arc<AtomicFloat>>,
/// For receiving Node and NodeProg updates
pub(crate) graph_update_con: Consumer<GraphMessage>,
/// For quick updates like UI paramter changes.
pub(crate) quick_update_con: Consumer<QuickMessage>,
/// For receiving deleted/overwritten nodes from the backend thread.
pub(crate) graph_drop_prod: Producer<DropMsg>,
/// For sending feedback to the frontend thread.
pub(crate) monitor_backend: MonitorBackend,
}
pub trait NodeAudioContext {
fn nframes(&self) -> usize;
fn output(&mut self, channel: usize, frame: usize, v: f32);
fn input(&mut self, channel: usize, frame: usize) -> f32;
}
impl NodeExecutor {
pub(crate) fn new(shared: SharedNodeExec) -> Self {
let mut nodes = Vec::new();
nodes.resize_with(MAX_ALLOCATED_NODES, || Node::Nop);
let mut smoothers = Vec::new();
smoothers.resize_with(MAX_SMOOTHERS, || (0, Smoother::new()));
let target_refresh = Vec::with_capacity(MAX_SMOOTHERS);
NodeExecutor {
nodes,
smoothers,
target_refresh,
sample_rate: 44100.0,
prog: NodeProg::empty(),
monitor_signal_cur_inp_indices: [UNUSED_MONITOR_IDX; MON_SIG_CNT],
shared,
}
}
#[inline]
pub fn process_graph_updates(&mut self) {
while let Some(upd) = self.shared.graph_update_con.pop() {
match upd {
GraphMessage::NewNode { index, mut node } => {
node.set_sample_rate(self.sample_rate);
let prev_node =
std::mem::replace(
&mut self.nodes[index as usize],
node);
let _ =
self.shared.graph_drop_prod.push(
DropMsg::Node { node: prev_node });
},
GraphMessage::Clear { prog } => {
for n in self.nodes.iter_mut() {
if n.to_id(0) != NodeId::Nop {
let prev_node = std::mem::replace(n, Node::Nop);
let _ =
self.shared.graph_drop_prod.push(
DropMsg::Node { node: prev_node });
}
}
self.monitor_signal_cur_inp_indices =
[UNUSED_MONITOR_IDX; MON_SIG_CNT];
let prev_prog = std::mem::replace(&mut self.prog, prog);
let _ =
self.shared.graph_drop_prod.push(
DropMsg::Prog { prog: prev_prog });
},
GraphMessage::NewProg { prog, copy_old_out } => {
let mut prev_prog = std::mem::replace(&mut self.prog, prog);
self.monitor_signal_cur_inp_indices =
[UNUSED_MONITOR_IDX; MON_SIG_CNT];
// XXX: Copying from the old vector works, because we only
// append nodes to the _end_ of the node instance vector.
// If we do a garbage collection, we can't do this.
//
// XXX: Also, we need to initialize the input parameter
// vector, because we don't know if they are updated from
// the new program outputs anymore. So we need to
// copy the old paramters to the inputs.
//
// => This does not apply to atom data, because that
// is always sent with the new program and "should"
// be up to date, even if we have a slight possible race
// condition between GraphMessage::NewProg
// and QuickMessage::AtomUpdate.
// First overwrite by the current input parameters,
// to make sure _all_ inputs have a proper value
// (not just those that existed before).
//
// We preserve the modulation history in the next step.
// This is also to make sure that new input ports
// have a proper value too.
self.prog.initialize_input_buffers();
if copy_old_out {
// XXX: The following is commented out, because presisting
// the output proc buffers does not make sense anymore.
// Because we don't allow cycles, so there is no
// way that a node can read from the previous
// iteration anyways.
//
// // Swap the old out buffers into the new NodeProg
// // TODO: If we toss away most of the buffers anyways,
// // we could optimize this step with more
// // intelligence in the matrix compiler.
// for (old_pb, new_pb) in
// prev_prog.out.iter_mut().zip(
// self.prog.out.iter_mut())
// {
// std::mem::swap(old_pb, new_pb);
// }
// Then overwrite the inputs by the more current previous
// input processing buffers, so we keep any modulation
// (smoothed) history of the block too.
self.prog.swap_previous_outputs(&mut prev_prog);
}
self.prog.assign_outputs();
let _ =
self.shared.graph_drop_prod.push(
DropMsg::Prog { prog: prev_prog });
},
}
}
}
pub fn set_sample_rate(&mut self, sample_rate: f32) {
self.sample_rate = sample_rate;
for n in self.nodes.iter_mut() {
n.set_sample_rate(sample_rate);
}
for sm in self.smoothers.iter_mut() {
sm.1.set_sample_rate(sample_rate);
}
}
#[inline]
pub fn get_nodes(&self) -> &Vec<Node> { &self.nodes }
#[inline]
pub fn get_prog(&self) -> &NodeProg { &self.prog }
#[inline]
fn set_param(&mut self, input_idx: usize, value: f32) {
let prog = &mut self.prog;
if input_idx >= prog.params.len() {
return;
}
// First check if we already have a running smoother for this param:
for (sm_inp_idx, smoother) in
self.smoothers
.iter_mut()
.filter(|s| !s.1.is_done())
{
if *sm_inp_idx == input_idx {
smoother.set(prog.params[input_idx], value);
//d// println!("RE-SET SMOOTHER {} {:6.3} (old = {:6.3})",
//d// input_idx, value, prog.params[input_idx]);
return;
}
}
// Find unused smoother and set it:
if let Some(sm) =
self.smoothers
.iter_mut()
.filter(|s| s.1.is_done())
.next()
{
sm.0 = input_idx;
sm.1.set(prog.params[input_idx], value);
//d// println!("SET SMOOTHER {} {:6.3} (old = {:6.3})",
//d// input_idx, value, prog.params[input_idx]);
}
}
#[inline]
fn process_smoothers(&mut self, nframes: usize) {
let prog = &mut self.prog;
while let Some((idx, v)) = self.target_refresh.pop() {
prog.inp[idx].fill(v);
}
for (idx, smoother) in
self.smoothers
.iter_mut()
.filter(|s|
!s.1.is_done())
{
let inp = &mut prog.inp[*idx];
let mut last_v = 0.0;
for frame in 0..nframes {
let v = smoother.next();
inp.write(frame, v);
last_v = v;
}
prog.params[*idx] = last_v;
self.target_refresh.push((*idx, last_v));
}
}
#[inline]
pub fn process_param_updates(&mut self, nframes: usize) {
while let Some(upd) = self.shared.quick_update_con.pop() {
match upd {
QuickMessage::AtomUpdate { at_idx, value } => {
let prog = &mut self.prog;
let garbage =
std::mem::replace(
&mut prog.atoms[at_idx],
value);
let _ =
self.shared.graph_drop_prod.push(
DropMsg::Atom { atom: garbage });
},
QuickMessage::ParamUpdate { input_idx, value } => {
self.set_param(input_idx, value);
},
QuickMessage::SetMonitor { bufs } => {
self.monitor_signal_cur_inp_indices = bufs;
},
}
}
self.process_smoothers(nframes);
}
#[inline]
pub fn process<T: NodeAudioContext>(&mut self, ctx: &mut T) {
// let tb = std::time::Instant::now();
self.process_param_updates(ctx.nframes());
let nodes = &mut self.nodes;
let ctx_vals = &mut self.shared.node_ctx_values;
let prog = &mut self.prog;
let prog_out_fb = prog.out_feedback.input_buffer();
for op in prog.prog.iter() {
let out = op.out_idxlen;
let inp = op.in_idxlen;
let at = op.at_idxlen;
let ctx_idx = op.idx as usize * 2;
nodes[op.idx as usize]
.process(
ctx,
&prog.atoms[at.0..at.1],
&prog.inp[inp.0..inp.1],
&prog.cur_inp[inp.0..inp.1],
&mut prog.out[out.0..out.1],
&ctx_vals[ctx_idx..ctx_idx + 2]);
let last_frame_idx = ctx.nframes() - 1;
for (pb, out_buf_idx) in
prog.out[out.0..out.1].iter()
.zip(out.0..out.1)
{
prog_out_fb[out_buf_idx] = pb.read(last_frame_idx);
}
}
prog.out_feedback.publish();
self.shared.monitor_backend.check_recycle();
// let ta = std::time::Instant::now();
for (i, idx) in self.monitor_signal_cur_inp_indices.iter().enumerate() {
if *idx == UNUSED_MONITOR_IDX {
continue;
}
if let Some(mut mon) = self.shared.monitor_backend.get_unused_mon_buf() {
if i > 2 {
mon.feed(i, ctx.nframes(), &prog.out[*idx]);
} else {
mon.feed(i, ctx.nframes(), &prog.cur_inp[*idx]);
}
self.shared.monitor_backend.send_mon_buf(mon);
}
}
// let ta = std::time::Instant::now().duration_since(ta);
// let tb = std::time::Instant::now().duration_since(tb);
// println!("ta Elapsed: {:?}", ta);
// println!("tb Elapsed: {:?}", tb);
}
/// This is a convenience function used for testing
/// the DSP graph output in automated tests for this crate.
///
/// The sample rate that is used to run the DSP code is 44100 Hz.
///
/// Relying on the behvaiour of this function for production code
/// is not it's intended usecase and changes might break your code.
///
/// * `seconds`: The number of seconds to run the DSP thread for.
/// * `realtime`: If this is set, the function will sleep.
///
/// You can use it's source as reference for your own audio
/// DSP thread processing function.
pub fn test_run(&mut self, seconds: f32, realtime: bool) -> (Vec<f32>, Vec<f32>) {
const SAMPLE_RATE : f32 = 44100.0;
self.set_sample_rate(SAMPLE_RATE);
self.process_graph_updates();
let mut nframes = (seconds * SAMPLE_RATE) as usize;
let input = vec![0.0; nframes];
let mut output_l = vec![0.0; nframes];
let mut output_r = vec![0.0; nframes];
for i in 0..nframes {
output_l[i] = 0.0;
output_r[i] = 0.0;
}
let mut offs = 0;
while nframes > 0 {
let cur_nframes =
if nframes >= crate::dsp::MAX_BLOCK_SIZE {
crate::dsp::MAX_BLOCK_SIZE
} else {
nframes
};
nframes -= cur_nframes;
let mut context = crate::Context {
nframes: cur_nframes,
output: &mut [&mut output_l[offs..(offs + cur_nframes)],
&mut output_r[offs..(offs + cur_nframes)]],
input: &[&input[offs..(offs + cur_nframes)]],
};
self.process(&mut context);
if realtime {
let micros =
((crate::dsp::MAX_BLOCK_SIZE as u64) * 1000000)
/ (SAMPLE_RATE as u64);
std::thread::sleep(
std::time::Duration::from_micros(micros));
}
offs += cur_nframes;
}
(output_l, output_r)
}
}