diff --git a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp index 61a48e6..fbfa53c 100644 --- a/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp +++ b/axiomatic_adapter/include/axiomatic_adapter/axiomatic_adapter.hpp @@ -51,6 +51,12 @@ class AxiomaticAdapter : public std::enable_shared_from_this static constexpr std::chrono::milliseconds DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS{100}; static constexpr std::chrono::milliseconds JOIN_RECEPTION_TIMEOUT_MS{100}; + /// @brief Returned from receive() when a TCP read contained only non-CAN-Stream protocol + /// messages (heartbeats, status responses, unknown IDs). Not a real error — the reception + /// thread skips it silently. External callers using receive() directly can compare against + /// this constant to distinguish "no CAN frame this round" from a genuine socket error. + static constexpr const char * NON_FRAME_PROTOCOL_MESSAGE = "axiomatic: non-CAN-Stream protocol message consumed"; + /// @brief AxiomaticAdapter Class Init /// @param ip_address Axiomatic Device IP address to connect /// @param port Axiomatic Device Port to connect to diff --git a/axiomatic_adapter/src/axiomatic_adapter.cpp b/axiomatic_adapter/src/axiomatic_adapter.cpp index 8fb1491..b3809d8 100644 --- a/axiomatic_adapter/src/axiomatic_adapter.cpp +++ b/axiomatic_adapter/src/axiomatic_adapter.cpp @@ -19,6 +19,8 @@ #include #include +#include +#include #include #include #include @@ -112,7 +114,21 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return false; } + // Disable Nagle's algorithm. With the async send worker, the calling + // thread is never blocked on write() latency, and we'd rather have small + // frequent TCP segments than bursty coalescing — the device's delayed-ACK + // behaviour otherwise produces tens-to-hundreds of milliseconds of + // segment stalls under sustained CAN traffic. Failure is non-fatal. + { + boost::system::error_code nd_ec; + tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec); + if (nd_ec) { + std::cerr << "[Axiomatic] Failed to set TCP_NODELAY: " << nd_ec.message() << std::endl; + } + } + socket_state_ = TCPSocketState::OPEN; + startSendWorker(); return true; } catch (std::exception & e) { std::cerr << "Connection failed (exception): " << e.what() << std::endl; @@ -124,6 +140,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl bool closeSocket() { if (socket_state_ != TCPSocketState::CLOSED) { + stopSendWorker(); + boost::system::error_code error_code; tcp_socket_.close(error_code); @@ -154,6 +172,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl if (!error) { receive_callback_(std::make_unique(frame)); + } else if (*error == AxiomaticAdapter::NON_FRAME_PROTOCOL_MESSAGE) { + // Heartbeat / status response / other non-CAN-Stream protocol traffic + // was consumed but yielded no CAN frame to deliver. Not a real error. } else { error_callback_(*error); } @@ -223,61 +244,81 @@ class AxiomaticAdapter::AxiomaticAdapterImpl } // --- Process the received data --- + // + // Walk the buffer looking for an Axiomatic protocol message we can decode + // into a CAN frame. Any non-CAN-Stream messages we encounter at the front + // (heartbeats, status responses, CAN FD stream, unknown IDs) are skipped + // using their declared Message Data Length so a CAN frame that follows in + // the same TCP read isn't dropped. If the buffer contains only non-frame + // protocol traffic, return the NON_FRAME_PROTOCOL_MESSAGE sentinel — the + // reception thread treats this as "no frame this round" rather than a + // socket error. + size_t pos = 0; + while (pos + HEADER_BYTES <= data.size()) { + if (!std::equal(AXIOMATIC_SYNC_PREFIX.begin(), AXIOMATIC_SYNC_PREFIX.end(), data.begin() + pos)) { + return std::make_optional("Not a valid Axiomatic message."); + } - // Check size, header info and message type - if (data.size() < AXIOMATIC_CAN_MESSAGE_HEADER.size() + 5) { - return std::make_optional("Data too short for header and control byte."); - } - if (!std::equal(AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end(), data.begin())) { - return std::make_optional("Not a valid CAN message."); - } + uint16_t msg_id = static_cast(data[pos + 6]) | (static_cast(data[pos + 7]) << 8); + uint16_t decl_data_len = static_cast(data[pos + 9]) | (static_cast(data[pos + 10]) << 8); + size_t msg_total = static_cast(HEADER_BYTES) + decl_data_len; + + if (msg_id != MSG_ID_CAN_STREAM_DEPRECATED) { + // Consume this non-CAN-Stream message and look for the next one. + // Declared length matches on-wire footprint for heartbeats / status + // responses per the protocol spec (verified by hardware capture for + // heartbeats specifically). If we'd overrun the buffer, advance to + // the end so the outer loop exits cleanly. + pos += (msg_total <= data.size() - pos) ? msg_total : (data.size() - pos); + continue; + } - uint8_t control_byte = data[11]; - // Extract timestamp size (bits 6 & 5) - size_t timestamp_size = (control_byte & 0x60) >> 5; - // Check if the frame is extended (bit 4) - bool is_can_extended = (control_byte & 0x10) >> 4; - // Extract CAN frame length (lower 4 bits) - size_t can_length = control_byte & 0x0F; - - // Determine where the CAN ID starts (after timestamp bytes) - size_t can_id_start = 12 + timestamp_size; - uint32_t can_id = 0; - size_t can_data_start = 0; - - // Ensure data is large enough for CAN ID extraction - size_t min_id_size = is_can_extended ? 4 : 2; - if (data.size() < can_id_start + min_id_size) { - return std::make_optional("Data too short for CAN ID."); - } + // CAN Stream (Message ID 1). Decode using the Control Byte at byte 11. + uint8_t control_byte = data[pos + 11]; + size_t timestamp_size = (control_byte & 0x60) >> 5; + bool is_can_extended = (control_byte & 0x10) >> 4; + size_t can_length = control_byte & 0x0F; - // Extract CAN ID (little-endian) - if (!is_can_extended) { - can_id = static_cast(data[can_id_start] | (data[can_id_start + 1] << 8)); - can_data_start = can_id_start + 2; - } else { - can_id = static_cast( - data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) | - (data[can_id_start + 3] << 24)); - can_data_start = can_id_start + 4; - can_frame.set_id_as_extended(); - } + size_t can_id_start = pos + 12 + timestamp_size; + size_t min_id_size = is_can_extended ? 4 : 2; + if (data.size() < can_id_start + min_id_size) { + return std::make_optional("Data too short for CAN ID."); + } - // Ensure data is large enough for CAN payload - if (data.size() < can_data_start + can_length) { - return std::make_optional("Data too short for CAN payload."); - } + uint32_t can_id = 0; + size_t can_data_start = 0; + if (!is_can_extended) { + can_id = static_cast(data[can_id_start] | (data[can_id_start + 1] << 8)); + can_data_start = can_id_start + 2; + } else { + can_id = static_cast( + data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) | + (data[can_id_start + 3] << 24)); + can_data_start = can_id_start + 4; + can_frame.set_id_as_extended(); + } - // Extract CAN data (zero-padded to 8 bytes) - std::array can_data = {0}; - std::copy_n(data.begin() + can_data_start, can_length, can_data.begin()); + if (data.size() < can_data_start + can_length) { + return std::make_optional("Data too short for CAN payload."); + } - // Set CAN frame properties - can_frame.set_can_id(can_id); - can_frame.set_len(can_length); - can_frame.set_data(can_data); + std::array can_data = {0}; + std::copy_n(data.begin() + can_data_start, can_length, can_data.begin()); - return std::nullopt; + can_frame.set_can_id(can_id); + can_frame.set_len(can_length); + can_frame.set_data(can_data); + + return std::nullopt; + } + + // We exhausted the buffer without ever finding a CAN Stream message. + // Either pos < HEADER_BYTES of remaining bytes (insufficient to parse a + // header) or we walked past everything skipping non-CAN-Stream messages. + if (data.size() < HEADER_BYTES) { + return std::make_optional("Data too short for header and control byte."); + } + return std::make_optional(AxiomaticAdapter::NON_FRAME_PROTOCOL_MESSAGE); } std::optional receive() @@ -308,11 +349,14 @@ class AxiomaticAdapter::AxiomaticAdapterImpl control_byte |= (is_extended ? (1 << 4) : 0); control_byte |= (frame_data_length & 0x0F); - // initialize the full message with the header, control bytes, timestamp bytes + // initialize the full message with the header, control bytes, timestamp bytes. + // Header layout: 6-byte sync prefix + 2-byte Message ID (LSB first) + 1-byte + // Message Version + 2-byte Message Data Length (LSB first) = 11 bytes total. std::vector full_message; - full_message.insert(full_message.end(), AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end()); - full_message.push_back(0x00); - full_message.push_back(0x00); + full_message.insert(full_message.end(), AXIOMATIC_SYNC_PREFIX.begin(), AXIOMATIC_SYNC_PREFIX.end()); + full_message.push_back(static_cast(MSG_ID_CAN_STREAM_DEPRECATED & 0xFF)); + full_message.push_back(static_cast((MSG_ID_CAN_STREAM_DEPRECATED >> 8) & 0xFF)); + full_message.push_back(0x00); // Message Version full_message.push_back(static_cast(message_length & 0xFF)); full_message.push_back(static_cast((message_length >> 8) & 0xFF)); full_message.push_back(control_byte); @@ -327,12 +371,11 @@ class AxiomaticAdapter::AxiomaticAdapterImpl // insert the can frame data full_message.insert(full_message.end(), frame_data.begin(), frame_data.end()); - try { - boost::asio::write(tcp_socket_, boost::asio::buffer(full_message.data(), full_message.size())); - } catch (const std::exception & e) { - return std::optional(std::string("TCP Send Failed: ") + e.what()); - } - return std::nullopt; + // Hand off to the send worker. Returns immediately so the calling thread — + // typically the SocketCAN reception callback — is never blocked on a slow + // or back-pressured TCP write. Actual write errors are reported through + // error_callback_ from the worker thread. + return enqueueSend(std::move(full_message)); } TCPSocketState get_socket_state() @@ -345,8 +388,110 @@ class AxiomaticAdapter::AxiomaticAdapterImpl return thread_running_; } + /// @brief Number of outbound messages dropped because the send queue was full. + /// Reset to zero when the send worker starts. + uint64_t get_send_queue_drops() + { + return send_queue_drops_.load(); + } + private: - static constexpr std::array AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01}; + // ------ Async send worker -------------------------------------------------- + // + // send() copies the prebuilt protocol message bytes onto a bounded queue and + // returns immediately. A dedicated worker thread pops messages off the queue + // and does the actual blocking boost::asio::write() against the socket. + // + // This decouples the SocketCAN reception callback (which calls send()) from + // TCP back-pressure — under load the calling thread no longer waits on the + // kernel send buffer, so frames don't pile up in the SocketCAN kernel queue + // and get silently dropped. The cost is bounded user-space memory for the + // outbound queue and best-effort delivery semantics: actual write errors + // surface via error_callback_, not via send()'s return value. + + std::optional enqueueSend(std::vector bytes) + { + if (socket_state_ != TCPSocketState::OPEN) { + return std::make_optional( + "axiomatic: send called while socket not open"); + } + { + std::lock_guard lock(send_queue_mutex_); + if (send_queue_.size() >= SEND_QUEUE_MAX) { + // Drop the oldest queued frame. Matches the kernel SocketCAN buffer's + // overflow semantics in spirit — under sustained back-pressure we keep + // the freshest data and shed the stalest. + send_queue_.pop_front(); + send_queue_drops_.fetch_add(1, std::memory_order_relaxed); + } + send_queue_.push_back(std::move(bytes)); + } + send_queue_cv_.notify_one(); + return std::nullopt; + } + + void startSendWorker() + { + send_worker_stop_.store(false); + send_queue_drops_.store(0); + send_worker_thread_ = std::thread([this]() { sendWorkerLoop(); }); + } + + void stopSendWorker() + { + { + std::lock_guard lock(send_queue_mutex_); + send_worker_stop_.store(true); + } + send_queue_cv_.notify_all(); + if (send_worker_thread_.joinable()) { + send_worker_thread_.join(); + } + // Drop anything that didn't make it onto the wire so a future openSocket() + // doesn't replay stale frames. + std::lock_guard lock(send_queue_mutex_); + send_queue_.clear(); + } + + void sendWorkerLoop() + { + while (true) { + std::vector message; + { + std::unique_lock lock(send_queue_mutex_); + send_queue_cv_.wait(lock, [this]() { return send_worker_stop_.load() || !send_queue_.empty(); }); + if (send_worker_stop_.load() && send_queue_.empty()) { + return; + } + message = std::move(send_queue_.front()); + send_queue_.pop_front(); + } + try { + boost::asio::write(tcp_socket_, boost::asio::buffer(message.data(), message.size())); + } catch (const std::exception & e) { + if (error_callback_) { + error_callback_(std::string("TCP Send Failed: ") + e.what()); + } + } + } + } + +private: + // 6-byte sync prefix common to every Axiomatic protocol message: "AXIO" + Protocol ID 14010 + // (= 0x36BA, little-endian on the wire as 0xBA 0x36). Message ID lives at bytes 6-7 of the + // header and is checked separately so we can distinguish CAN Stream from Heartbeat / Status + // Response / CAN FD Stream / unknown. + static constexpr std::array AXIOMATIC_SYNC_PREFIX = {'A', 'X', 'I', 'O', 0xBA, 0x36}; + + // Message IDs from the Axiomatic protocol spec (v6, April 2025). + static constexpr uint16_t MSG_ID_CAN_STREAM_DEPRECATED = 1; + static constexpr uint16_t MSG_ID_STATUS_RESPONSE = 3; + static constexpr uint16_t MSG_ID_HEARTBEAT = 4; + static constexpr uint16_t MSG_ID_CAN_FD_STREAM = 5; + + // Minimum header bytes needed before we can read Message Data Length (bytes 9-10). + static constexpr size_t HEADER_BYTES = 11; + static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000}; /// @brief Socket connection state as a struct for the mutex during TCP Open Socket to update the variables together @@ -364,6 +509,15 @@ class AxiomaticAdapter::AxiomaticAdapterImpl std::atomic thread_running_; std::atomic stop_thread_requested_; + // Async send worker state. + static constexpr size_t SEND_QUEUE_MAX = 1024; + std::mutex send_queue_mutex_; + std::condition_variable send_queue_cv_; + std::deque> send_queue_; + std::thread send_worker_thread_; + std::atomic send_worker_stop_{false}; + std::atomic send_queue_drops_{0}; + // from construction std::string ip_address_; std::string port_;