Skip to content

Commit

Permalink
libmavconn: tcp: enable_shared_from_this
Browse files Browse the repository at this point in the history
  • Loading branch information
vooon committed Jun 28, 2016
1 parent 7a4a2ae commit 09cf185
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
8 changes: 5 additions & 3 deletions libmavconn/include/mavconn/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace mavconn {
*
* @note IPv4 only
*/
class MAVConnTCPClient : public MAVConnInterface {
class MAVConnTCPClient : public MAVConnInterface,
public std::enable_shared_from_this<MAVConnTCPClient> {
public:
static constexpr auto DEFAULT_SERVER_HOST = "localhost";
static constexpr auto DEFAULT_SERVER_PORT = 5760;
Expand Down Expand Up @@ -71,7 +72,7 @@ class MAVConnTCPClient : public MAVConnInterface {

std::atomic<bool> tx_in_progress;
std::deque<MsgBuffer> tx_q;
uint8_t rx_buf[MsgBuffer::MAX_SIZE];
std::array<uint8_t, MsgBuffer::MAX_SIZE> rx_buf;
std::recursive_mutex mutex;

/**
Expand All @@ -88,7 +89,8 @@ class MAVConnTCPClient : public MAVConnInterface {
*
* @note IPv4 only
*/
class MAVConnTCPServer : public MAVConnInterface {
class MAVConnTCPServer : public MAVConnInterface,
public std::enable_shared_from_this<MAVConnTCPServer> {
public:
static constexpr auto DEFAULT_BIND_HOST = "localhost";
static constexpr auto DEFAULT_BIND_PORT = 5760;
Expand Down
67 changes: 36 additions & 31 deletions libmavconn/src/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ MAVConnTCPClient::MAVConnTCPClient(uint8_t system_id, uint8_t component_id,
throw DeviceError("tcp", err);
}

// NOTE: shared_from_this() should not be used in constructors

// give some work to io_service before start
io_service.post(std::bind(&MAVConnTCPClient::do_recv, this));

Expand Down Expand Up @@ -110,7 +112,7 @@ void MAVConnTCPClient::client_connected(size_t server_channel)
server_channel, conn_id, to_string_ss(server_ep).c_str());

// start recv
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_recv, this));
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_recv, shared_from_this()));
}

MAVConnTCPClient::~MAVConnTCPClient()
Expand Down Expand Up @@ -150,7 +152,7 @@ void MAVConnTCPClient::send_bytes(const uint8_t *bytes, size_t length)

tx_q.emplace_back(bytes, length);
}
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, this, true));
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
}

void MAVConnTCPClient::send_message(const mavlink_message_t *message)
Expand All @@ -172,7 +174,7 @@ void MAVConnTCPClient::send_message(const mavlink_message_t *message)

tx_q.emplace_back(message);
}
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, this, true));
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
}

void MAVConnTCPClient::send_message(const mavlink::Message &message)
Expand All @@ -192,22 +194,23 @@ void MAVConnTCPClient::send_message(const mavlink::Message &message)

tx_q.emplace_back(message, get_status_p(), sys_id, comp_id);
}
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, this, true));
socket.get_io_service().post(std::bind(&MAVConnTCPClient::do_send, shared_from_this(), true));
}

void MAVConnTCPClient::do_recv()
{
auto sthis = shared_from_this();
socket.async_receive(
buffer(rx_buf, sizeof(rx_buf)),
[this] (error_code error, size_t bytes_transferred) {
buffer(rx_buf),
[sthis] (error_code error, size_t bytes_transferred) {
if (error) {
logError(PFXd "receive: %s", conn_id, error.message().c_str());
close();
logError(PFXd "receive: %s", sthis->conn_id, error.message().c_str());
sthis->close();
return;
}

parse_buffer(PFX, rx_buf, sizeof(rx_buf), bytes_transferred);
do_recv();
sthis->parse_buffer(PFX, sthis->rx_buf.data(), sthis->rx_buf.size(), bytes_transferred);
sthis->do_recv();
});
}

Expand All @@ -221,35 +224,36 @@ void MAVConnTCPClient::do_send(bool check_tx_state)
return;

tx_in_progress = true;
auto sthis = shared_from_this();
auto &buf_ref = tx_q.front();
socket.async_send(
buffer(buf_ref.dpos(), buf_ref.nbytes()),
[this, &buf_ref] (error_code error, size_t bytes_transferred) {
[sthis, &buf_ref] (error_code error, size_t bytes_transferred) {
assert(bytes_transferred <= buf_ref.len);

if (error) {
logError(PFXd "send: %s", conn_id, error.message().c_str());
close();
logError(PFXd "send: %s", sthis->conn_id, error.message().c_str());
sthis->close();
return;
}

iostat_tx_add(bytes_transferred);
lock_guard lock(mutex);
sthis->iostat_tx_add(bytes_transferred);
lock_guard lock(sthis->mutex);

if (tx_q.empty()) {
tx_in_progress = false;
if (sthis->tx_q.empty()) {
sthis->tx_in_progress = false;
return;
}

buf_ref.pos += bytes_transferred;
if (buf_ref.nbytes() == 0) {
tx_q.pop_front();
sthis->tx_q.pop_front();
}

if (!tx_q.empty())
do_send(false);
if (!sthis->tx_q.empty())
sthis->do_send(false);
else
tx_in_progress = false;
sthis->tx_in_progress = false;
});
}

Expand Down Expand Up @@ -384,26 +388,27 @@ void MAVConnTCPServer::send_message(const mavlink::Message &message)

void MAVConnTCPServer::do_accept()
{
auto sthis = shared_from_this();
auto acceptor_client = std::make_shared<MAVConnTCPClient>(sys_id, comp_id, io_service);
acceptor.async_accept(
acceptor_client->socket,
acceptor_client->server_ep,
[this, acceptor_client] (error_code error) {
[sthis, acceptor_client] (error_code error) {
if (error) {
logError(PFXd "accept: %s", conn_id, error.message().c_str());
close();
logError(PFXd "accept: %s", sthis->conn_id, error.message().c_str());
sthis->close();
return;
}

lock_guard lock(mutex);
lock_guard lock(sthis->mutex);

std::weak_ptr<MAVConnTCPClient> weak_client = acceptor_client;
acceptor_client->client_connected(conn_id);
acceptor_client->message_received_cb = std::bind(&MAVConnTCPServer::recv_message, this, std::placeholders::_1, std::placeholders::_2);
acceptor_client->port_closed_cb = [weak_client, this] () { client_closed(weak_client); };
std::weak_ptr<MAVConnTCPClient> weak_client{acceptor_client};
acceptor_client->client_connected(sthis->conn_id);
acceptor_client->message_received_cb = std::bind(&MAVConnTCPServer::recv_message, sthis, std::placeholders::_1, std::placeholders::_2);
acceptor_client->port_closed_cb = [weak_client, sthis] () { sthis->client_closed(weak_client); };

client_list.push_back(acceptor_client);
do_accept();
sthis->client_list.push_back(acceptor_client);
sthis->do_accept();
});
}

Expand Down

0 comments on commit 09cf185

Please sign in to comment.