--- title: "nanonext - Messaging and Async I/O" vignette: > %\VignetteIndexEntry{nanonext - Messaging and Async I/O} %\VignetteEngine{litedown::vignette} %\VignetteEncoding{UTF-8} --- ### 1. Cross-language Exchange `nanonext` provides a fast, reliable data interface between different programming languages where NNG has an implementation, including C, C++, Java, Python, Go, and Rust. This messaging interface is lightweight, robust, and has limited points of failure. It enables: - Communication between processes in the same or different languages - Distributed computing across networks or on the same machine - Real-time data pipelines where computation times exceed data frequency - Modular software design following Unix philosophy This example demonstrates numerical data exchange between R and Python (NumPy). Create socket in Python using the NNG binding 'pynng': ``` python import numpy as np import pynng socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket") ``` Create nano object in R using `nanonext`, then send a vector of 'doubles', specifying mode as 'raw': ``` r library(nanonext) n <- nano("pair", dial = "ipc:///tmp/nanonext.socket") n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw") #> [1] 0 ``` Receive in Python as a NumPy array of 'floats', and send back to R: ``` python raw = socket.recv() array = np.frombuffer(raw) print(array) #> [1.1 2.2 3.3 4.4 5.5] msg = array.tobytes() socket.send(msg) socket.close() ``` Receive in R, specifying the receive mode as 'double': ``` r n$recv(mode = "double") #> [1] 1.1 2.2 3.3 4.4 5.5 n$close() ``` ### 2. Async and Concurrency `nanonext` implements true async send and receive, leveraging NNG as a massively-scalable concurrency framework. ``` r s1 <- socket("pair", listen = "inproc://nano") s2 <- socket("pair", dial = "inproc://nano") ``` `send_aio()` and `recv_aio()` return immediately with an 'Aio' object that performs operations asynchronously. Aio objects return an unresolved value while the operation is ongoing, then automatically resolve once complete. ``` r # async receive requested, but no messages waiting yet msg <- recv_aio(s2) msg #> < recvAio | $data > msg$data #> 'unresolved' logi NA ``` For 'sendAio' objects, the result is stored at `$result`. For 'recvAio' objects, the message is stored at `$data`. ``` r res <- send_aio(s1, data.frame(a = 1, b = 2)) res #> < sendAio | $result > res$result #> [1] 0 ``` > 0 indicates successful send - the message has been accepted by the socket for sending but may still be buffered within the system. ``` r # once a message is sent, the recvAio resolves automatically msg$data #> a b #> 1 1 2 ``` Use `unresolved()` in control flow to perform actions before or after Aio resolution without blocking. ``` r msg <- recv_aio(s2) # unresolved() checks resolution status while (unresolved(msg)) { # perform other tasks while waiting send_aio(s1, "resolved") cat("unresolved") } #> unresolved # access resolved value msg$data #> [1] "resolved" ``` Explicitly wait for completion with `call_aio()` (blocking). ``` r # wait for completion and return resolved Aio call_aio(msg) # access resolved value (waiting if required): call_aio(msg)$data #> [1] "resolved" # or directly: collect_aio(msg) #> [1] "resolved" # or user-interruptible: msg[] #> [1] "resolved" close(s1) close(s2) ``` ### 3. Synchronisation Primitives `nanonext` implements cross-platform synchronisation primitives from the NNG library, enabling synchronisation between NNG events and the main R execution thread. Condition variables can signal events such as asynchronous receive completions and pipe events (connections established or dropped). Each condition variable has a value (counter) and flag (boolean). Signals increment the value; successful `wait()` or `until()` calls decrement it. A non-zero value allows waiting threads to continue. This approach is more efficient than polling - consuming no resources while waiting and synchronising with zero latency. **Example 1: Wait for connection** ``` r sock <- socket("pair", listen = "inproc://nanopipe") cv <- cv() cv_value(cv) #> [1] 0 pipe_notify(sock, cv = cv, add = TRUE, remove = TRUE) # wait(cv) would block until connection established # for illustration: sock2 <- socket("pair", dial = "inproc://nanopipe") cv_value(cv) # incremented when pipe created #> [1] 1 wait(cv) # does not block as cv value is non-zero cv_value(cv) # decremented by wait() #> [1] 0 close(sock2) cv_value(cv) # incremented when pipe destroyed #> [1] 1 close(sock) ``` **Example 2: Wait for message or disconnection** ``` r sock <- socket("pair", listen = "inproc://nanosignal") sock2 <- socket("pair", dial = "inproc://nanosignal") cv <- cv() cv_value(cv) #> [1] 0 pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE) send(sock2, "this message will wake waiting thread") #> [1] 0 r <- recv_aio(sock, cv = cv) # wakes when async receive completes wait(cv) || stop("peer disconnected") #> [1] TRUE r$data #> [1] "this message will wake waiting thread" close(sock) close(sock2) ``` When `flag = TRUE` is set for pipe notifications, `wait()` returns FALSE for pipe events (rather than TRUE for message events). This distinguishes between disconnections and successful receives, something not possible using `call_aio()` alone. This mechanism enables waiting simultaneously on multiple events while distinguishing between them. `pipe_notify()` can signal up to two condition variables per event for additional flexibility in concurrent applications.