Skip to content

Commit

Permalink
GH-1645: Add very simple Queue mode support to Leshan Server Demo.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Aug 29, 2024
1 parent ecf87f3 commit baa124f
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private static Server createJettyServer(LeshanServerDemoCLI cli, LeshanServer lw
ServletHolder eventServletHolder = new ServletHolder(eventServlet);
root.addServlet(eventServletHolder, "/api/event/*");

ServletHolder clientServletHolder = new ServletHolder(new ClientServlet(lwServer));
ServletHolder clientServletHolder = new ServletHolder(new ClientServlet(lwServer, eventServlet));
root.addServlet(clientServletHolder, "/api/clients/*");

ServletHolder securityServletHolder;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public EventServlet(LeshanServer server) {
this.mapper = mapper;
}

private synchronized void sendEvent(String event, String data, String endpoint) {
public synchronized void sendEvent(String event, String data, String endpoint) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching {} event from endpoint {}", event, endpoint);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*******************************************************************************
* Copyright (c) 2024 Sierra Wireless and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Sierra Wireless - initial API and implementation
*******************************************************************************/
package org.eclipse.leshan.demo.server.servlet.queuemode;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.DownlinkDeviceManagementRequest;
import org.eclipse.leshan.core.request.exception.ClientSleepingException;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.server.LeshanServer;
import org.eclipse.leshan.server.queue.PresenceListener;
import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationListener;
import org.eclipse.leshan.server.registration.RegistrationUpdate;

/**
* This is a very simple in memory way to store request when device is sleeping.
*/
public class QueueHandler {

private final LeshanServer server;
private final ConcurrentMap<String, QueueRequestData> requestsToSend;

private class QueueRequestData {
private long timeout;
private DownlinkDeviceManagementRequest<? extends LwM2mResponse> request;
private CompletableFuture<LwM2mResponse> responseFuture;
}

public QueueHandler(LeshanServer server) {
this.server = server;
this.requestsToSend = new ConcurrentHashMap<>();

// Handle Presence Service Event
server.getPresenceService().addListener(new PresenceListener() {
@Override
public void onSleeping(Registration registration) {
}

@Override
public void onAwake(Registration registration) {
// try to send store request
QueueRequestData data = requestsToSend.remove(registration.getId());
if (data != null) {
try {
server.send(registration, data.request, data.timeout, //
r -> {
data.responseFuture.complete(r);
}, //
err -> {
data.responseFuture.completeExceptionally(err);
});
} catch (RuntimeException e) {
data.responseFuture.completeExceptionally(e);
}
}
}
});

// Handle Registration Service Event
server.getRegistrationService().addListener(new RegistrationListener() {
@Override
public void updated(RegistrationUpdate update, Registration updatedReg, Registration previousReg) {
}

@Override
public void unregistered(Registration registration, Collection<Observation> observations, boolean expired,
Registration newReg) {
QueueRequestData data = requestsToSend.remove(registration.getId());
if (data != null) {
data.responseFuture.cancel(false);
}
}

@Override
public void registered(Registration registration, Registration previousReg,
Collection<Observation> previousObservations) {
}
});

}

public CompletableFuture<LwM2mResponse> send(Registration destination, DownlinkDeviceManagementRequest<?> request,
long timeoutInMs) throws InterruptedException {

// is client awake ?
boolean clientAwake = server.getPresenceService().isClientAwake(destination);

// client is awake we try to send request now
CompletableFuture<LwM2mResponse> future = new CompletableFuture<>();
if (clientAwake) {
try {
// TODO ideally we should use async way to send request
LwM2mResponse response = server.send(destination, request, timeoutInMs);
future.complete(response);
} catch (ClientSleepingException e) {
clientAwake = false;
}
}

// client is not awake we queue the request for later.
if (!clientAwake) {
QueueRequestData data = new QueueRequestData();
data.request = request;
data.responseFuture = future;
data.timeout = timeoutInMs;
QueueRequestData previous = requestsToSend.put(destination.getId(), data);
// Cancel previous future as we store only last request
if (previous != null) {
previous.responseFuture.cancel(false);
}

// We want to be sure there still a registration for this ID
// The idea of this code is to avoid race condition which could lead to memory leak,
// In case where Registration is removed before we push QueueRequestData.
// Any better idea to handle this is welcomed.
requestsToSend.compute(destination.getId(), (id, currentData) -> {
if (server.getRegistrationService().getById(id) == null) {
// registration was removed, so we don't want to keep data for it
currentData.responseFuture.cancel(false);
return null;
} else {
// registration is still there, we can add it
return currentData;
}
});
}
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,16 @@ export default {
return `?timeout=${timeout.get()}&pathformat=${compositePathFormat.get()}&nodeformat=${compositeNodeFormat.get()}`;
},
updateState(content, requestButton) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
if ("valid" in content || "success" in content) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
} else {
requestButton.resetState();
}
},
read(requestButton) {
this.axios
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ export default {
return `?timeout=${timeout.get()}&format=${format.get()}`;
},
updateState(content, requestButton) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
if ("valid" in content || "success" in content) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
} else {
requestButton.resetState();
}
},
read(requestButton) {
this.axios
Expand Down
16 changes: 10 additions & 6 deletions leshan-demo-server/webapp/src/components/object/ObjectControl.vue
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ export default {
return `?timeout=${timeout.get()}&format=${format.get()}`;
},
updateState(content, requestButton) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
if ("valid" in content || "success" in content) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
} else {
requestButton.resetState();
}
},
openCreateDialog() {
this.dialog = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ export default {
return `?timeout=${timeout.get()}&format=${this.getFormat()}`;
},
updateState(content, requestButton) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
if ("valid" in content || "success" in content) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
} else {
requestButton.resetState();
}
},
read(requestButton) {
this.axios
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ export default {
},

updateState(content, requestButton) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
if ("valid" in content || "success" in content) {
let state = !content.valid
? "warning"
: content.success
? "success"
: "error";
requestButton.changeState(state, content.status);
} else {
requestButton.resetState();
}
},
read(requestButton) {
this.axios
Expand Down
18 changes: 17 additions & 1 deletion leshan-demo-server/webapp/src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,24 @@ Vue.directive("visible", function (el, binding) {
el.style.visibility = binding.value ? "visible" : "hidden";
});

new Vue({
let v = new Vue({
vuetify,
router,
render: (h) => h(App),
}).$mount("#app");

/** Add Leshan Server Demo specific axios interceptor */
v.$axios.interceptors.response.use(function (response) {
if (response.data.delayed) {
// show request will be delayed
let msg = `<strong>Device is not awake</strong>
</br>Request will be delayed until device is awake again.
</br><strong>Leshan Server Demo</strong> is only able to delayed the last request.`;

Vue.prototype.$dialog.notify.info(msg, {
position: "bottom-right",
timeout: 5000,
});
}
return response;
});
15 changes: 15 additions & 0 deletions leshan-demo-server/webapp/src/views/Client.vue
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ export default {
this.$store.newNode(this.$route.params.endpoint, path, msg.val[path]);
}
})
.on("REQUEST_RESPONSE", (msg) => {
if (msg.path) {
// we suppose that if we have a path property then this is a single node
this.$store.newNode(
this.$route.params.endpoint,
msg.path,
msg.response.content
);
} else {
this.$store.newNodes(
this.$route.params.endpoint,
msg.response.content
);
}
})
.on("NOTIFICATION", (msg) => {
if (msg.kind == "composite") {
this.$store.newNodes(this.$route.params.endpoint, msg.val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class StaticClientAwakeTimeProvider implements ClientAwakeTimeProvider {
private final int clientAwakeTime;

/**
* Create a {@link ClientAwakeTimeProvider} which always return 9300ms which is the default CoAP MAX_TRANSMIT_WAIT
* Create a {@link ClientAwakeTimeProvider} which always return 93000ms which is the default CoAP MAX_TRANSMIT_WAIT
* value.
*/
public StaticClientAwakeTimeProvider() {
Expand Down

0 comments on commit baa124f

Please sign in to comment.