
Streaming State Management

Architecture for real-time hardware control via Marlin firmware.

Overview

The FiberPath GUI streaming system enables direct G-code execution on Marlin-compatible 3-axis winding machines over a serial connection. The architecture uses a persistent Python subprocess, zero-lag progress reporting, and explicit state management across the Rust and frontend layers.

Architecture

┌─────────────────────────────────────┐
│  StreamPanel (React)                │  User interactions
│  - Connection UI                    │  - Play/Pause/Cancel buttons
│  - Manual control                   │  - File selection
│  - Progress display                 │  - Command log
└──────────┬──────────────────────────┘
           │ invoke() + listen()
           ▼
┌─────────────────────────────────────┐
│  Tauri Event System                 │  Async pub/sub
│  - stream-progress                  │
│  - stream-complete                  │
│  - stream-error                     │
└──────────┬──────────────────────────┘
           │
           ▼
┌─────────────────────────────────────┐
│  MarlinState (marlin.rs)            │  Rust state manager
│  - Child process                    │  - Request router
│  - stdin writer                     │  - stdout reader
│  - Response correlation             │
└──────────┬──────────────────────────┘
           │ subprocess
           ▼
┌─────────────────────────────────────┐
│  fiberpath stream CLI               │  Python subprocess
│  - JSON protocol                    │  - Serial I/O
│  - Marlin protocol handler          │  - Queue management
│  - Zero-lag progress                │
└──────────┬──────────────────────────┘
           │ serial port
           ▼
┌─────────────────────────────────────┐
│  Marlin Firmware                    │  Hardware controller
│  - G-code parser                    │
│  - Motion control                   │
│  - "ok" responses                   │
└─────────────────────────────────────┘

Rust State Manager

MarlinState Structure

pub struct MarlinState {
    process: Option<Child>,
    stdin: Option<ChildStdin>,
    router: ResponseRouter,
}

Components:

  • process: Python subprocess handle
  • stdin: Write channel for sending commands
  • router: Routes responses to waiting handlers

Response Router

struct ResponseRouter {
    next_request_id: Arc<AtomicU64>,
    pending_responses: Arc<Mutex<HashMap<u64, oneshot::Sender<MarlinResponse>>>>,
}

Mechanism:

  • Single Reader: One thread reads all stdout
  • Request Correlation: Commands get unique IDs, responses match back
  • Event Broadcasting: Progress updates emitted to frontend
  • No Race Conditions: Owned stdout reader prevents concurrent access

Pattern:

// Spawn reader thread on startup (simplified)
fn spawn_reader(&self, stdout: ChildStdout, app: AppHandle) {
    let pending_responses = Arc::clone(&self.router.pending_responses);
    std::thread::spawn(move || {
        let reader = BufReader::new(stdout);
        for line in reader.lines().flatten() {
            let Ok(response) = serde_json::from_str::<MarlinResponse>(&line) else {
                continue; // skip malformed lines
            };

            if let Some(req_id) = response.request_id() {
                // Route to the handler awaiting this request ID
                if let Some(tx) = pending_responses.lock().unwrap().remove(&req_id) {
                    let _ = tx.send(response);
                }
            } else {
                // No request ID: broadcast as a frontend event
                let _ = app.emit("stream-progress", &response);
            }
        }
    });
}

Response Types

#[derive(Serialize, Deserialize)]
#[serde(tag = "status")]
pub enum MarlinResponse {
    Ok { ports: Option<Vec<SerialPort>>, ... },
    Connected { port: String, baud_rate: u32, ... },
    Disconnected { message: Option<String>, ... },
    Streaming { file: String, total_commands: usize, ... },
    Progress { commands_sent: usize, commands_total: usize, command: String, ... },
    Complete { commands_sent: usize, commands_total: usize },
    Paused { ... },
    Resumed { ... },
    Stopped { disconnected: bool, ... },
    Cancelled { ... },
    Error { code: String, message: String, ... },
}

Discriminated Union: status field determines variant (JSON tagged enum).
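
On the frontend, the same status tag can drive a TypeScript discriminated union. The sketch below mirrors a subset of the variants and defines the payload types referenced by the event listeners later on this page; it assumes the camelCase field names shown in the JSON examples, and the actual frontend types may differ.

// Sketch: frontend mirror of a subset of MarlinResponse (illustrative only).
type ProgressPayload = {
  status: "progress";
  commandsSent: number;
  commandsTotal: number;
  command: string;
};

type CompletePayload = {
  status: "complete";
  commandsSent: number;
  commandsTotal: number;
};

type ErrorPayload = {
  status: "error";
  code: string;
  message: string;
};

type MarlinResponse =
  | { status: "connected"; port: string; baudRate: number }
  | { status: "streaming"; file: string; totalCommands: number }
  | ProgressPayload
  | CompletePayload
  | ErrorPayload;

// Narrowing on the status tag gives typed access to each variant.
function describe(response: MarlinResponse): string {
  switch (response.status) {
    case "progress":
      return `${response.commandsSent}/${response.commandsTotal} (${response.command})`;
    case "error":
      return `${response.code}: ${response.message}`;
    default:
      return response.status;
  }
}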

Command Flow

1. Connection

Frontend:

await invoke("marlin_connect", { port: "COM3", baudRate: 115200 });

Rust:

#[tauri::command]
async fn marlin_connect(
    state: tauri::State<'_, MarlinStateWrapper>,
    port: String,
    baud_rate: u32,
) -> Result<MarlinResponse, String> {
    let mut state = state.0.lock().await;
    state.connect(port, baud_rate).await
}

Flow:

  1. Spawn Python subprocess: fiberpath stream --json
  2. Start stdout reader thread
  3. Send {"command": "connect", "port": "COM3", "baudRate": 115200, "requestId": 1}
  4. Wait for {"status": "connected", "requestId": 1, ...}
  5. Return to frontend
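
As a concrete frontend wrapper for the flow above, something like the following could be used (a sketch: connectToMachine is an illustrative helper, and the invoke import path assumes Tauri v2):

import { invoke } from "@tauri-apps/api/core"; // "@tauri-apps/api/tauri" on Tauri v1

// Sketch: run the connection flow and surface failures to the caller.
async function connectToMachine(port: string, baudRate: number): Promise<boolean> {
  try {
    const response = await invoke<MarlinResponse>("marlin_connect", { port, baudRate });
    return response.status === "connected";
  } catch (error) {
    // The command rejects with a string error (Result<_, String> on the Rust side)
    console.error("Connection failed:", error);
    return false;
  }
}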

2. Manual Command

Frontend:

await invoke("marlin_send_command", { command: "G28" });

Rust:

#[tauri::command]
async fn marlin_send_command(
    state: tauri::State<'_, MarlinStateWrapper>,
    command: String,
) -> Result<MarlinResponse, String> {
    let mut state = state.0.lock().await;
    state.send_command(command).await
}

Flow:

  1. Write to stdin: {"command": "send", "gcode": "G28", "requestId": 2}
  2. Wait for {"status": "ok", "requestId": 2, "responses": ["ok"]}
  3. Return responses to frontend

3. File Streaming

Frontend:

await invoke("marlin_stream_file", {
  filePath: "/path/to/output.gcode",
  dryRun: false,
});

// Listen for progress
const unlisten = await listen("stream-progress", (event) => {
  const { commandsSent, commandsTotal } = event.payload;
  console.log(`${commandsSent}/${commandsTotal}`);
});

Rust:

#[tauri::command]
async fn marlin_stream_file(
    state: tauri::State<'_, MarlinStateWrapper>,
    file_path: String,
    dry_run: bool,
) -> Result<MarlinResponse, String> {
    let mut state = state.0.lock().await;
    state.stream_file(file_path, dry_run).await
}

Flow:

  1. Write to stdin: {"command": "stream", "file": "output.gcode", "dryRun": false, "requestId": 3}
  2. Receive {"status": "streaming", "file": "...", "totalCommands": 5000, "requestId": 3}
  3. Zero-lag progress: the reader thread emits progress events directly as
     {"status": "progress", "commandsSent": 100, "commandsTotal": 5000, ...} (no requestId)
  4. Frontend updates the progress bar in real time (see the listener sketch below)
  5. On completion: {"status": "complete", "commandsSent": 5000, "commandsTotal": 5000}
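
Because progress events carry no requestId, the listener must be attached before the stream command is invoked (see also Troubleshooting below). A sketch, where startStream is an illustrative helper:

import { invoke } from "@tauri-apps/api/core"; // "@tauri-apps/api/tauri" on Tauri v1
import { listen, type UnlistenFn } from "@tauri-apps/api/event";

// Sketch: attach the progress listener first, then start streaming.
async function startStream(filePath: string): Promise<UnlistenFn> {
  const unlisten = await listen<ProgressPayload>("stream-progress", (event) => {
    const { commandsSent, commandsTotal, command } = event.payload;
    console.log(`${commandsSent}/${commandsTotal}: ${command}`);
  });

  await invoke("marlin_stream_file", { filePath, dryRun: false });
  return unlisten; // call this to detach the listener once streaming finishes
}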

4. Pause/Resume/Cancel

Pause:

await invoke("marlin_pause");
state.send_request(MarlinRequest::Pause).await

Sends: {"command": "pause", "requestId": 4}

Resume:

await invoke("marlin_resume");

Sends: {"command": "resume", "requestId": 5}

Cancel (v0.5.0):

await invoke("marlin_cancel");

Sends: {"command": "cancel", "requestId": 6}

Behavior:

  • Stop: Clears queue, disconnects from serial port
  • Cancel: Clears queue, keeps connection alive (orange button when paused)
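
Because Cancel keeps the serial connection open, the frontend can follow it with manual commands for recovery. A minimal sketch using the same invoke import as above (cancelAndRecover is an illustrative helper, and the recovery G-code depends on the job):

// Sketch: gracefully cancel a paused job, then re-home over the still-open connection.
async function cancelAndRecover(): Promise<void> {
  await invoke("marlin_cancel"); // clears the queue, stays connected
  await invoke("marlin_send_command", { command: "G28" }); // manual recovery move
}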

Zero-Lag Progress (v0.5.0)

Problem

Original architecture had progress lag:

Frontend polls → Rust queries Python → Python responds → Rust returns → Frontend updates

Latency: ~200-500ms per update.

Solution

A dedicated reporter thread in the Python subprocess polls shared streaming state and pushes updates on its own schedule:

# Python CLI (simplified)
class StreamingState:
    commands_sent: int
    commands_total: int
    current_command: str

# Streaming thread
for i, command in enumerate(commands):
    send_command(command)
    state.commands_sent = i + 1
    state.current_command = command

# Progress reporter thread (separate)
while streaming:
    emit_json({
        "status": "progress",
        "commandsSent": state.commands_sent,
        "commandsTotal": state.commands_total,
        "command": state.current_command,
    })
    sleep(0.1)  # 100ms update interval

Result: Progress updates arrive continuously without frontend polling.

Frontend Integration

Event Listeners

import { listen } from "@tauri-apps/api/event";

// Progress updates
const unlistenProgress = await listen<ProgressPayload>(
  "stream-progress",
  (event) => {
    setProgress(event.payload.commandsSent / event.payload.commandsTotal);
    setCurrentCommand(event.payload.command);
  }
);

// Completion
const unlistenComplete = await listen<CompletePayload>(
  "stream-complete",
  (event) => {
    console.log("Streaming complete:", event.payload);
    setIsStreaming(false);
  }
);

// Errors
const unlistenError = await listen<ErrorPayload>("stream-error", (event) => {
  console.error("Streaming error:", event.payload.message);
  showErrorDialog(event.payload.message);
});

// Cleanup on unmount
return () => {
  unlistenProgress();
  unlistenComplete();
  unlistenError();
};

React Component State

React component (pseudocode):

function StreamPanel() {
  const [isConnected, setIsConnected] = useState(false);
  const [isStreaming, setIsStreaming] = useState(false);
  const [isPaused, setIsPaused] = useState(false);
  const [progress, setProgress] = useState(0);
  const [currentCommand, setCurrentCommand] = useState('');

  const connect = async () => {
    const response = await invoke('marlin_connect', { port, baudRate });
    if (response.status === 'connected') {
      setIsConnected(true);
    }
  };

  const streamFile = async () => {
    const response = await invoke('marlin_stream_file', { filePath, dryRun: false });
    if (response.status === 'streaming') {
      setIsStreaming(true);
    }
  };

  const pause = async () => {
    await invoke('marlin_pause');
    setIsPaused(true);
  };

  const resume = async () => {
    await invoke('marlin_resume');
    setIsPaused(false);
  };

  const cancel = async () => {
    await invoke('marlin_cancel');
    setIsStreaming(false);
    setIsPaused(false);
    setProgress(0);
  };

  useEffect(() => {
    // listen() returns a Promise that resolves to the unlisten function
    const unlistenPromise = listen('stream-progress', (e) => {
      setProgress(e.payload.commandsSent / e.payload.commandsTotal);
      setCurrentCommand(e.payload.command);
    });
    return () => {
      unlistenPromise.then((unlisten) => unlisten());
    };
  }, []);

  return (
    <div>
      {!isConnected && <button onClick={connect}>Connect</button>}
      {isConnected && !isStreaming && <button onClick={streamFile}>Start</button>}
      {isStreaming && !isPaused && <button onClick={pause}>Pause</button>}
      {isStreaming && isPaused && (
        <>
          <button onClick={resume}>Resume</button>
          <button onClick={cancel}>Cancel</button>
        </>
      )}
      <progress value={progress} />
      <div>Current: {currentCommand}</div>
    </div>
  );
}

Error Handling

Connection Errors

MarlinResponse::Error {
    code: "CONNECTION_FAILED",
    message: "Unable to open serial port: Access denied",
}

Common Causes:

  • Port not found
  • Port in use by another application
  • Permissions (Linux: not in dialout group)

Streaming Errors

MarlinResponse::Error {
    code: "STREAM_FAILED",
    message: "Marlin returned 'error' for command: G1 X1000000",
}

Common Causes:

  • Invalid G-code
  • Out-of-bounds movement
  • Homing not performed

v0.5.0 Enhancements

Cancel Job

Before (v0.4.0): Only a "Stop" button (emergency stop, disconnects).

After (v0.5.0):

  • Stop: Emergency stop + disconnect (red button, always visible)
  • Cancel: Graceful cancel + keep connection (orange button, only when paused)

Use Cases:

  • Cancel: Realize mistake mid-job, stop gracefully, stay connected for manual recovery
  • Stop: Emergency condition, disconnect immediately

Application State Machine

Refined State Handling:

  • Clean state after stop/cancel/reconnect
  • Clear selected file anytime (manual file control)
  • Prevent double-connections
  • Handle disconnect during streaming

State Machine:

Disconnected → Connect → Connected
Connected → Stream → Streaming
Streaming → Pause → Paused
Paused → Resume → Streaming
Paused → Cancel → Connected
Any → Stop → Disconnected
Any → Disconnect → Disconnected
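
The same transitions can be encoded as a lookup table on the frontend, which makes invalid transitions (for example, a double connect) easy to reject. A sketch with illustrative names, not the actual StreamPanel implementation:

// Sketch: the state machine above as a transition table.
type StreamState = "disconnected" | "connected" | "streaming" | "paused";
type StreamEvent =
  | "connect" | "stream" | "pause" | "resume" | "cancel" | "stop" | "disconnect";

const transitions: Record<StreamState, Partial<Record<StreamEvent, StreamState>>> = {
  disconnected: { connect: "connected" },
  connected: { stream: "streaming", stop: "disconnected", disconnect: "disconnected" },
  streaming: { pause: "paused", stop: "disconnected", disconnect: "disconnected" },
  paused: { resume: "streaming", cancel: "connected", stop: "disconnected", disconnect: "disconnected" },
};

// Returns the next state, or the current one if the transition is not allowed
// (e.g. connecting while already connected).
function nextState(state: StreamState, event: StreamEvent): StreamState {
  return transitions[state][event] ?? state;
}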

Testing

Manual Testing Checklist

  • [ ] Connect to serial port
  • [ ] Send manual G-code (G28, G1 X10)
  • [ ] Stream small file (<100 commands)
  • [ ] Stream large file (1000+ commands)
  • [ ] Pause mid-stream
  • [ ] Resume after pause
  • [ ] Cancel after pause (connection stays alive)
  • [ ] Stop during streaming (disconnects)
  • [ ] Disconnect and reconnect
  • [ ] Handle connection failures gracefully

Dry Run Mode

await invoke("marlin_stream_file", { filePath, dryRun: true });

Behavior: Simulates streaming without a serial connection. Useful for:

  • Testing progress updates
  • Validating G-code file
  • UI testing without hardware

Performance

Update Frequency

  • Progress: 10 Hz (every 100ms)
  • Command logging: All commands (no throttling)

Memory

  • Command queue: Held in Python process memory
  • Log buffer: Limited to last 1000 commands in frontend
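
The frontend log cap can be enforced with a simple trim on append (a sketch; appendLog and MAX_LOG_LINES are illustrative names):

// Sketch: keep only the last 1000 commands in the frontend log state.
const MAX_LOG_LINES = 1000;

function appendLog(log: string[], line: string): string[] {
  const next = [...log, line];
  return next.length > MAX_LOG_LINES ? next.slice(-MAX_LOG_LINES) : next;
}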

Latency

  • Command → Response: ~10-50ms (serial + Marlin processing)
  • Progress → UI: ~100ms (update interval)

Troubleshooting

No progress updates

Check: Ensure stream-progress listener is attached before starting stream.

Progress lags behind

Cause: Frontend re-rendering too often.

Fix: Use shallow selectors, memoize expensive computations.

Commands not executing

Check: Has the machine been homed (G28)?

Connection timeout

Check: Baud rate setting and USB cable quality.

Next Steps