Thread limit fetching
[canto-next.git] / canto_next / fetch.py
1 # -*- coding: utf-8 -*-
2 #Canto - RSS reader backend
3 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
4 #
5 #   This program is free software; you can redistribute it and/or modify
6 #   it under the terms of the GNU General Public License version 2 as 
7 #   published by the Free Software Foundation.
8
9 from .plugins import PluginHandler, Plugin
10 from .feed import allfeeds
11 from .hooks import call_hook
12
13 from multiprocessing import cpu_count
14 from threading import Thread
15
16 import feedparser
17 import traceback
18 import urllib.parse
19 import urllib.request, urllib.error, urllib.parse
20 import logging
21 import socket
22 import json
23 import time
24
25 log = logging.getLogger("CANTO-FETCH")
26
27 class DaemonFetchThreadPlugin(Plugin):
28     pass
29
30 # This is the first time I've ever had a need for multiple inheritance.
31 # I'm not sure if that's a good thing or not =)
32
33 class CantoFetchThread(PluginHandler, Thread):
34     def __init__(self, feed, fromdisk):
35         PluginHandler.__init__(self)
36         Thread.__init__(self, name="Fetch: %s" % feed.URL)
37         self.daemon = True
38
39         self.plugin_class = DaemonFetchThreadPlugin
40         self.update_plugin_lookups()
41
42         # feedparser honors this value, want to avoid hung feeds when the
43         # internet connection is flaky
44
45         socket.setdefaulttimeout(30)
46
47         self.feed = feed
48         self.fromdisk = fromdisk
49
50     def run(self):
51
52         # Initial load, just feed.index grab from disk.
53
54         if self.fromdisk:
55             self.feed.index({"entries" : []})
56             return
57
58         self.feed.last_update = time.time()
59
60         # Otherwise, actually try to get an update.
61
62         extra_headers = { 'User-Agent' :\
63                 'Canto/0.9.0 + http://codezen.org/canto-ng'}
64
65         try:
66             result = None
67             # Passworded Feed
68             if self.feed.username or self.feed.password:
69                 domain = urllib.parse.urlparse(self.feed.URL)[1]
70                 man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
71                 auth = urllib.request.HTTPBasicAuthHandler(man)
72                 auth.handler_order = 490
73                 auth.add_password(None, domain, self.feed.username,
74                         self.feed.password)
75
76                 try:
77                     result = feedparser.parse(self.feed.URL, handlers=[auth],
78                             request_headers = extra_headers)
79                 except:
80                     # And, failing that, Digest Authentication
81                     man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
82                     auth = urllib.request.HTTPDigestAuthHandler(man)
83                     auth.handler_order = 490
84                     auth.add_password(None, domain, self.feed.username,
85                             self.feed.password)
86                     result = feedparser.parse(self.feed.URL, handlers=[auth],
87                             request_headers = extra_headers)
88
89             # No password
90             else:
91                 result = feedparser.parse(self.feed.URL,
92                         request_headers = extra_headers)
93
94             update_contents = result
95         except Exception as e:
96             log.error("ERROR: try to parse %s, got %s" % (self.feed.URL, e))
97             return
98
99         # Interpret feedparser's bozo_exception, if there was an
100         # error that resulted in no content, it's the same as
101         # any other broken feed.
102
103         if "bozo_exception" in update_contents:
104             if update_contents["bozo_exception"] == urllib.error.URLError:
105                 log.error("ERROR: couldn't grab %s : %s" %\
106                         (self.feed.URL,\
107                         update_contents["bozo_exception"].reason))
108                 return
109             elif len(update_contents["entries"]) == 0:
110                 log.error("No content in %s: %s" %\
111                         (self.feed.URL,\
112                         update_contents["bozo_exception"]))
113                 return
114
115             # Replace it if we ignore it, since exceptions
116             # are not pickle-able.
117
118             update_contents["bozo_exception"] = None
119
120         # Update timestamp
121         update_contents["canto_update"] = self.feed.last_update
122
123         update_contents = json.loads(json.dumps(update_contents))
124
125         log.debug("Parsed %s", self.feed.URL)
126
127         # Allow DaemonFetchThreadPlugins to do any sort of fetch stuff
128         # before the thread is marked as complete.
129
130         for attr in list(self.plugin_attrs.keys()):
131             if not attr.startswith("fetch_"):
132                 continue
133
134             try:
135                 a = getattr(self, attr)
136                 a(feed = self.feed, newcontent = update_contents)
137             except:
138                 log.error("Error running fetch thread plugin")
139                 log.error(traceback.format_exc())
140
141         log.debug("Plugins complete.")
142
143         # This handles it's own locking
144         self.feed.index(update_contents)
145
146 class CantoFetch():
147     def __init__(self, shelf):
148         self.shelf = shelf
149         self.deferred = []
150         self.threads = []
151         self.thread_limit = cpu_count()
152         log.debug("Thread Limit: %s", self.thread_limit)
153
154     def needs_update(self, feed):
155         passed = time.time() - feed.last_update
156         if passed < feed.rate * 60:
157             return False
158         return True
159
160     def still_working(self, URL):
161         for thread, workingURL in self.threads:
162             if URL == workingURL:
163                 return True
164         return False
165
166     def _start_one(self, feed, fromdisk):
167         if len(self.threads) >= self.thread_limit:
168             return False
169
170         thread = CantoFetchThread(feed, fromdisk)
171         thread.start()
172         log.debug("Started thread for feed %s", feed)
173         self.threads.append((thread, feed.URL))
174         return True
175
176     def fetch(self, force, fromdisk):
177         for feed, fd in self.deferred[:]:
178             if self._start_one(feed, fd):
179                 log.debug("No longer deferred")
180                 self.deferred = self.deferred[1:]
181             else:
182                 return
183
184         for feed in allfeeds.get_feeds():
185             if not force and not self.needs_update(feed):
186                 continue
187
188             if self.still_working(feed.URL):
189                 continue
190
191             if not self._start_one(feed, fromdisk):
192                 log.debug("Deferring %s %s", feed, fromdisk)
193                 self.deferred.append((feed, fromdisk))
194
195     def reap(self, force=False):
196         work_done = False
197         newthreads = []
198
199         for thread, URL in self.threads:
200             if not force and thread.isAlive():
201                 newthreads.append((thread, URL))
202                 continue
203             work_done = True
204             thread.join()
205
206         self.threads = newthreads
207
208         if work_done:
209             if self.threads == []:
210                 self.shelf.sync()
211             call_hook("daemon_work_done", [])