1.1 --- a/chat/server/src/main/java/org/apidesign/demo/chat/server/ChatServerResource.java Fri Apr 22 08:58:08 2016 +0200
1.2 +++ b/chat/server/src/main/java/org/apidesign/demo/chat/server/ChatServerResource.java Fri Apr 22 18:13:09 2016 +0200
1.3 @@ -26,12 +26,16 @@
1.4 import java.io.ByteArrayInputStream;
1.5 import java.io.IOException;
1.6 import java.util.ArrayList;
1.7 +import java.util.HashSet;
1.8 import java.util.List;
1.9 +import java.util.Set;
1.10 import java.util.logging.Logger;
1.11 import net.java.html.BrwsrCtx;
1.12 import net.java.html.json.Models;
1.13 import org.apidesign.demo.chat.shared.Message;
1.14 import org.apidesign.demo.chat.shared.Query;
1.15 +import org.apidesign.demo.chat.shared.Reply;
1.16 +import org.glassfish.grizzly.websockets.DataFrame;
1.17 import org.glassfish.grizzly.websockets.WebSocket;
1.18 import org.glassfish.grizzly.websockets.WebSocketApplication;
1.19
1.20 @@ -50,8 +54,8 @@
1.21 }
1.22
1.23
1.24 + private final Set<WebSocket> connected = new HashSet<>();
1.25 /*
1.26 - private final Map<AsyncResponse, Long> awaiting = new IdentityHashMap<>();
1.27
1.28 @Produces(MediaType.APPLICATION_JSON)
1.29 @GET public synchronized void getResources(
1.30 @@ -70,38 +74,45 @@
1.31 awaiting.put(ar, since);
1.32 }
1.33 }
1.34 -
1.35 - private void handleAwaiting(long newest) {
1.36 + */
1.37 + private void handleAwaiting(Message msg) {
1.38 assert Thread.holdsLock(this);
1.39 - AGAIN: for (;;) {
1.40 - for (Map.Entry<AsyncResponse, Long> entry : awaiting.entrySet()) {
1.41 - AsyncResponse ar = entry.getKey();
1.42 - Long since = entry.getValue();
1.43 - if (since <= newest) {
1.44 - awaiting.remove(ar);
1.45 - getResources(since, ar);
1.46 - continue AGAIN;
1.47 - }
1.48 - }
1.49 - return;
1.50 + Reply reply = new Reply();
1.51 + reply.getMessages().add(msg);
1.52 + String txt = reply.toString();
1.53 + for (WebSocket webSocket : connected) {
1.54 + webSocket.send(txt);
1.55 }
1.56 }
1.57 -
1.58 - @POST @Consumes(value = MediaType.APPLICATION_JSON)
1.59 - public synchronized Message publish(Message msg) {
1.60 +
1.61 + private synchronized Message publish(Message msg) {
1.62 msg.setSince(System.currentTimeMillis() - started);
1.63 msgs.add(msg);
1.64 - handleAwaiting(msg.getSince());
1.65 + handleAwaiting(msg);
1.66 return msg;
1.67 }
1.68 -*/
1.69 +
1.70 + @Override
1.71 + public void onConnect(WebSocket socket) {
1.72 + connected.add(socket);
1.73 + }
1.74 +
1.75 + @Override
1.76 + public void onClose(WebSocket socket, DataFrame frame) {
1.77 + connected.remove(socket);
1.78 + }
1.79
1.80 @Override
1.81 public void onMessage(WebSocket socket, String text) {
1.82 try {
1.83 ByteArrayInputStream is = new ByteArrayInputStream(text.getBytes("UTF-8"));
1.84 Query q = Models.parse(BrwsrCtx.findDefault(ChatServerResource.class), Query.class, is);
1.85 - super.onMessage(socket, text);
1.86 + if (q.getPost() != null) {
1.87 + publish(q.getPost());
1.88 + }
1.89 + if (q.isAll()) {
1.90 + socket.send(new Reply(true, msgs.toArray(new Message[0])).toString());
1.91 + }
1.92 } catch (IOException ex) {
1.93 throw new IllegalStateException(ex);
1.94 }