d212726b7b065e6118ac98b33aea0c8dcc67bae7
[canto-next.git] / canto_next / protocol.py
1 # -*- coding: utf-8 -*-
2
3 #Canto - RSS reader backend
4 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
5 #
6 #   This program is free software; you can redistribute it and/or modify
7 #   it under the terms of the GNU General Public License version 2 as 
8 #   published by the Free Software Foundation.
9
10 import logging
11 import socket
12 import select
13 import errno
14 import getopt
15 import struct
16 import shlex
17 import json
18 import time
19 import sys
20 import os
21
22 log = logging.getLogger('SOCKET')
23
24 class CantoSocket:
25     def __init__(self, socket_name, **kwargs):
26
27         self.socket_name = socket_name
28
29         if "server" in kwargs and kwargs["server"]:
30             self.server = True
31         else:
32             self.server = False
33
34         if "port" in kwargs:
35             self.port = kwargs["port"]
36         else:
37             self.port = 0
38
39         if "interface" in kwargs:
40             self.interface = kwargs["interface"]
41         else:
42             self.interface = ''
43
44         if "address" in kwargs:
45             self.address = kwargs["address"]
46         else:
47             self.address = None
48
49         self.sockets = []
50
51         self.connect()
52
53     # Handle options common to all servers and clients
54
55     def common_args(self, extrashort = "", extralong = [], version = ""):
56         self.verbosity = 0
57         self.port = -1
58         self.addr = None
59         self.disabled_plugins = []
60         self.enabled_plugins = []
61         self.plugin_default = True
62
63         try:
64             optlist, sys.argv =\
65                 getopt.getopt(sys.argv[1:], 'D:p:a:vV' + extrashort, ["dir=",
66                 "port=", "address=","version", "noplugins","enableplugins=",
67                 "disableplugins="] + extralong)
68
69         except getopt.GetoptError as e:
70             log.error("Error: %s" % e.msg)
71             return -1
72
73         self.conf_dir = os.path.expanduser("~/.canto-ng/")
74
75         self.location_args = []
76
77         for opt, arg in optlist:
78             if opt in [ "-D", "--dir"]:
79                 self.conf_dir = os.path.expanduser(arg)
80                 self.conf_dir = os.path.realpath(self.conf_dir)
81                 self.location_args += [ opt, arg ]
82
83             elif opt in ["-V", "--version"]:
84                 print(version)
85                 sys.exit(0)
86
87             elif opt in ["-v"]:
88                 self.verbosity += 1
89
90             elif opt in [ "-p", "--port"]:
91                 try:
92                     self.port = int(arg)
93                     if self.port < 0:
94                         raise Exception
95                 except:
96                     log.error("Error: Port must be >0 integer.")
97                     return -1
98
99                 # Assume loopback if address hasn't been set yet.
100                 if self.addr == None:
101                     self.addr = "127.0.0.1"
102
103                 self.location_args += [ opt, arg ]
104
105             elif opt in [ "-a", "--address"]:
106                 self.addr = arg
107                 self.location_args += [ opt, arg ]
108
109             elif opt in ['--noplugins']:
110                 self.plugin_default = False
111
112             elif opt in ['--disableplugins']:
113                 self.disabled_plugins = shlex.split(arg)
114
115             elif opt in ['--enableplugins']:
116                 self.enabled_plugins = shlex.split(arg)
117
118         self.socket_path = self.conf_dir + "/.canto_socket"
119
120         return optlist
121
122     # Server setup, potentially both unix and inet sockets.
123     def connect(self):
124         if self.server:
125             if self.socket_name:
126                 # Remove old unix socket.
127                 if os.path.exists(self.socket_name):
128                     os.remove(self.socket_name)
129
130                 # Setup new socket.
131                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
132                 sock.setblocking(0)
133                 sock.bind(self.socket_name)
134                 sock.listen(5)
135
136             # Net socket setup.
137             if self.port > 0:
138                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
139                 sock.setblocking(0)
140
141                 sock.bind((self.interface, self.port))
142                 sock.listen(5)
143
144         # Client setup, can only do unix or inet, not both.
145
146         else:
147             if self.address and self.port > 0:
148                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149                 addr = (self.address, self.port)
150             else:
151                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
152                 addr = self.socket_name
153
154             tries = 10
155
156             while tries > 0:
157                 try:
158                     sock.connect(addr)
159                     break
160                 except Exception as e:
161                     if e.args[0] != errno.ECONNREFUSED or tries == 1:
162                         raise
163                 time.sleep(1)
164                 tries -= 1
165
166         self.sockets.append(sock)
167         return sock
168
169     # Setup poll.poll() object to watch for read status on conn.
170     def read_mode(self, poll, conn):
171         poll.register(conn.fileno(),\
172                 select.POLLIN | select.POLLHUP | select.POLLERR |\
173                 select.POLLPRI)
174
175     # Setup poll.poll() object to watch for write status on conn.
176     def write_mode(self, poll, conn):
177         poll.register(conn.fileno(),\
178                 select.POLLOUT | select.POLLHUP | select.POLLERR |\
179                 select.POLLNVAL)
180
181     # Take raw data, return (cmd, args) tuple or None if not enough data.
182     def parse(self, conn, data):
183         try:
184             cmd, args = eval(repr(json.loads(data)), {}, {})
185         except:
186             log.error("Failed to parse message: %s" % data)
187         else:
188             log.debug("\n\nRead:\n%s", json.dumps((cmd, args), indent=4, sort_keys=True))
189             return (cmd, args)
190
191     # Reads from a connection, returns:
192     # 1) (cmd, args) from self.parse if possible.
193     # 2) None, if there was not enough data read.
194     # 3) select.POLLHUP if the connection is dead.
195
196     def do_read(self, conn, timeout=None):
197         r = self._do_read(conn, timeout)
198         if r == select.POLLHUP:
199             self.disconnected(conn)
200         return r
201
202     def _do_read(self, conn, timeout):
203         poll = select.poll()
204
205         try:
206             self.read_mode(poll, conn)
207         except:
208             log.error("Error putting conn in read mode.")
209             log.error("Interpreting as HUP")
210             return select.POLLHUP
211
212         # We only care about the first (only) descriptor's event
213         try:
214             p = poll.poll(timeout)
215         except select.error as e:
216             if e.args[0] == errno.EINTR:
217                 return
218             log.debug("Raising error: %s", e[1])
219             raise
220
221         if timeout and not p:
222             return
223
224         e = p[0][1]
225
226         log.debug("E: %d", e)
227         if e & select.POLLERR:
228             log.debug("Read ERR")
229             return select.POLLHUP
230         if e & (select.POLLIN | select.POLLPRI):
231             message = ""
232             try:
233                 size = struct.unpack('!q', conn.recv(8))[0]
234                 while size:
235                     frag = conn.recv(min((4096, size)))
236                     size -= len(frag)
237                     message += frag.decode()
238             except Exception as e:
239                 if e.args[0] == errno.EINTR:
240                     return
241                 log.error("Error receiving: %s" % e)
242                 log.error("Interpreting as HUP")
243                 return select.POLLHUP
244
245             # Never get POLLRDHUP on INET sockets, so
246             # use POLLIN with no data as POLLHUP
247             if not message:
248                 log.debug("Read POLLIN with no data")
249                 return select.POLLHUP
250
251             return self.parse(conn, message)
252
253         # Parse POLLHUP last so if we still got POLLIN, any data
254         # is still retrieved from the socket.
255         if e & select.POLLHUP:
256             log.debug("Read HUP")
257             return select.POLLHUP
258
259         # Non-empty, but not anything we're interested in?
260         log.debug("Unknown poll.poll() return")
261         return select.POLLHUP
262
263     # Writes a (cmd, args) to a single connection, returns:
264     # 1) None if the write completed.
265     # 2) select.POLLHUP is the connection is dead.
266
267     def do_write(self, conn, cmd, args):
268         r = self._do_write(conn, cmd, args)
269         if r == select.POLLHUP:
270             self.disconnected(conn)
271         return r
272
273     def _do_write(self, conn, cmd, args):
274         log.debug("\n\nWrite:\n%s\n", json.dumps((cmd, args), indent=4, sort_keys=True))
275
276         message = json.dumps((cmd, args))
277         size = struct.pack("!q", len(message))
278         message = size + message.encode("UTF-8")
279
280         poll = select.poll()
281         tosend = message
282
283         try:
284             self.write_mode(poll, conn)
285         except:
286             log.error("Error putting conn in write mode.")
287             log.error("Interpreting as HUP")
288             return select.POLLHUP
289
290         eintr_count = 0
291
292         while tosend:
293
294             # Again, we only care about the first descriptor's mask
295
296             try:
297                 p = poll.poll()
298             except select.error as e:
299                 if e.args[0] == errno.EINTR:
300                     eintr_count += 1
301                     if eintr_count >= 3:
302                         log.error("conn %s appears valid, but unresponsive." % conn)
303                         log.error("Closing conn, please check client.")
304                         conn.close()
305                         return select.POLLHUP
306                     continue
307                 log.error("Raising error: %s" % e[1])
308                 raise
309
310             if not p:
311                 continue
312             e = p[0][1]
313
314             if e & select.POLLHUP:
315                 log.debug("Write HUP")
316                 return select.POLLHUP
317             if e & select.POLLNVAL:
318                 log.debug("Write NVAL")
319                 return select.POLLHUP
320             if e & select.POLLERR:
321                 log.debug("Write ERR")
322                 return select.POLLHUP
323             if e & select.POLLOUT:
324                 try:
325                     sent = conn.send(tosend)
326                 except Exception as e:
327                     if e.args[0] == errno.EINTR:
328                         continue
329                     log.error("Error sending: %s" % e[1])
330                     log.error("Interpreting as HUP")
331                     return select.POLLHUP
332
333                 tosend = tosend[sent:]
334                 log.debug("Sent %d bytes.", sent)
335
336     def disconnected(self, conn):
337         pass