Skip to content

Commit

Permalink
Add a service to trigger functionality (#1611)
Browse files Browse the repository at this point in the history
* initial commit to allow plugin to call a service

Signed-off-by: Liam Han <[email protected]>

* adding tutorial and modifying the world sdf

Signed-off-by: Liam Han <[email protected]>

* added test for single input and single service output

Signed-off-by: Liam Han <[email protected]>

* added test for single input and multiple service output

Signed-off-by: Liam Han <[email protected]>

* added test for invalid matching service name => timeout

Signed-off-by: Liam Han <[email protected]>

* modified variables the camelCase

Signed-off-by: Liam Han <[email protected]>

* fixed typo, indentation, grammar, lines that exceeded 80 char

Signed-off-by: Liam Han <[email protected]>

* fixing ubuntu bionic ci issue

Signed-off-by: Liam Han <[email protected]>

* silly syntax mistake on expect_eq

Signed-off-by: Liam Han <[email protected]>

* added three more test cases that addesses incorrect response type, incorrect request type and false result

Signed-off-by: Liam Han <[email protected]>

* WIP: major restructuring and currently working. Requires more cleanup and test

Signed-off-by: Liam Han <[email protected]>

* WIP: fixed preprocessor define bug

Signed-off-by: Liam Han <[email protected]>

* WIP: working but extremely convoluted

Signed-off-by: Liam Han <[email protected]>

* WIP major modification but a lot of errors and tests failed

Signed-off-by: Liam Han <[email protected]>

* stable version: had to revert back to previous work. all tests passed

Signed-off-by: Liam Han <[email protected]>

* modified to use blocking Request method as well as reduce a service worker thread to just one thread with the publisher. all tests passed

Signed-off-by: Liam Han <[email protected]>

* stable version: had to revert back to previous work. all tests passed

Signed-off-by: Liam Han <[email protected]>

* successfully reverted and tested

Signed-off-by: Liam Han <[email protected]>

* fixing PR suggestions

Signed-off-by: Liam Han <[email protected]>

* changed string with 'serv' to 'srv' and included <mutex> to the header

Signed-off-by: Liam Han <[email protected]>

* fixed indentation and removed rep.set_data since it's unused on the client service

Signed-off-by: Liam Han <[email protected]>

* getting rid of the id

Signed-off-by: Liam Han <[email protected]>

* fixed race condition resulting seldom test failure

Signed-off-by: Liam Han <[email protected]>

* changed from triggerSrv to serviceCount. This compensates for the two threads running at different rate

Signed-off-by: Liam Han <[email protected]>

* braces indentation

Signed-off-by: Mabel Zhang <[email protected]>

* addressing gnu c compiler (gcc) warnings

Signed-off-by: Liam Han <[email protected]>

Signed-off-by: Liam Han <[email protected]>
Signed-off-by: Mabel Zhang <[email protected]>
Co-authored-by: Mabel Zhang <[email protected]>
  • Loading branch information
liamhan0905 and mabelzhang authored Sep 7, 2022
1 parent 69e7757 commit 0cbe6e7
Show file tree
Hide file tree
Showing 6 changed files with 664 additions and 55 deletions.
35 changes: 30 additions & 5 deletions examples/worlds/triggered_publisher.sdf
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,17 @@ start falling.
<time>0.001</time>
<enabled>true</enabled>
</plugin>
<plugin filename="ignition-gazebo-detachable-joint-system" name="ignition::gazebo::systems::DetachableJoint">
<plugin
filename="ignition-gazebo-detachable-joint-system"
name="ignition::gazebo::systems::DetachableJoint">
<parent_link>body</parent_link>
<child_model>box1</child_model>
<child_link>box_body</child_link>
<topic>/box1/detach</topic>
</plugin>
<plugin filename="ignition-gazebo-detachable-joint-system" name="ignition::gazebo::systems::DetachableJoint">
<plugin
filename="ignition-gazebo-detachable-joint-system"
name="ignition::gazebo::systems::DetachableJoint">
<parent_link>body</parent_link>
<child_model>box2</child_model>
<child_link>box_body</child_link>
Expand Down Expand Up @@ -448,19 +452,40 @@ start falling.
</link>
</model>

<plugin filename="ignition-gazebo-triggered-publisher-system" name="ignition::gazebo::systems::TriggeredPublisher">
<plugin
filename="ignition-gazebo-triggered-publisher-system"
name="ignition::gazebo::systems::TriggeredPublisher">
<input type="ignition.msgs.Empty" topic="/start"/>
<output type="ignition.msgs.Twist" topic="/cmd_vel">
linear: {x: 3}
</output>
</plugin>
<plugin filename="ignition-gazebo-triggered-publisher-system" name="ignition::gazebo::systems::TriggeredPublisher">
<plugin
filename="ignition-gazebo-triggered-publisher-system"
name="ignition::gazebo::systems::TriggeredPublisher">
<input type="ignition.msgs.Empty" topic="/reset_robot"/>
<output type="ignition.msgs.Twist" topic="/cmd_vel">
linear: {x: 0}
</output>
<service
name="/world/triggered_publisher/set_pose"
reqType="ignition.msgs.Pose"
repType="ignition.msgs.Boolean"
timeout="3000"
reqMsg="name: 'blue_vehicle', position: {x: -3, z: 0.325}">
</service>
</plugin>
<plugin
filename="ignition-gazebo-triggered-publisher-system"
name="ignition::gazebo::systems::TriggeredPublisher">
<input type="ignition.msgs.Boolean" topic="/trigger/touched">
<match>data: true</match>
</input>
<output type="ignition.msgs.Empty" topic="/box1/detach"/>
</plugin>
<plugin filename="ignition-gazebo-triggered-publisher-system" name="ignition::gazebo::systems::TriggeredPublisher">
<plugin
filename="ignition-gazebo-triggered-publisher-system"
name="ignition::gazebo::systems::TriggeredPublisher">
<input type="ignition.msgs.Altimeter" topic="/altimeter">
<match field="vertical_position" tol="0.2">-7.5</match>
</input>
Expand Down
158 changes: 140 additions & 18 deletions src/systems/triggered_publisher/TriggeredPublisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ FullMatcher::FullMatcher(const std::string &_msgType, bool _logicType,
: InputMatcher(_msgType), logicType(_logicType)
{
if (nullptr == this->matchMsg || !this->matchMsg->IsInitialized())
{
return;
}

this->valid = google::protobuf::TextFormat::ParseFromString(
_matchString, this->matchMsg.get());
Expand All @@ -273,7 +275,9 @@ FieldMatcher::FieldMatcher(const std::string &_msgType, bool _logicType,
fieldName(_fieldName)
{
if (nullptr == this->matchMsg || !this->matchMsg->IsInitialized())
{
return;
}

transport::ProtoMsg *matcherSubMsg{nullptr};
if (!FindFieldSubMessage(this->matchMsg.get(), _fieldName,
Expand All @@ -294,7 +298,9 @@ FieldMatcher::FieldMatcher(const std::string &_msgType, bool _logicType,
}

if (nullptr == matcherSubMsg)
{
return;
}

bool result = google::protobuf::TextFormat::ParseFieldValueFromString(
_fieldString, this->fieldDescMatcher.back(), matcherSubMsg);
Expand Down Expand Up @@ -548,7 +554,9 @@ void TriggeredPublisher::Configure(const Entity &,
{
int ms = sdfClone->Get<int>("delay_ms");
if (ms > 0)
{
this->delay = std::chrono::milliseconds(ms);
}
}

if (sdfClone->HasElement("output"))
Expand Down Expand Up @@ -595,9 +603,52 @@ void TriggeredPublisher::Configure(const Entity &,
}
}
}
else

if (sdfClone->HasElement("service"))
{
ignerr << "No ouptut specified" << std::endl;
for (auto serviceElem = sdfClone->GetElement("service"); serviceElem;
serviceElem = serviceElem->GetNextElement("service"))
{
SrvOutputInfo serviceInfo;
serviceInfo.srvName = serviceElem->Get<std::string>("name");
if (serviceInfo.srvName.empty())
{
ignerr << "Service name cannot be empty\n";
return;
}
serviceInfo.reqType = serviceElem->Get<std::string>("reqType");
if (serviceInfo.reqType.empty())
{
ignerr << "Service request type cannot be empty\n";
return;
}
serviceInfo.repType = serviceElem->Get<std::string>("repType");
if (serviceInfo.repType.empty())
{
ignerr << "Service reply type cannot be empty\n";
return;
}
serviceInfo.reqMsg = serviceElem->Get<std::string>("reqMsg");
if (serviceInfo.reqMsg.empty())
{
ignerr << "Service request message cannot be empty\n";
return;
}
std::string timeoutInfo = serviceElem->Get<std::string>("timeout");
if (timeoutInfo.empty())
{
ignerr << "Timeout value cannot be empty\n";
return;
}

serviceInfo.timeout = std::stoi(timeoutInfo);
this->srvOutputInfo.push_back(std::move(serviceInfo));
}
}
if (!sdfClone->HasElement("service") && !sdfClone->HasElement("output"))
{
ignerr << "No output and service specified. Make sure to specify at least"
"one of them." << std::endl;
return;
}

Expand All @@ -606,23 +657,27 @@ void TriggeredPublisher::Configure(const Entity &,
{
if (this->MatchInput(_msg))
{
if (this->delay > 0ms)
{
std::lock_guard<std::mutex> lock(this->publishQueueMutex);
this->publishQueue.push_back(this->delay);
}
else
{
if (this->delay > 0ms)
{
std::lock_guard<std::mutex> lock(this->publishQueueMutex);
this->publishQueue.push_back(this->delay);
}
else
{
{
std::lock_guard<std::mutex> lock(this->publishCountMutex);
++this->publishCount;
}
this->newMatchSignal.notify_one();
std::lock_guard<std::mutex> lock(this->publishCountMutex);
++this->publishCount;
}
this->newMatchSignal.notify_one();
}
if (this->srvOutputInfo.size() > 0)
{
std::lock_guard<std::mutex> lock(this->triggerSrvMutex);
++this->serviceCount;
}
}
});

if (!this->node.Subscribe(this->inputTopic, msgCb))
{
ignerr << "Input subscriber could not be created for topic ["
Expand All @@ -645,11 +700,69 @@ void TriggeredPublisher::Configure(const Entity &,
std::thread(std::bind(&TriggeredPublisher::DoWork, this));
}

//////////////////////////////////////////////////
void TriggeredPublisher::PublishMsg(std::size_t pending)
{
for (auto &info : this->outputInfo)
{
for (std::size_t i = 0; i < pending; ++i)
{
info.pub.Publish(*info.msgData);
}
}
}

//////////////////////////////////////////////////
void TriggeredPublisher::CallService(std::size_t pendingSrv)
{
for (auto &serviceInfo : this->srvOutputInfo)
{
for (std::size_t i = 0; i < pendingSrv; ++i)
{
bool result;
auto req = msgs::Factory::New(serviceInfo.reqType, serviceInfo.reqMsg);
if (!req)
{
ignerr << "Unable to create request for type ["
<< serviceInfo.reqType << "].\n";
return;
}

auto rep = msgs::Factory::New(serviceInfo.repType);
if (!rep)
{
ignerr << "Unable to create response for type ["
<< serviceInfo.repType << "].\n";
return;
}

bool executed = this->node.Request(serviceInfo.srvName, *req,
serviceInfo.timeout, *rep, result);
if (executed)
{
if (!result)
{
ignerr << "Service call [" << serviceInfo.srvName << "] failed\n";
}
else
{
ignmsg << "Service call [" << serviceInfo.srvName << "] succeeded\n";
}
}
else
{
ignerr << "Service call [" << serviceInfo.srvName << "] timed out\n";
}
}
}
}

//////////////////////////////////////////////////
void TriggeredPublisher::DoWork()
{
while (!this->done)
{
// check whether to publish a msg by checking publishCount
std::size_t pending{0};
{
using namespace std::chrono_literals;
Expand All @@ -661,18 +774,25 @@ void TriggeredPublisher::DoWork()
});

if (this->publishCount == 0 || this->done)
{
continue;

}
std::swap(pending, this->publishCount);
}

for (auto &info : this->outputInfo)
PublishMsg(pending);

// check whether to call a service by checking serviceCount
std::size_t pendingSrv{0};
{
for (std::size_t i = 0; i < pending; ++i)
{
info.pub.Publish(*info.msgData);
std::lock_guard<std::mutex> lock(this->triggerSrvMutex);
if (this->serviceCount == 0 || this->done){
continue;
}
std::swap(pendingSrv, this->serviceCount);
}

CallService(pendingSrv);
}
}

Expand Down Expand Up @@ -712,7 +832,9 @@ void TriggeredPublisher::PreUpdate(const ignition::gazebo::UpdateInfo &_info,
}

if (notify)
{
this->newMatchSignal.notify_one();
}
}

//////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 0cbe6e7

Please sign in to comment.