Streaming State Management¶
Architecture for real-time hardware control via Marlin firmware.
Overview¶
The FiberPath GUI streaming system enables direct G-code execution on Marlin-compatible 3-axis winding machines via serial connection. The architecture uses a persistent Python subprocess with zero-lag progress reporting and refined state management.
Architecture¶
┌─────────────────────────────────────┐
│ StreamPanel (React) │ User interactions
│ - Connection UI │ - Play/Pause/Cancel buttons
│ - Manual control │ - File selection
│ - Progress display │ - Command log
└──────────┬──────────────────────────┘
│ invoke() + listen()
▼
┌─────────────────────────────────────┐
│ Tauri Event System │ Async pub/sub
│ - stream-progress │
│ - stream-complete │
│ - stream-error │
└──────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ MarlinState (marlin.rs) │ Rust state manager
│ - Child process │ - Request router
│ - stdin writer │ - stdout reader
│ - Response correlation │
└──────────┬──────────────────────────┘
│ subprocess
▼
┌─────────────────────────────────────┐
│ fiberpath stream CLI │ Python subprocess
│ - JSON protocol │ - Serial I/O
│ - Marlin protocol handler │ - Queue management
│ - Zero-lag progress │
└──────────┬──────────────────────────┘
│ serial port
▼
┌─────────────────────────────────────┐
│ Marlin Firmware │ Hardware controller
│ - G-code parser │
│ - Motion control │
│ - "ok" responses │
└─────────────────────────────────────┘
Rust State Manager¶
MarlinState Structure¶
pub struct MarlinState {
process: Option<Child>,
stdin: Option<ChildStdin>,
router: ResponseRouter,
}
Components:
process: Python subprocess handlestdin: Write channel for sending commandsrouter: Routes responses to waiting handlers
Response Router¶
struct ResponseRouter {
next_request_id: Arc<AtomicU64>,
pending_responses: Arc<Mutex<HashMap<u64, oneshot::Sender<MarlinResponse>>>>,
}
Mechanism:
- Single Reader: One thread reads all stdout
- Request Correlation: Commands get unique IDs, responses match back
- Event Broadcasting: Progress updates emitted to frontend
- No Race Conditions: Owned stdout reader prevents concurrent access
Pattern:
// Spawn reader thread on startup
fn spawn_reader(&self, stdout: ChildStdout, app: AppHandle) {
std::thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines() {
let response: MarlinResponse = serde_json::from_str(&line)?;
if let Some(req_id) = response.request_id() {
// Route to waiting handler
pending_responses.remove(&req_id).unwrap().send(response);
} else {
// Broadcast as event
app.emit("stream-progress", &response);
}
}
});
}
Response Types¶
#[derive(Serialize, Deserialize)]
#[serde(tag = "status")]
pub enum MarlinResponse {
Ok { ports: Option<Vec<SerialPort>>, ... },
Connected { port: String, baud_rate: u32, ... },
Disconnected { message: Option<String>, ... },
Streaming { file: String, total_commands: usize, ... },
Progress { commands_sent: usize, commands_total: usize, command: String, ... },
Complete { commands_sent: usize, commands_total: usize },
Paused { ... },
Resumed { ... },
Stopped { disconnected: bool, ... },
Cancelled { ... },
Error { code: String, message: String, ... },
}
Discriminated Union: status field determines variant (JSON tagged enum).
Command Flow¶
1. Connection¶
Frontend:
await invoke("marlin_connect", { port: "COM3", baudRate: 115200 });
Rust:
#[tauri::command]
async fn marlin_connect(
state: tauri::State<'_, MarlinStateWrapper>,
port: String,
baud_rate: u32,
) -> Result<MarlinResponse, String> {
let mut state = state.0.lock().await;
state.connect(port, baud_rate).await
}
Flow:
- Spawn Python subprocess:
fiberpath stream --json - Start stdout reader thread
- Send
{"command": "connect", "port": "COM3", "baudRate": 115200, "requestId": 1} - Wait for
{"status": "connected", "requestId": 1, ...} - Return to frontend
2. Manual Command¶
Frontend:
await invoke("marlin_send_command", { command: "G28" });
Rust:
#[tauri::command]
async fn marlin_send_command(
state: tauri::State<'_, MarlinStateWrapper>,
command: String,
) -> Result<MarlinResponse, String> {
let mut state = state.0.lock().await;
state.send_command(command).await
}
Flow:
- Write to stdin:
{"command": "send", "gcode": "G28", "requestId": 2} - Wait for
{"status": "ok", "requestId": 2, "responses": ["ok"]} - Return responses to frontend
3. File Streaming¶
Frontend:
await invoke("marlin_stream_file", {
filePath: "/path/to/output.gcode",
dryRun: false,
});
// Listen for progress
const unlisten = await listen("stream-progress", (event) => {
const { commandsSent, commandsTotal } = event.payload;
console.log(`${commandsSent}/${commandsTotal}`);
});
Rust:
#[tauri::command]
async fn marlin_stream_file(
state: tauri::State<'_, MarlinStateWrapper>,
file_path: String,
dry_run: bool,
) -> Result<MarlinResponse, String> {
let mut state = state.0.lock().await;
state.stream_file(file_path, dry_run).await
}
Flow:
- Write to stdin:
{"command": "stream", "file": "output.gcode", "dryRun": false, "requestId": 3} - Receive
{"status": "streaming", "file": "...", "totalCommands": 5000, "requestId": 3} - Zero-lag progress: Reader thread emits progress events directly:
{"status": "progress", "commandsSent": 100, "commandsTotal": 5000, ...}(no requestId)- Frontend updates progress bar in real-time
- On completion:
{"status": "complete", "commandsSent": 5000, "commandsTotal": 5000}
4. Pause/Resume/Cancel¶
Pause:
await invoke("marlin_pause");
state.send_request(MarlinRequest::Pause).await
Sends: {"command": "pause", "requestId": 4}
Resume:
await invoke("marlin_resume");
Sends: {"command": "resume", "requestId": 5}
Cancel (v0.5.0):
await invoke("marlin_cancel");
Sends: {"command": "cancel", "requestId": 6}
Behavior:
- Stop: Clears queue, disconnects from serial port
- Cancel: Clears queue, keeps connection alive (orange button when paused)
Zero-Lag Progress (v0.5.0)¶
Problem¶
Original architecture had progress lag:
Frontend polls → Rust queries Python → Python responds → Rust returns → Frontend updates
Latency: ~200-500ms per update.
Solution¶
Shared state polling in Python subprocess:
# Python CLI (simplified)
class StreamingState:
commands_sent: int
commands_total: int
current_command: str
# Streaming thread
while commands:
send_command(commands[i])
state.commands_sent = i + 1
state.current_command = commands[i]
# Progress reporter thread (separate)
while streaming:
emit_json({
"status": "progress",
"commandsSent": state.commands_sent,
"commandsTotal": state.commands_total,
"command": state.current_command,
})
sleep(0.1) # 100ms update interval
Result: Progress updates arrive continuously without frontend polling.
Frontend Integration¶
Event Listeners¶
import { listen } from "@tauri-apps/api/event";
// Progress updates
const unlistenProgress = await listen<ProgressPayload>(
"stream-progress",
(event) => {
setProgress(event.payload.commandsSent / event.payload.commandsTotal);
setCurrentCommand(event.payload.command);
}
);
// Completion
const unlistenComplete = await listen<CompletePayload>(
"stream-complete",
(event) => {
console.log("Streaming complete:", event.payload);
setIsStreaming(false);
}
);
// Errors
const unlistenError = await listen<ErrorPayload>("stream-error", (event) => {
console.error("Streaming error:", event.payload.message);
showErrorDialog(event.payload.message);
});
// Cleanup on unmount
return () => {
unlistenProgress();
unlistenComplete();
unlistenError();
};
React Component State¶
React component (pseudocode):
function StreamPanel() {
const [isConnected, setIsConnected] = useState(false);
const [isStreaming, setIsStreaming] = useState(false);
const [isPaused, setIsPaused] = useState(false);
const [progress, setProgress] = useState(0);
const [currentCommand, setCurrentCommand] = useState('');
const connect = async () => {
const response = await invoke('marlin_connect', { port, baudRate });
if (response.status === 'connected') {
setIsConnected(true);
}
};
const streamFile = async () => {
const response = await invoke('marlin_stream_file', { filePath, dryRun: false });
if (response.status === 'streaming') {
setIsStreaming(true);
}
};
const pause = async () => {
await invoke('marlin_pause');
setIsPaused(true);
};
const resume = async () => {
await invoke('marlin_resume');
setIsPaused(false);
};
const cancel = async () => {
await invoke('marlin_cancel');
setIsStreaming(false);
setIsPaused(false);
setProgress(0);
};
useEffect(() => {
const unlisten = listen('stream-progress', (e) => {
setProgress(e.payload.commandsSent / e.payload.commandsTotal);
setCurrentCommand(e.payload.command);
});
return () => unlisten();
}, []);
return (
<div>
{!isConnected && <button onClick={connect}>Connect</button>}
{isConnected && !isStreaming && <button onClick={streamFile}>Start</button>}
{isStreaming && !isPaused && <button onClick={pause}>Pause</button>}
{isStreaming && isPaused && (
<>
<button onClick={resume}>Resume</button>
<button onClick={cancel}>Cancel</button>
</>
)}
<progress value={progress} />
<div>Current: {currentCommand}</div>
</div>
);
}
Error Handling¶
Connection Errors¶
MarlinResponse::Error {
code: "CONNECTION_FAILED",
message: "Unable to open serial port: Access denied",
}
Common Causes:
- Port not found
- Port in use by another application
- Permissions (Linux: not in dialout group)
Streaming Errors¶
MarlinResponse::Error {
code: "STREAM_FAILED",
message: "Marlin returned 'error' for command: G1 X1000000",
}
Common Causes:
- Invalid G-code
- Out-of-bounds movement
- Homing not performed
v0.5.0 Enhancements¶
Cancel Job¶
Before (v4.0): Only "Stop" button (emergency stop, disconnects).
After (v0.5.0):
- Stop: Emergency stop + disconnect (red button, always visible)
- Cancel: Graceful cancel + keep connection (orange button, only when paused)
Use Cases:
- Cancel: Realize mistake mid-job, stop gracefully, stay connected for manual recovery
- Stop: Emergency condition, disconnect immediately
Application State Machine¶
Refined State Handling:
- Clean state after stop/cancel/reconnect
- Clear selected file anytime (manual file control)
- Prevent double-connections
- Handle disconnect during streaming
State Machine:
Disconnected → Connect → Connected
Connected → Stream → Streaming
Streaming → Pause → Paused
Paused → Resume → Streaming
Paused → Cancel → Connected
Any → Stop → Disconnected
Any → Disconnect → Disconnected
Testing¶
Manual Testing Checklist¶
- [ ] Connect to serial port
- [ ] Send manual G-code (G28, G1 X10)
- [ ] Stream small file (<100 commands)
- [ ] Stream large file (1000+ commands)
- [ ] Pause mid-stream
- [ ] Resume after pause
- [ ] Cancel after pause (connection stays alive)
- [ ] Stop during streaming (disconnects)
- [ ] Disconnect and reconnect
- [ ] Handle connection failures gracefully
Dry Run Mode¶
await invoke("marlin_stream_file", { filePath, dryRun: true });
Behavior: Simulates streaming without serial connection. Useful for:
- Testing progress updates
- Validating G-code file
- UI testing without hardware
Performance¶
Update Frequency¶
- Progress: 10 Hz (every 100ms)
- Command logging: All commands (no throttling)
Memory¶
- Command queue: Held in Python process memory
- Log buffer: Limited to last 1000 commands in frontend
Latency¶
- Command → Response: ~10-50ms (serial + Marlin processing)
- Progress → UI: ~100ms (update interval)
Troubleshooting¶
No progress updates¶
Check: Ensure stream-progress listener is attached before starting stream.
Progress lags behind¶
Cause: Frontend re-rendering too often.
Fix: Use shallow selectors, memoize expensive computations.
Commands not executing¶
Check: Marlin firmware homed? (G28)
Connection timeout¶
Increase: Baud rate or check USB cable quality.
Next Steps¶
- CLI Integration - How Rust bridges to Python
- State Management - React state patterns
- Marlin Streaming Guide - User documentation