Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class AxiomaticAdapter : public std::enable_shared_from_this<AxiomaticAdapter>
static constexpr std::chrono::milliseconds DEFAULT_SOCKET_RECEIVE_TIMEOUT_MS{100};
static constexpr std::chrono::milliseconds JOIN_RECEPTION_TIMEOUT_MS{100};

/// @brief Returned from receive() when a TCP read contained only non-CAN-Stream protocol
/// messages (heartbeats, status responses, unknown IDs). Not a real error — the reception
/// thread skips it silently. External callers using receive() directly can compare against
/// this constant to distinguish "no CAN frame this round" from a genuine socket error.
static constexpr const char * NON_FRAME_PROTOCOL_MESSAGE = "axiomatic: non-CAN-Stream protocol message consumed";

/// @brief AxiomaticAdapter Class Init
/// @param ip_address Axiomatic Device IP address to connect
/// @param port Axiomatic Device Port to connect to
Expand Down
272 changes: 213 additions & 59 deletions axiomatic_adapter/src/axiomatic_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <future>
#include <iostream>
#include <mutex>
Expand Down Expand Up @@ -112,7 +114,21 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
return false;
}

// Disable Nagle's algorithm. With the async send worker, the calling
// thread is never blocked on write() latency, and we'd rather have small
// frequent TCP segments than bursty coalescing — the device's delayed-ACK
// behaviour otherwise produces tens-to-hundreds of milliseconds of
// segment stalls under sustained CAN traffic. Failure is non-fatal.
{
boost::system::error_code nd_ec;
tcp_socket_.set_option(boost::asio::ip::tcp::no_delay(true), nd_ec);
if (nd_ec) {
std::cerr << "[Axiomatic] Failed to set TCP_NODELAY: " << nd_ec.message() << std::endl;
}
}

socket_state_ = TCPSocketState::OPEN;
startSendWorker();
return true;
} catch (std::exception & e) {
std::cerr << "Connection failed (exception): " << e.what() << std::endl;
Expand All @@ -124,6 +140,8 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
bool closeSocket()
{
if (socket_state_ != TCPSocketState::CLOSED) {
stopSendWorker();

boost::system::error_code error_code;
tcp_socket_.close(error_code);

Expand Down Expand Up @@ -154,6 +172,9 @@ class AxiomaticAdapter::AxiomaticAdapterImpl

if (!error) {
receive_callback_(std::make_unique<polymath::socketcan::CanFrame>(frame));
} else if (*error == AxiomaticAdapter::NON_FRAME_PROTOCOL_MESSAGE) {
// Heartbeat / status response / other non-CAN-Stream protocol traffic
// was consumed but yielded no CAN frame to deliver. Not a real error.
} else {
error_callback_(*error);
}
Expand Down Expand Up @@ -223,61 +244,81 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
}

// --- Process the received data ---
//
// Walk the buffer looking for an Axiomatic protocol message we can decode
// into a CAN frame. Any non-CAN-Stream messages we encounter at the front
// (heartbeats, status responses, CAN FD stream, unknown IDs) are skipped
// using their declared Message Data Length so a CAN frame that follows in
// the same TCP read isn't dropped. If the buffer contains only non-frame
// protocol traffic, return the NON_FRAME_PROTOCOL_MESSAGE sentinel — the
// reception thread treats this as "no frame this round" rather than a
// socket error.
size_t pos = 0;
while (pos + HEADER_BYTES <= data.size()) {
if (!std::equal(AXIOMATIC_SYNC_PREFIX.begin(), AXIOMATIC_SYNC_PREFIX.end(), data.begin() + pos)) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Not a valid Axiomatic message.");
}

// Check size, header info and message type
if (data.size() < AXIOMATIC_CAN_MESSAGE_HEADER.size() + 5) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for header and control byte.");
}
if (!std::equal(AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end(), data.begin())) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Not a valid CAN message.");
}
uint16_t msg_id = static_cast<uint16_t>(data[pos + 6]) | (static_cast<uint16_t>(data[pos + 7]) << 8);
uint16_t decl_data_len = static_cast<uint16_t>(data[pos + 9]) | (static_cast<uint16_t>(data[pos + 10]) << 8);
size_t msg_total = static_cast<size_t>(HEADER_BYTES) + decl_data_len;

if (msg_id != MSG_ID_CAN_STREAM_DEPRECATED) {
// Consume this non-CAN-Stream message and look for the next one.
// Declared length matches on-wire footprint for heartbeats / status
// responses per the protocol spec (verified by hardware capture for
// heartbeats specifically). If we'd overrun the buffer, advance to
// the end so the outer loop exits cleanly.
pos += (msg_total <= data.size() - pos) ? msg_total : (data.size() - pos);
continue;
}

uint8_t control_byte = data[11];
// Extract timestamp size (bits 6 & 5)
size_t timestamp_size = (control_byte & 0x60) >> 5;
// Check if the frame is extended (bit 4)
bool is_can_extended = (control_byte & 0x10) >> 4;
// Extract CAN frame length (lower 4 bits)
size_t can_length = control_byte & 0x0F;

// Determine where the CAN ID starts (after timestamp bytes)
size_t can_id_start = 12 + timestamp_size;
uint32_t can_id = 0;
size_t can_data_start = 0;

// Ensure data is large enough for CAN ID extraction
size_t min_id_size = is_can_extended ? 4 : 2;
if (data.size() < can_id_start + min_id_size) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for CAN ID.");
}
// CAN Stream (Message ID 1). Decode using the Control Byte at byte 11.
uint8_t control_byte = data[pos + 11];
size_t timestamp_size = (control_byte & 0x60) >> 5;
bool is_can_extended = (control_byte & 0x10) >> 4;
size_t can_length = control_byte & 0x0F;

// Extract CAN ID (little-endian)
if (!is_can_extended) {
can_id = static_cast<uint16_t>(data[can_id_start] | (data[can_id_start + 1] << 8));
can_data_start = can_id_start + 2;
} else {
can_id = static_cast<uint32_t>(
data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) |
(data[can_id_start + 3] << 24));
can_data_start = can_id_start + 4;
can_frame.set_id_as_extended();
}
size_t can_id_start = pos + 12 + timestamp_size;
size_t min_id_size = is_can_extended ? 4 : 2;
if (data.size() < can_id_start + min_id_size) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for CAN ID.");
}

// Ensure data is large enough for CAN payload
if (data.size() < can_data_start + can_length) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for CAN payload.");
}
uint32_t can_id = 0;
size_t can_data_start = 0;
if (!is_can_extended) {
can_id = static_cast<uint16_t>(data[can_id_start] | (data[can_id_start + 1] << 8));
can_data_start = can_id_start + 2;
} else {
can_id = static_cast<uint32_t>(
data[can_id_start] | (data[can_id_start + 1] << 8) | (data[can_id_start + 2] << 16) |
(data[can_id_start + 3] << 24));
can_data_start = can_id_start + 4;
can_frame.set_id_as_extended();
}

// Extract CAN data (zero-padded to 8 bytes)
std::array<uint8_t, 8> can_data = {0};
std::copy_n(data.begin() + can_data_start, can_length, can_data.begin());
if (data.size() < can_data_start + can_length) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for CAN payload.");
}

// Set CAN frame properties
can_frame.set_can_id(can_id);
can_frame.set_len(can_length);
can_frame.set_data(can_data);
std::array<uint8_t, 8> can_data = {0};
std::copy_n(data.begin() + can_data_start, can_length, can_data.begin());

return std::nullopt;
can_frame.set_can_id(can_id);
can_frame.set_len(can_length);
can_frame.set_data(can_data);

return std::nullopt;
}

// We exhausted the buffer without ever finding a CAN Stream message.
// Either pos < HEADER_BYTES of remaining bytes (insufficient to parse a
// header) or we walked past everything skipping non-CAN-Stream messages.
if (data.size() < HEADER_BYTES) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>("Data too short for header and control byte.");
}
return std::make_optional<AxiomaticAdapter::socket_error_string_t>(AxiomaticAdapter::NON_FRAME_PROTOCOL_MESSAGE);
}

std::optional<const polymath::socketcan::CanFrame> receive()
Expand Down Expand Up @@ -308,11 +349,14 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
control_byte |= (is_extended ? (1 << 4) : 0);
control_byte |= (frame_data_length & 0x0F);

// initialize the full message with the header, control bytes, timestamp bytes
// initialize the full message with the header, control bytes, timestamp bytes.
// Header layout: 6-byte sync prefix + 2-byte Message ID (LSB first) + 1-byte
// Message Version + 2-byte Message Data Length (LSB first) = 11 bytes total.
std::vector<uint8_t> full_message;
full_message.insert(full_message.end(), AXIOMATIC_CAN_MESSAGE_HEADER.begin(), AXIOMATIC_CAN_MESSAGE_HEADER.end());
full_message.push_back(0x00);
full_message.push_back(0x00);
full_message.insert(full_message.end(), AXIOMATIC_SYNC_PREFIX.begin(), AXIOMATIC_SYNC_PREFIX.end());
full_message.push_back(static_cast<uint8_t>(MSG_ID_CAN_STREAM_DEPRECATED & 0xFF));
full_message.push_back(static_cast<uint8_t>((MSG_ID_CAN_STREAM_DEPRECATED >> 8) & 0xFF));
full_message.push_back(0x00); // Message Version
full_message.push_back(static_cast<uint8_t>(message_length & 0xFF));
full_message.push_back(static_cast<uint8_t>((message_length >> 8) & 0xFF));
full_message.push_back(control_byte);
Expand All @@ -327,12 +371,11 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
// insert the can frame data
full_message.insert(full_message.end(), frame_data.begin(), frame_data.end());

try {
boost::asio::write(tcp_socket_, boost::asio::buffer(full_message.data(), full_message.size()));
} catch (const std::exception & e) {
return std::optional<AxiomaticAdapter::socket_error_string_t>(std::string("TCP Send Failed: ") + e.what());
}
return std::nullopt;
// Hand off to the send worker. Returns immediately so the calling thread —
// typically the SocketCAN reception callback — is never blocked on a slow
// or back-pressured TCP write. Actual write errors are reported through
// error_callback_ from the worker thread.
return enqueueSend(std::move(full_message));
}

TCPSocketState get_socket_state()
Expand All @@ -345,8 +388,110 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
return thread_running_;
}

/// @brief Number of outbound messages dropped because the send queue was full.
/// Reset to zero when the send worker starts.
uint64_t get_send_queue_drops()
{
return send_queue_drops_.load();
}

private:
static constexpr std::array<uint8_t, 7> AXIOMATIC_CAN_MESSAGE_HEADER = {'A', 'X', 'I', 'O', 0xBA, 0x36, 0x01};
// ------ Async send worker --------------------------------------------------
//
// send() copies the prebuilt protocol message bytes onto a bounded queue and
// returns immediately. A dedicated worker thread pops messages off the queue
// and does the actual blocking boost::asio::write() against the socket.
//
// This decouples the SocketCAN reception callback (which calls send()) from
// TCP back-pressure — under load the calling thread no longer waits on the
// kernel send buffer, so frames don't pile up in the SocketCAN kernel queue
// and get silently dropped. The cost is bounded user-space memory for the
// outbound queue and best-effort delivery semantics: actual write errors
// surface via error_callback_, not via send()'s return value.

std::optional<AxiomaticAdapter::socket_error_string_t> enqueueSend(std::vector<uint8_t> bytes)
{
if (socket_state_ != TCPSocketState::OPEN) {
return std::make_optional<AxiomaticAdapter::socket_error_string_t>(
"axiomatic: send called while socket not open");
}
{
std::lock_guard<std::mutex> lock(send_queue_mutex_);
if (send_queue_.size() >= SEND_QUEUE_MAX) {
// Drop the oldest queued frame. Matches the kernel SocketCAN buffer's
// overflow semantics in spirit — under sustained back-pressure we keep
// the freshest data and shed the stalest.
send_queue_.pop_front();
send_queue_drops_.fetch_add(1, std::memory_order_relaxed);
}
send_queue_.push_back(std::move(bytes));
}
send_queue_cv_.notify_one();
return std::nullopt;
}

void startSendWorker()
{
send_worker_stop_.store(false);
send_queue_drops_.store(0);
send_worker_thread_ = std::thread([this]() { sendWorkerLoop(); });
}

void stopSendWorker()
{
{
std::lock_guard<std::mutex> lock(send_queue_mutex_);
send_worker_stop_.store(true);
}
send_queue_cv_.notify_all();
if (send_worker_thread_.joinable()) {
send_worker_thread_.join();
}
// Drop anything that didn't make it onto the wire so a future openSocket()
// doesn't replay stale frames.
std::lock_guard<std::mutex> lock(send_queue_mutex_);
send_queue_.clear();
}

void sendWorkerLoop()
{
while (true) {
std::vector<uint8_t> message;
{
std::unique_lock<std::mutex> lock(send_queue_mutex_);
send_queue_cv_.wait(lock, [this]() { return send_worker_stop_.load() || !send_queue_.empty(); });
if (send_worker_stop_.load() && send_queue_.empty()) {
return;
}
message = std::move(send_queue_.front());
send_queue_.pop_front();
}
try {
boost::asio::write(tcp_socket_, boost::asio::buffer(message.data(), message.size()));
} catch (const std::exception & e) {
if (error_callback_) {
error_callback_(std::string("TCP Send Failed: ") + e.what());
}
}
}
}

private:
// 6-byte sync prefix common to every Axiomatic protocol message: "AXIO" + Protocol ID 14010
// (= 0x36BA, little-endian on the wire as 0xBA 0x36). Message ID lives at bytes 6-7 of the
// header and is checked separately so we can distinguish CAN Stream from Heartbeat / Status
// Response / CAN FD Stream / unknown.
static constexpr std::array<uint8_t, 6> AXIOMATIC_SYNC_PREFIX = {'A', 'X', 'I', 'O', 0xBA, 0x36};

// Message IDs from the Axiomatic protocol spec (v6, April 2025).
static constexpr uint16_t MSG_ID_CAN_STREAM_DEPRECATED = 1;
static constexpr uint16_t MSG_ID_STATUS_RESPONSE = 3;
static constexpr uint16_t MSG_ID_HEARTBEAT = 4;
static constexpr uint16_t MSG_ID_CAN_FD_STREAM = 5;

// Minimum header bytes needed before we can read Message Data Length (bytes 9-10).
static constexpr size_t HEADER_BYTES = 11;

static constexpr std::chrono::milliseconds TCP_IP_CONNECTION_TIMEOUT_MS{3000};

/// @brief Socket connection state as a struct for the mutex during TCP Open Socket to update the variables together
Expand All @@ -364,6 +509,15 @@ class AxiomaticAdapter::AxiomaticAdapterImpl
std::atomic<bool> thread_running_;
std::atomic<bool> stop_thread_requested_;

// Async send worker state.
static constexpr size_t SEND_QUEUE_MAX = 1024;
std::mutex send_queue_mutex_;
std::condition_variable send_queue_cv_;
std::deque<std::vector<uint8_t>> send_queue_;
std::thread send_worker_thread_;
std::atomic<bool> send_worker_stop_{false};
std::atomic<uint64_t> send_queue_drops_{0};

// from construction
std::string ip_address_;
std::string port_;
Expand Down
Loading