1 # -*- coding: utf-8 -*-
2 #Canto - RSS reader backend
3 # Copyright (C) 2014 Jack Miller <jack@codezen.org>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation.
9 from .plugins import PluginHandler, Plugin
10 from .feed import allfeeds
11 from .hooks import call_hook
13 from multiprocessing import cpu_count
14 from threading import Thread
# Module-level logger for the fetch machinery; named so this file's
# output is identifiable in the daemon log.
log = logging.getLogger("CANTO-FETCH")
class DaemonFetchThreadPlugin(Plugin):
    """Plugin base class for fetch-thread plugins.

    CantoFetchThread sets this as its plugin_class; plugin attributes whose
    names start with "fetch_" are invoked after a feed's content has been
    parsed (see the fetch_* loop in CantoFetchThread).

    NOTE(review): the class body is elided from this view of the file --
    presumably just an empty marker class; confirm against the full source.
    """
31 # This is the first time I've ever had a need for multiple inheritance.
32 # I'm not sure if that's a good thing or not =)
class CantoFetchThread(PluginHandler, Thread):
    """Worker thread that fetches and parses a single feed.

    Fetches/parses the feed with feedparser (with optional HTTP Basic and
    Digest auth), sanitizes the result to plain JSON-serializable types,
    runs fetch_* plugin hooks, then hands the content to self.feed.index().

    NOTE(review): several lines of this class are elided from this view of
    the file (each elision is marked below); all visible code tokens are
    unchanged, and reconstructed indentation should be confirmed against
    the full source.
    """

    def __init__(self, feed, fromdisk):
        # feed     -- feed object (provides .URL, .username, .password,
        #             .last_update, .index()).
        # fromdisk -- when true, run() only reloads the feed index from
        #             disk instead of hitting the network.
        PluginHandler.__init__(self)
        Thread.__init__(self, name="Fetch: %s" % feed.URL)

        self.plugin_class = DaemonFetchThreadPlugin
        self.update_plugin_lookups()

        # feedparser honors this value, want to avoid hung feeds when the
        # internet connection is flaky
        socket.setdefaulttimeout(30)

        self.fromdisk = fromdisk
        # NOTE(review): the assignment of self.feed is on an elided line
        # (run() below uses self.feed throughout) -- presumably
        # `self.feed = feed`; confirm against the full source.

    # NOTE(review): the `def run(self):` header line is elided from this
    # view; the statements below are its body.

        # Initial load, just feed.index grab from disk.
        # NOTE(review): the surrounding conditional (presumably
        # `if self.fromdisk:` with an early return) is elided here.
        self.feed.index({"entries" : []})

        # Record fetch time; stamped onto the parsed content below.
        self.feed.last_update = time.time()

        # Otherwise, actually try to get an update.
        extra_headers = { 'User-Agent' :\
                'Canto/0.9.0 + http://codezen.org/canto-ng'}

        # Feeds with credentials: try HTTP Basic auth first.
        if self.feed.username or self.feed.password:
            # netloc of the feed URL, used as the auth domain.
            domain = urllib.parse.urlparse(self.feed.URL)[1]
            man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            auth = urllib.request.HTTPBasicAuthHandler(man)
            # 490 orders this handler just ahead of the default (500)
            # handlers in urllib's opener chain.
            auth.handler_order = 490
            auth.add_password(None, domain, self.feed.username,
            # NOTE(review): continuation line elided -- presumably
            # `self.feed.password)`.
            result = feedparser.parse(self.feed.URL, handlers=[auth],
                    request_headers = extra_headers)

            # And, failing that, Digest Authentication
            man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            auth = urllib.request.HTTPDigestAuthHandler(man)
            auth.handler_order = 490
            auth.add_password(None, domain, self.feed.username,
            # NOTE(review): continuation line elided -- presumably
            # `self.feed.password)`.
            result = feedparser.parse(self.feed.URL, handlers=[auth],
                    request_headers = extra_headers)

        # No credentials: plain fetch.
        # NOTE(review): the `else:` introducing this branch is elided.
            result = feedparser.parse(self.feed.URL,
                    request_headers = extra_headers)

        update_contents = result
        # NOTE(review): the matching `try:` for this handler (and probably
        # an early return on failure) is elided from this view.
        except Exception as e:
            log.error("ERROR: try to parse %s, got %s" % (self.feed.URL, e))

        # Interpret feedparser's bozo_exception, if there was an
        # error that resulted in no content, it's the same as
        # any other broken feed.

        if "bozo_exception" in update_contents:
            if update_contents["bozo_exception"] == urllib.error.URLError:
                log.error("ERROR: couldn't grab %s : %s" %\
                # NOTE(review): continuation line elided -- presumably
                # `(self.feed.URL,`.
                        update_contents["bozo_exception"].reason))

            elif len(update_contents["entries"]) == 0:
                log.error("No content in %s: %s" %\
                # NOTE(review): continuation line elided -- presumably
                # `(self.feed.URL,`.
                        update_contents["bozo_exception"]))

            # Replace it if we ignore it, since exceptions
            # are not pickle-able.
            update_contents["bozo_exception"] = None

        # Stamp the parsed content with the fetch timestamp taken above.
        update_contents["canto_update"] = self.feed.last_update

        # Round-trip through JSON so only plain serializable types remain
        # in the structure handed to feed.index().
        update_contents = json.loads(json.dumps(update_contents))

        log.debug("Parsed %s", self.feed.URL)

        # Allow DaemonFetchThreadPlugins to do any sort of fetch stuff
        # before the thread is marked as complete.

        for attr in list(self.plugin_attrs.keys()):
            # Only attributes named fetch_* are fetch hooks.
            if not attr.startswith("fetch_"):
            # NOTE(review): loop-control line elided -- presumably
            # `continue`, plus a `try:` wrapping the plugin call below.

            a = getattr(self, attr)
            a(feed = self.feed, newcontent = update_contents)
            # NOTE(review): the `except ...:` line for the error handling
            # below is elided from this view.
            log.error("Error running fetch thread plugin")
            log.error(traceback.format_exc())

        log.debug("Plugins complete.")

        # This handles its own locking
        self.feed.index(update_contents)
    def __init__(self, shelf):
        """Fetch coordinator: starts and tracks fetch threads for feeds.

        shelf -- backing storage handle.
        NOTE(review): the lines storing `shelf` and initializing
        self.threads / self.deferred (both used by the other methods) are
        elided from this view -- confirm against the full source.
        """
        # Cap concurrent fetch threads at the number of CPUs.
        self.thread_limit = cpu_count()
        log.debug("Thread Limit: %s", self.thread_limit)
    def needs_update(self, feed):
        """Return whether the feed is outside its per-feed rate window."""
        # Seconds since this feed was last fetched.
        passed = time.time() - feed.last_update
        # feed.rate appears to be minutes between updates (compared in
        # seconds via * 60) -- confirm against the feed class.
        if passed < feed.rate * 60:
            # NOTE(review): return statements elided from this view --
            # presumably False here and True after the conditional.
    def still_working(self, URL):
        """Return whether a fetch thread for URL is already in flight."""
        # self.threads holds (thread, URL) pairs for active fetches.
        for thread, workingURL in self.threads:
            if URL == workingURL:
                # NOTE(review): return statements elided from this view --
                # presumably True here and False after the loop.
    def _start_one(self, feed, fromdisk):
        """Try to start one fetch thread; truthy result means handled."""
        # At the thread cap -- caller must defer this feed.
        if len(self.threads) >= self.thread_limit:
            # NOTE(review): body elided -- presumably `return False`.

        # If feed is stopped/dead, pretend like we did the work but don't
        # NOTE(review): the rest of this comment and the corresponding
        # check are elided from this view.

        thread = CantoFetchThread(feed, fromdisk)
        # NOTE(review): a `thread.start()` call is presumably on an elided
        # line here -- confirm against the full source.
        log.debug("Started thread for feed %s", feed)
        self.threads.append((thread, feed.URL))
        # NOTE(review): presumably `return True` on an elided line --
        # fetch() treats the return value as started-vs-defer.
    def fetch(self, force, fromdisk):
        """Kick off fetch threads for deferred feeds, then all due feeds.

        force    -- when true, ignore per-feed rate limiting.
        fromdisk -- passed through to newly started threads.
        """
        # First retry feeds deferred on a previous pass because the thread
        # cap was reached.  Iterate a copy since the list is mutated.
        for feed, fd in self.deferred[:]:
            if self._start_one(feed, fd):
                log.debug("No longer deferred")
                # NOTE(review): this always drops the *first* deferred
                # entry, not necessarily the one just started -- only
                # correct if entries can't fail out of order; verify
                # against the full source.
                self.deferred = self.deferred[1:]

        # NOTE(review): lines elided from this view here.

        for feed in allfeeds.get_feeds():
            # Skip feeds still inside their rate window unless forced.
            if not force and not self.needs_update(feed):
                # NOTE(review): body elided -- presumably `continue`.

            # Skip feeds that already have a fetch thread running.
            if self.still_working(feed.URL):
                # NOTE(review): body elided -- presumably `continue`.

            # Couldn't start (thread cap): remember for the next pass.
            if not self._start_one(feed, fromdisk):
                log.debug("Deferring %s %s", feed, fromdisk)
                self.deferred.append((feed, fromdisk))
    def reap(self, force=False):
        """Prune finished fetch threads; with force, reap even live ones.

        NOTE(review): initialization of `newthreads` (presumably `= []`)
        is on an elided line before the loop.
        """
        for thread, URL in self.threads:
            # Keep still-running threads unless this is a forced reap.
            # NOTE(review): Thread.isAlive() was removed in Python 3.9 --
            # on modern interpreters this must be thread.is_alive().
            if not force and thread.isAlive():
                newthreads.append((thread, URL))
                # NOTE(review): lines elided here -- presumably `continue`
                # plus `thread.join()` (and per-feed cleanup) for threads
                # that have finished.

        self.threads = newthreads

        # NOTE(review): lines elided from this view here; the indentation
        # of the two statements below is reconstructed -- confirm whether
        # the hook fires unconditionally or only when all threads are done.
        call_hook("daemon_work_done", [])
        if self.threads == []:
            # NOTE(review): body elided from this view.