Data Processing
Common patterns for processing sensor data in Arc
Arc programs typically process sensor data: converting units, smoothing noise, combining readings, detecting changes. This guide covers the patterns you’ll use for these tasks.
Basic Flows
A flow connects a source to a destination through a function:
temperature -> to_celsius{} -> temperature_celsius When temperature receives new data, Arc runs to_celsius and writes the result to
temperature_celsius. You don’t write loops or polling code.
The function:
func to_celsius(f f64) f64 {
return (f - 32.0) * 5.0 / 9.0
} The {} after to_celsius instantiates the function as a node in the dataflow graph.
The input comes from the flow (the value of temperature), not from parentheses.
Config Parameters
Config parameters make functions reusable. They’re set when you instantiate the function, not when data flows through:
func scale{factor f64}(value f64) f64 {
return value * factor
}
pressure -> scale{factor=2.0} -> pressure_doubled
temperature -> scale{factor=0.5} -> temperature_halved Both flows use the same function with different scaling factors. Config parameters
appear in {} after the function name. Input parameters appear in () and receive data
from the flow.
Use config parameters for:
- Calibration constants (gain, offset)
- Thresholds
- Conversion factors
- Anything that varies per sensor but stays constant during execution
func linear_scale{gain f64, offset f64}(raw f64) f64 {
return raw * gain + offset
}
pressure_raw -> linear_scale{gain=100.0, offset=5.0} -> pressure_psi Expression Flows
Simple transformations don’t need a separate function. Write the expression directly in the flow:
temperature * 9.0 / 5.0 + 32.0 -> temperature_fahrenheit This creates an implicit function that runs whenever temperature updates.
Expressions can include comparisons:
pressure > 500 -> pressure_high Now pressure_high outputs 1 when pressure exceeds 500, 0 otherwise.
You can combine channels:
(pressure_1 + pressure_2) / 2.0 -> pressure_avg Expression flows execute when any referenced channel updates. If pressure_1 and
pressure_2 update at different times, the average recalculates on each update using
the latest values from both.
Stateful Variables
Some calculations need to remember values across executions. Use $= to declare a
stateful variable:
func counter() i64 {
count $= 0
count = count + 1
return count
} The first time this runs, count is 0. After incrementing, it’s 1. The next execution
starts with count at 1, increments to 2, and so on. The $= operator sets the initial
value only on the first execution. After that, the variable retains its value.
Compare with :=, which resets every execution:
func broken_counter() i64 {
count := 0 // resets to 0 every time
count = count + 1
return count // always returns 1
} For a deeper explanation, see Stateful Variables.
Tracking Previous Values
A common pattern is comparing the current value to the previous one. Initialize the stateful variable to the input parameter:
func delta(value f64) f64 {
prev $= value
d := value - prev
prev = value
return d
}
pressure -> delta{} -> pressure_change On the first execution, prev is set to value, so d is 0. On subsequent executions,
prev holds the previous value and d is the actual change.
This works because $= evaluates its right-hand side only on the first execution. After
that, prev retains whatever was assigned to it.
Running Calculations
Accumulators track totals or counts over time:
func running_avg(value f64) f64 {
total $= 0.0
count $= 0
total = total + value
count = count + 1
return total / f64(count)
}
temperature -> running_avg{} -> temperature_avg Each execution adds to total and increments count. The average includes all values
seen so far.
For a running maximum:
func running_max(value f64) f64 {
max $= value
if value > max {
max = value
}
return max
} Initializing max $= value means the first reading becomes the initial maximum.
Exponential Moving Average
An exponential moving average (EMA) smooths noisy data while responding to changes. It weights recent values more heavily:
func ema{alpha f64}(value f64) f64 {
avg $= value
avg = alpha * value + (1.0 - alpha) * avg
return avg
}
pressure -> ema{alpha=0.2} -> pressure_smooth The alpha parameter controls responsiveness:
- Lower alpha (0.1): Heavy smoothing, slow response
- Higher alpha (0.5): Light smoothing, fast response
Multi-Input Functions
When a function needs multiple sensor values, pass the channels as config parameters:
func pressure_diff{
inlet chan f64,
outlet chan f64
}() f64 {
p1 := inlet
p2 := outlet
return p1 - p2
} Inside the function, inlet and outlet read the latest value from those channels.
But what triggers this function? It has no input parameter to receive flow data. Use
interval to run it on a schedule:
interval{period=50ms} -> pressure_diff{inlet=inlet_pressure, outlet=outlet_pressure} -> delta_p Every 50ms, Arc runs pressure_diff, which reads both pressure channels and outputs the
difference.
Functions with only config parameters (no input parameters) need something to trigger
them. Use interval for periodic execution.
Flow-Driven vs Interval-Driven
Two ways to trigger execution:
Flow-driven: The source channel triggers execution when it updates.
temperature -> to_celsius{} -> temperature_celsius Good when you want to process every sample from a sensor.
Interval-driven: A timer triggers execution at a fixed rate.
interval{period=100ms} -> read_sensors{} -> output Good when:
- The function reads multiple channels (no single source to trigger it)
- You want a consistent sample rate regardless of when sensors update
- You’re implementing a control loop that should run at a fixed frequency
Reading Channels in Functions
Inside a function body, reading a channel returns its latest value immediately:
func check_limits{sensor chan f64, limit f64}() u8 {
value := sensor // reads latest value, doesn't block
if value > limit {
return 1
}
return 0
} This is different from flow-driven execution. In a flow like
sensor -> process{} -> output, the function receives data pushed through the flow.
Inside a function body, reading a channel pulls the current value.
If nothing has been written to the channel yet, reading returns zero.
Control Flow
Use if/else for conditional logic:
func clamp{min f64, max f64}(value f64) f64 {
if value < min {
return min
}
if value > max {
return max
}
return value
} For more complex decisions, chain conditions:
func categorize(value f64) i64 {
if value < 100 {
return 0
} else if value < 500 {
return 1
} else if value < 900 {
return 2
}
return 3
} Sensor Voting
When you have redundant sensors, use voting to reject outliers. A median of three readings ignores a single faulty sensor:
func median3{
a chan f64,
b chan f64,
c chan f64
}() f64 {
va := a
vb := b
vc := c
// Find the middle value
if (va >= vb and va <= vc) or (va >= vc and va <= vb) {
return va
}
if (vb >= va and vb <= vc) or (vb >= vc and vb <= va) {
return vb
}
return vc
}
interval{period=50ms} -> median3{
a=pressure_1,
b=pressure_2,
c=pressure_3
} -> pressure_voted If one sensor fails and reads 0 while the others read 500, the median is 500. The faulty reading is ignored.
Rate of Change
Detecting how fast a value is changing requires tracking the previous value and dividing by time:
func rate{dt_ms f64}(value f64) f64 {
prev $= value
d := value - prev
prev = value
dt_s := dt_ms / 1000.0
return d / dt_s
}
pressure -> rate{dt_ms=50.0} -> pressure_rate The dt_ms config parameter is the expected time between samples in milliseconds. The
output is in units per second (e.g., psi/second if pressure is in psi).
The dt_ms parameter should match your actual sample rate. If the sensor updates
every 50ms, use dt_ms=50.0. Mismatched timing gives incorrect rate values.
Putting It Together
Here’s a complete pipeline that reads a pressure sensor, converts units, smooths the signal, computes rate of change, and outputs everything:
// Unit conversion: voltage to psi
func volts_to_psi{v_min f64, v_max f64, p_max f64}(v f64) f64 {
ratio := (v - v_min) / (v_max - v_min)
return ratio * p_max
}
// Exponential moving average
func ema{alpha f64}(value f64) f64 {
avg $= value
avg = alpha * value + (1.0 - alpha) * avg
return avg
}
// Rate of change
func rate{dt_ms f64}(value f64) f64 {
prev $= value
d := value - prev
prev = value
return d / (dt_ms / 1000.0)
}
// Pipeline: raw voltage -> psi -> smoothed -> rate
pressure_raw -> volts_to_psi{v_min=0.5, v_max=4.5, p_max=1000.0} -> pressure_psi
pressure_psi -> ema{alpha=0.3} -> pressure_smooth
pressure_smooth -> rate{dt_ms=50.0} -> pressure_rate
// Also output a high-pressure flag
pressure_smooth > 800 -> pressure_high Each stage feeds the next. When pressure_raw updates:
volts_to_psiconverts to psiemasmooths the resultratecomputes how fast the smoothed value is changing- The comparison outputs 1 if pressure exceeds 800
The raw value flows through the entire pipeline automatically.