Skip to content

Commit

Permalink
GH-1637: Add some timestamp support to Read/Observe Composite operation
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Bernard <[email protected]>
  • Loading branch information
JaroslawLegierski and sbernard31 committed Sep 23, 2024
1 parent ccd7bc9 commit 02b7538
Show file tree
Hide file tree
Showing 22 changed files with 413 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void dataReceived(Registration registration, TimestampedLwM2mNodes data,

if (registration != null) {
try {
String jsonContent = EventServlet.this.mapper.writeValueAsString(data.getNodes());
String jsonContent = EventServlet.this.mapper.writeValueAsString(data.getMostRecentNodes());

String eventData = new StringBuilder("{\"ep\":\"") //
.append(registration.getEndpoint()) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
Expand All @@ -59,22 +62,27 @@
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNode;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.node.codec.DefaultLwM2mEncoder;
import org.eclipse.leshan.core.node.codec.LwM2mEncoder;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.BindingMode;
import org.eclipse.leshan.core.request.CancelObservationRequest;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.DeregisterRequest;
import org.eclipse.leshan.core.request.ObserveCompositeRequest;
import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.ReadCompositeRequest;
import org.eclipse.leshan.core.request.ReadRequest;
import org.eclipse.leshan.core.request.RegisterRequest;
import org.eclipse.leshan.core.request.UpdateRequest;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.eclipse.leshan.core.response.CancelObservationResponse;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ObserveCompositeResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.core.response.ReadCompositeResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.integration.tests.util.LeshanTestServer;
Expand Down Expand Up @@ -524,4 +532,95 @@ public void observe_timestamped(String givenServerEndpointProvider) throws Excep
ObserveResponse cancelResponse = cancelFuture.get(1, TimeUnit.SECONDS);
assertThat(cancelResponse.getTimestampedLwM2mNode()).isEqualTo(timestampedNode);
}

@TestAllTransportLayer
public void read_composite_timestamped(String givenServerEndpointProvider) throws Exception {

// register client
LockStepLwM2mClient client = new LockStepLwM2mClient(server.getEndpoint(Protocol.COAP).getURI());
Token token = client
.sendLwM2mRequest(new RegisterRequest(client.getEndpointName(), 60l, "1.1", EnumSet.of(BindingMode.U),
null, null, linkParser.parseCoreLinkFormat("</1>,</2>,</3>".getBytes()), null));
client.expectResponse().token(token).go();
server.waitForNewRegistrationOf(client.getEndpointName());

Registration registration = server.getRegistrationService().getByEndpoint(client.getEndpointName());

// create timestamped data
List<LwM2mPath> paths = new ArrayList<>();
paths.add(new LwM2mPath("/1/0/1"));
paths.add(new LwM2mPath("/3/0/15"));
TimestampedLwM2mNodes.Builder builder = new TimestampedLwM2mNodes.Builder();
Instant t1 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
builder.put(t1, paths.get(0), LwM2mSingleResource.newIntegerResource(1, 3600));
builder.put(t1, paths.get(1), LwM2mSingleResource.newStringResource(15, "Europe/Belgrade"));
TimestampedLwM2mNodes timestampednodes = builder.build();

LwM2mEncoder encoder = new DefaultLwM2mEncoder();

byte[] payload = encoder.encodeTimestampedNodes(timestampednodes, ContentFormat.SENML_JSON,
client.getLwM2mModel());

// send read request
Future<ReadCompositeResponse> future = Executors.newSingleThreadExecutor().submit(() -> {
// send a request with 1 seconds timeout
return server.send(registration,
new ReadCompositeRequest(ContentFormat.SENML_JSON, ContentFormat.SENML_JSON, "/1/0/1", "/3/0/15"),
1000);
});

// wait for request and send response
client.expectRequest().storeToken("TKN").storeMID("MID").go();
client.sendResponse(Type.ACK, ResponseCode.CONTENT).loadMID("MID").loadToken("TKN")
.payload(payload, ContentFormat.SENML_JSON_CODE).go();

// check response received at server side
ReadCompositeResponse response = future.get(1, TimeUnit.SECONDS);
assertThat(response.getTimestampedLwM2mNode()).isEqualTo(timestampednodes);
}

@TestAllTransportLayer
public void observe_composite_timestamped(String givenServerEndpointProvider) throws Exception {

// register client
LockStepLwM2mClient client = new LockStepLwM2mClient(server.getEndpoint(Protocol.COAP).getURI());
Token token = client
.sendLwM2mRequest(new RegisterRequest(client.getEndpointName(), 60l, "1.1", EnumSet.of(BindingMode.U),
null, null, linkParser.parseCoreLinkFormat("</1>,</2>,</3>".getBytes()), null));
client.expectResponse().token(token).go();
server.waitForNewRegistrationOf(client.getEndpointName());

Registration registration = server.getRegistrationService().getByEndpoint(client.getEndpointName());

// create timestamped data
List<LwM2mPath> paths = new ArrayList<>();
paths.add(new LwM2mPath("/1/0/1"));
paths.add(new LwM2mPath("/3/0/15"));
TimestampedLwM2mNodes.Builder builder = new TimestampedLwM2mNodes.Builder();
Instant t1 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
builder.put(t1, paths.get(0), LwM2mSingleResource.newIntegerResource(1, 3600));
builder.put(t1, paths.get(1), LwM2mSingleResource.newStringResource(15, "Europe/Belgrade"));
TimestampedLwM2mNodes timestampednodes = builder.build();

LwM2mEncoder encoder = new DefaultLwM2mEncoder();

byte[] payload = encoder.encodeTimestampedNodes(timestampednodes, ContentFormat.SENML_JSON,
client.getLwM2mModel());

// send read request
Future<ObserveCompositeResponse> future = Executors.newSingleThreadExecutor().submit(() -> {
// send a request with 1 seconds timeout
return server.send(registration, new ObserveCompositeRequest(ContentFormat.SENML_JSON,
ContentFormat.SENML_JSON, "/1/0/1", "/3/0/15"), 1000);
});

// wait for request and send response
client.expectRequest().storeToken("TKN").storeMID("MID").go();
client.sendResponse(Type.ACK, ResponseCode.CONTENT).loadMID("MID").loadToken("TKN")
.payload(payload, ContentFormat.SENML_JSON_CODE).go();

// check response received at server side
ObserveCompositeResponse response = future.get(1, TimeUnit.SECONDS);
assertThat(response.getTimestampedLwM2mNode()).isEqualTo(timestampednodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void can_observecomposite_timestamped_nodes(List<LwM2mPath> paths, Timest
// verify result
ObserveCompositeResponse response = server.waitForNotificationOf(observation);
assertThat(response).hasContentFormat(contentFormat, givenServerEndpointProvider);
assertThat(response.getContent()).containsExactlyEntriesOf(timestampednodes.getNodes());
assertThat(response.getContent()).containsExactlyEntriesOf(timestampednodes.getMostRecentNodes());
assertThat(response.getTimestampedLwM2mNodes()).isEqualTo(timestampednodes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private void assertSuccessfulSendAfterAddressChanged() throws InterruptedExcepti

// wait for data and check result
TimestampedLwM2mNodes data = server.waitForData(client.getEndpointName(), 1, TimeUnit.SECONDS);
Map<LwM2mPath, LwM2mNode> nodes = data.getNodes();
Map<LwM2mPath, LwM2mNode> nodes = data.getMostRecentNodes();
LwM2mResource modelnumber = (LwM2mResource) nodes.get(new LwM2mPath("/3/0/1"));
assertThat(modelnumber.getId()).isEqualTo(1);
assertThat(modelnumber.getValue()).isEqualTo("IT-TEST-123");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void can_send_resources(ContentFormat contentformat, Protocol givenProtoc

// wait for data and check result
TimestampedLwM2mNodes data = server.waitForData(client.getEndpointName(), 1, TimeUnit.SECONDS);
Map<LwM2mPath, LwM2mNode> nodes = data.getNodes();
Map<LwM2mPath, LwM2mNode> nodes = data.getMostRecentNodes();
LwM2mResource modelnumber = (LwM2mResource) nodes.get(new LwM2mPath("/3/0/1"));
assertThat(modelnumber.getId()).isEqualTo(1);
assertThat(modelnumber.getValue()).isEqualTo("IT-TEST-123");
Expand Down Expand Up @@ -159,7 +159,7 @@ public void can_send_resources_asynchronously(ContentFormat contentformat, Proto

// wait for data and check result
TimestampedLwM2mNodes data = server.waitForData(client.getEndpointName(), 1, TimeUnit.SECONDS);
Map<LwM2mPath, LwM2mNode> nodes = data.getNodes();
Map<LwM2mPath, LwM2mNode> nodes = data.getMostRecentNodes();
LwM2mResource modelnumber = (LwM2mResource) nodes.get(new LwM2mPath("/3/0/1"));
assertThat(modelnumber.getId()).isEqualTo(1);
assertThat(modelnumber.getValue()).isEqualTo("IT-TEST-123");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,12 @@ public void visit(CancelObservationRequest request) {

@Override
public void visit(ReadCompositeRequest request) {
response = new ReadCompositeResponse(code, null, errorMessage, null);
response = new ReadCompositeResponse(code, null, null, errorMessage, null);
}

@Override
public void visit(ObserveCompositeRequest request) {
response = new ObserveCompositeResponse(code, null, null, null, errorMessage, null);
response = new ObserveCompositeResponse(code, null, null, null, null, errorMessage, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void test_successful_data_send() {

// ensure that sent values equals collected ones.
TimestampedLwM2mNodes lastValuesSent = fakeDataSenderManager.getLastValuesSent();
assertEquals(currentValues, lastValuesSent.getNodes());
assertEquals(currentValues, lastValuesSent.getMostRecentNodes());

// re send to ensure data was flushed (we do not resent same data)
Map<LwM2mPath, LwM2mNode> newValue = fakeDataSenderManager.changeCurrentValues(givenServer, givenPaths);
Expand All @@ -77,7 +77,7 @@ public void test_successful_data_send() {
lastValuesSent = fakeDataSenderManager.getLastValuesSent();

assertEquals(1, lastValuesSent.getTimestamps().size());
assertEquals(newValue, lastValuesSent.getNodes());
assertEquals(newValue, lastValuesSent.getMostRecentNodes());
}

@Test
Expand Down Expand Up @@ -126,7 +126,7 @@ public void test_unsuccessful_data_send() {

// ensure that sent values equals collected ones.
TimestampedLwM2mNodes lastValuesSent = fakeDataSenderManager.getLastValuesSent();
assertEquals(currentValues, lastValuesSent.getNodes());
assertEquals(currentValues, lastValuesSent.getMostRecentNodes());
}

@Test
Expand All @@ -146,7 +146,7 @@ public void test_error_during_data_send() {

// ensure that sent values equals collected ones.
TimestampedLwM2mNodes lastValuesSent = fakeDataSenderManager.getLastValuesSent();
assertEquals(currentValues, lastValuesSent.getNodes());
assertEquals(currentValues, lastValuesSent.getMostRecentNodes());
}

@Test
Expand All @@ -163,13 +163,13 @@ public void test_successful_data_send_without_flush() {

// ensure that sent values equals collected ones.
TimestampedLwM2mNodes lastValuesSent = fakeDataSenderManager.getLastValuesSent();
assertEquals(currentValues, lastValuesSent.getNodes());
assertEquals(currentValues, lastValuesSent.getMostRecentNodes());

// re send to ensure data was not flushed
manualDataSender.sendCollectedData(givenServer, ContentFormat.SENML_CBOR, 0, false);

lastValuesSent = fakeDataSenderManager.getLastValuesSent();
assertEquals(currentValues, lastValuesSent.getNodes());
assertEquals(currentValues, lastValuesSent.getMostRecentNodes());
}

static class FakeDataSenderManager extends DataSenderManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,31 @@ public Map<LwM2mPath, LwM2mNode> getNodesAt(Instant timestamp) {
* Get all collected nodes as {@link LwM2mPath}-{@link LwM2mNode} map ignoring timestamp information. In case of the
* same path conflict the most recent one is taken. Null timestamp is considered as most recent one.
*/
public Map<LwM2mPath, LwM2mNode> getNodes() {
public Map<LwM2mPath, LwM2mNode> getFlattenNodes() {
Map<LwM2mPath, LwM2mNode> result = new HashMap<>();
for (Map.Entry<Instant, Map<LwM2mPath, LwM2mNode>> entry : timestampedPathNodesMap.entrySet()) {
result.putAll(entry.getValue());
}
return Collections.unmodifiableMap(result);
}

/**
* Get all collected nodes as {@link LwM2mPath}-{@link LwM2mNode} map from the most recent timestamp. Null is
* considered as most recent one.
*/
public Map<LwM2mPath, LwM2mNode> getMostRecentNodes() {
return timestampedPathNodesMap.values().iterator().next();
}

/**
* Get the most recent timestamp and return a {@link TimestampedLwM2mNodes} containing value for this timestamp.
* Null is considered as most recent one.
*/
public TimestampedLwM2mNodes getMostRecentTimestampedNodes() {
Entry<Instant, Map<LwM2mPath, LwM2mNode>> entry = timestampedPathNodesMap.entrySet().iterator().next();
return new TimestampedLwM2mNodes(Collections.singletonMap(entry.getKey(), entry.getValue()));
}

/**
* Returns the all sorted timestamps of contained nodes with ascending order. Null timestamp is considered as most
* recent one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public byte[] encodeTimestampedNodes(TimestampedLwM2mNodes timestampedNodes, Con
if (encoder instanceof TimestampedMultiNodeEncoder) {
return ((TimestampedMultiNodeEncoder) encoder).encodeTimestampedNodes(timestampedNodes, model, converter);
} else if (encoder instanceof MultiNodeEncoder) {
return ((MultiNodeEncoder) encoder).encodeNodes(timestampedNodes.getNodes(), model, converter);
return ((MultiNodeEncoder) encoder).encodeNodes(timestampedNodes.getMostRecentNodes(), model, converter);
} else {
throw new CodecException("Encoder does not support multiple nodes encoding for this content format: %s",
format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*******************************************************************************/
package org.eclipse.leshan.core.request;

import java.time.Instant;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand Down Expand Up @@ -72,37 +73,41 @@ public SendRequest(ContentFormat format, TimestampedLwM2mNodes timestampedNodes,
throw new InvalidRequestException("Content format MUST be SenML_CBOR or SenML_JSON but was " + format);
}
// Validate Nodes
validateNodes(timestampedNodes.getNodes());
validateNodes(timestampedNodes);

this.format = format;
}

private void validateNodes(Map<LwM2mPath, LwM2mNode> nodes) {
if (nodes == null || nodes.size() == 0) {
private void validateNodes(TimestampedLwM2mNodes timestampedNodes) {
if (timestampedNodes == null || timestampedNodes.isEmpty() || timestampedNodes.getFlattenNodes().isEmpty()) {
throw new InvalidRequestException(
"SendRequest MUST NOT have empty payload (at least 1 node should be present)");
}
for (Entry<LwM2mPath, LwM2mNode> entry : nodes.entrySet()) {
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
if (path == null) {
throw new InvalidRequestException("Invalid key for entry (null, %s) : path MUST NOT be null", node);
}
if (node == null) {
throw new InvalidRequestException("Invalid value for entry (%s, null) : node MUST NOT be null ", path);
}

if (path.isObject() && node instanceof LwM2mObject)
return;
if (path.isObjectInstance() && node instanceof LwM2mObjectInstance)
return;
if (path.isResource() && node instanceof LwM2mSingleResource)
return;
if (path.isResourceInstance() && node instanceof LwM2mResourceInstance)
return;

throw new InvalidRequestException("Invalid value : path (%s) should not refer to a %s value", path,
node.getClass().getSimpleName());
for (Instant t : timestampedNodes.getTimestamps()) {
for (Entry<LwM2mPath, LwM2mNode> entry : timestampedNodes.getNodesAt(t).entrySet()) {
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
if (path == null) {
throw new InvalidRequestException("Invalid key for entry (null, %s) : path MUST NOT be null", node);
}
if (node == null) {
throw new InvalidRequestException("Invalid value for entry (%s, null) : node MUST NOT be null ",
path);
}

if (path.isObject() && node instanceof LwM2mObject)
return;
if (path.isObjectInstance() && node instanceof LwM2mObjectInstance)
return;
if (path.isResource() && node instanceof LwM2mSingleResource)
return;
if (path.isResourceInstance() && node instanceof LwM2mResourceInstance)
return;

throw new InvalidRequestException("Invalid value : path (%s) should not refer to a %s value", path,
node.getClass().getSimpleName());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class CancelCompositeObservationResponse extends ObserveCompositeResponse
public CancelCompositeObservationResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> content,
TimestampedLwM2mNodes timestampedValues, CompositeObservation observation, String errorMessage,
Object coapResponse) {
super(code, content, null, observation, errorMessage, coapResponse);
super(code, content, timestampedValues, null, observation, errorMessage, coapResponse);
}

@Override
Expand Down
Loading

0 comments on commit 02b7538

Please sign in to comment.