Change how protocol messages are sent.
authorJack Miller <jack@codezen.org>
Mon, 15 Jun 2015 22:12:47 +0000 (17:12 -0500)
committerJack Miller <jack@codezen.org>
Tue, 16 Jun 2015 06:21:58 +0000 (01:21 -0500)
It's better to just know the size right off the bat than do this
fragment management and add the possibility of a small message getting
held up in a buffer waiting for it to fill.

canto_next/protocol.py

index a63fa6e..d281ec5 100644 (file)
@@ -14,14 +14,13 @@ import socket
 import select
 import errno
 import getopt
+import struct
 import shlex
 import json
 import time
 import sys
 import os
 
-PROTO_TERMINATOR='\x00'
-
 log = logging.getLogger('SOCKET')
 
 class CantoSocket:
@@ -51,13 +50,6 @@ class CantoSocket:
 
         self.sockets = []
 
-        # Holster for partial reads.
-        self.fragments = { }
-
-        on_hook("client_new_socket", self.prot_new_frag)
-        on_hook("server_new_socket", self.prot_new_frag)
-        on_hook("server_kill_socket", self.prot_kill_frag)
-
         self.connect()
 
     # Handle options common to all servers and clients
@@ -188,29 +180,12 @@ class CantoSocket:
                 select.POLLOUT | select.POLLHUP | select.POLLERR |\
                 select.POLLNVAL)
 
-    def prot_new_frag(self, newconn):
-        if newconn not in self.fragments:
-            self.fragments[newconn] = ""
-
-    def prot_kill_frag(self, deadconn):
-        if deadconn in self.fragments:
-            del self.fragments[deadconn]
-
     # Take raw data, return (cmd, args) tuple or None if not enough data.
     def parse(self, conn, data):
-
-        self.fragments[conn] += data
-
-        if PROTO_TERMINATOR not in self.fragments[conn]:
-            return None
-
-        message, self.fragments[conn] =\
-                self.fragments[conn].split(PROTO_TERMINATOR, 1)
-
         try:
-            cmd, args = eval(repr(json.loads(message)), {}, {})
+            cmd, args = eval(repr(json.loads(data)), {}, {})
         except:
-            log.error("Failed to parse message: %s" % message)
+            log.error("Failed to parse message: %s" % data)
         else:
             log.debug("\n\nRead:\n%s", json.dumps((cmd, args), indent=4, sort_keys=True))
             return (cmd, args)
@@ -227,9 +202,6 @@ class CantoSocket:
         return r
 
     def _do_read(self, conn, timeout):
-        if self.fragments[conn] and PROTO_TERMINATOR in self.fragments[conn]:
-            return self.parse(conn, "") # <- already uses self.fragments
-
         poll = select.poll()
 
         try:
@@ -258,22 +230,27 @@ class CantoSocket:
             log.debug("Read ERR")
             return select.POLLHUP
         if e & (select.POLLIN | select.POLLPRI):
+            message = ""
             try:
-                fragment = conn.recv(4096).decode()
+                size = struct.unpack('!q', conn.recv(8))[0]
+                while size:
+                    frag = conn.recv(min((4096, size)))
+                    size -= len(frag)
+                    message += frag.decode()
             except Exception as e:
                 if e.args[0] == errno.EINTR:
                     return
-                log.error("Error sending: %s" % e)
+                log.error("Error receiving: %s" % e)
                 log.error("Interpreting as HUP")
                 return select.POLLHUP
 
             # Never get POLLRDHUP on INET sockets, so
             # use POLLIN with no data as POLLHUP
-            if not fragment:
+            if not message:
                 log.debug("Read POLLIN with no data")
                 return select.POLLHUP
 
-            return self.parse(conn, fragment)
+            return self.parse(conn, message)
 
         # Parse POLLHUP last so if we still got POLLIN, any data
         # is still retrieved from the socket.
@@ -298,7 +275,9 @@ class CantoSocket:
     def _do_write(self, conn, cmd, args):
         log.debug("\n\nWrite:\n%s\n", json.dumps((cmd, args), indent=4, sort_keys=True))
 
-        message = json.dumps((cmd, args)) + PROTO_TERMINATOR
+        message = json.dumps((cmd, args))
+        size = struct.pack("!q", len(message))
+        message = size + message.encode("UTF-8")
 
         poll = select.poll()
         tosend = message
@@ -345,7 +324,7 @@ class CantoSocket:
                 return select.POLLHUP
             if e & select.POLLOUT:
                 try:
-                    sent = conn.send(tosend.encode("UTF-8"))
+                    sent = conn.send(tosend)
                 except Exception as e:
                     if e.args[0] == errno.EINTR:
                         continue