Queue writes when socket is full
author Jack Miller <jack@codezen.org>
Mon, 20 Jul 2015 16:30:28 +0000 (11:30 -0500)
committer Jack Miller <jack@codezen.org>
Wed, 22 Jul 2015 03:20:30 +0000 (22:20 -0500)
Current code breaks down when there's a large amount of socket traffic.
The reason is that both sides, the client and the server, can block in
poll() on write() forever: the socket is full, and that condition will
never clear if the other side is also blocked on write (or, for any other
reason, is not actively reading data from the socket).

Right now, the flush of queued writes is attached to the read(), because
this is where the thread will idle, and it doesn't hurt to wake up a couple
of times a second if we have data pending.

canto_next/protocol.py
canto_next/server.py

index fb0f22f..886d98f 100644 (file)
@@ -50,6 +50,7 @@ class CantoSocket:
         self.sockets = []
         self.read_locks = {}
         self.write_locks = {}
+        self.write_frags = {}
 
         self.connect()
 
@@ -180,6 +181,7 @@ class CantoSocket:
         self.sockets.append(sock)
         self.read_locks[sock] = Lock()
         self.write_locks[sock] = Lock()
+        self.write_frags[sock] = None
         return sock
 
     # Setup poll.poll() object to watch for read status on conn.
@@ -204,18 +206,23 @@ class CantoSocket:
             log.debug("\n\nRead:\n%s", json.dumps((cmd, args), indent=4, sort_keys=True))
             return (cmd, args)
 
-    # Reads from a connection, returns:
-    # 1) (cmd, args) from self.parse if possible.
-    # 2) None, if there was not enough data read.
-    # 3) select.POLLHUP if the connection is dead.
-
     def do_read(self, conn, timeout=None):
-        self.read_locks[conn].acquire()
-        r = self._do_read(conn, timeout)
-        self.read_locks[conn].release()
-        if r == select.POLLHUP:
-            self.disconnected(conn)
-        return r
+        while True:
+            to = timeout
+            if self.write_frags[conn] != None:
+                if to == None:
+                    to = 500
+                self.do_write(conn, None, None)
+
+            self.read_locks[conn].acquire()
+            r = self._do_read(conn, to)
+            self.read_locks[conn].release()
+
+            if r == select.POLLHUP:
+                self.disconnected(conn)
+            elif r == None and timeout == None:
+                continue
+            return r
 
     def _do_read(self, conn, timeout):
         poll = select.poll()
@@ -286,76 +293,81 @@ class CantoSocket:
 
     def do_write(self, conn, cmd, args):
         self.write_locks[conn].acquire()
-        r = self._do_write(conn, cmd, args)
+        r, frag = self._do_write(conn, cmd, args, self.write_frags[conn])
         self.write_locks[conn].release()
+
         if r == select.POLLHUP:
             self.disconnected(conn)
+        elif r == errno.EINTR:
+            self.write_frags[conn] = frag
+        else:
+            self.write_frags[conn] = None
+
         return r
 
-    def _do_write(self, conn, cmd, args):
+    def _do_write(self, conn, cmd, args, frag):
         log.debug("\n\nWrite:\n%s\n", json.dumps((cmd, args), indent=4, sort_keys=True))
 
-        message = json.dumps((cmd, args)).encode("UTF-8")
-        size = struct.pack("!q", len(message))
-        message = size + message
+        tosend = b""
 
-        poll = select.poll()
-        tosend = message
+        if cmd:
+            message = json.dumps((cmd, args)).encode("UTF-8")
+            size = struct.pack("!q", len(message))
+            tosend = size + message
 
-        try:
-            self.write_mode(poll, conn)
-        except:
-            log.error("Error putting conn in write mode.")
-            log.error("Interpreting as HUP")
-            return select.POLLHUP
-
-        eintr_count = 0
+        if frag:
+            tosend = frag + tosend
 
         while tosend:
+            poll = select.poll()
 
-            # Again, we only care about the first descriptor's mask
+            try:
+                self.write_mode(poll, conn)
+            except:
+                log.error("Error putting conn in write mode.")
+                log.error("Interpreting as HUP")
+                return (select.POLLHUP, 0)
 
             try:
-                p = poll.poll()
+                p = poll.poll(1)
             except select.error as e:
                 if e.args[0] == errno.EINTR:
-                    eintr_count += 1
-                    if eintr_count >= 3:
-                        log.error("conn %s appears valid, but unresponsive." % conn)
-                        log.error("Closing conn, please check client.")
-                        conn.close()
-                        return select.POLLHUP
-                    continue
+                    return (errno.EINTR, tosend)
                 log.error("Raising error: %s" % e[1])
                 raise
 
-            if not p:
-                continue
+            if p == []:
+                log.debug("poll timed out")
+                return (errno.EINTR, tosend)
+
             e = p[0][1]
 
             if e & select.POLLHUP:
                 log.debug("Write HUP")
-                return select.POLLHUP
+                return (select.POLLHUP, 0)
             if e & select.POLLNVAL:
                 log.debug("Write NVAL")
-                return select.POLLHUP
+                return (select.POLLHUP, 0)
             if e & select.POLLERR:
                 log.debug("Write ERR")
-                return select.POLLHUP
+                return (select.POLLHUP, 0)
             if e & select.POLLOUT:
                 try:
                     sent = conn.send(tosend)
                 except Exception as e:
                     if e.args[0] == errno.EINTR:
-                        continue
+                        return (errno.EINTR, tosend)
                     log.error("Error sending: %s" % e[1])
                     log.error("Interpreting as HUP")
-                    return select.POLLHUP
+                    return (select.POLLHUP, 0)
 
                 tosend = tosend[sent:]
                 log.debug("Sent %d bytes.", sent)
 
+        return (None, 0)
+
     def disconnected(self, conn):
         self.sockets.remove(conn)
         del self.read_locks[conn]
         del self.write_locks[conn]
+        del self.write_frags[conn]
index 4cebf78..a572d4c 100644 (file)
@@ -91,6 +91,7 @@ class CantoServer(CantoSocket):
     def accept_conn(self, conn):
         self.read_locks[conn] = Lock()
         self.write_locks[conn] = Lock()
+        self.write_frags[conn] = None
 
         # Notify watchers about new socket.
         call_hook("server_new_socket", [conn])