5fde678d8099467d69335f77a2a94eecde5b6925
[canto-next.git] / canto_next / fetch.py
1 # -*- coding: utf-8 -*-
2 #Canto - RSS reader backend
3 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
4 #
5 #   This program is free software; you can redistribute it and/or modify
6 #   it under the terms of the GNU General Public License version 2 as 
7 #   published by the Free Software Foundation.
8
9 from .plugins import PluginHandler, Plugin
10 from .feed import allfeeds
11 from .hooks import call_hook
12
13 from multiprocessing import cpu_count
14 from threading import Thread
15
16 import feedparser
17 import traceback
18 import urllib.parse
19 import urllib.request
20 import urllib.error
21 import logging
22 import socket
23 import json
24 import time
25
26 log = logging.getLogger("CANTO-FETCH")
27
28 class DaemonFetchThreadPlugin(Plugin):
29     pass
30
31 # This is the first time I've ever had a need for multiple inheritance.
32 # I'm not sure if that's a good thing or not =)
33
34 class CantoFetchThread(PluginHandler, Thread):
35     def __init__(self, feed, fromdisk):
36         PluginHandler.__init__(self)
37         Thread.__init__(self, name="Fetch: %s" % feed.URL)
38         self.daemon = True
39
40         self.plugin_class = DaemonFetchThreadPlugin
41         self.update_plugin_lookups()
42
43         # feedparser honors this value, want to avoid hung feeds when the
44         # internet connection is flaky
45
46         socket.setdefaulttimeout(30)
47
48         self.feed = feed
49         self.fromdisk = fromdisk
50
51     def run(self):
52
53         # Initial load, just feed.index grab from disk.
54
55         if self.fromdisk:
56             self.feed.index({"entries" : []})
57             return
58
59         self.feed.last_update = time.time()
60
61         # Otherwise, actually try to get an update.
62
63         extra_headers = { 'User-Agent' :\
64                 'Canto/0.9.0 + http://codezen.org/canto-ng'}
65
66         try:
67             result = None
68             # Passworded Feed
69             if self.feed.username or self.feed.password:
70                 domain = urllib.parse.urlparse(self.feed.URL)[1]
71                 man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
72                 auth = urllib.request.HTTPBasicAuthHandler(man)
73                 auth.handler_order = 490
74                 auth.add_password(None, domain, self.feed.username,
75                         self.feed.password)
76
77                 try:
78                     result = feedparser.parse(self.feed.URL, handlers=[auth],
79                             request_headers = extra_headers)
80                 except:
81                     # And, failing that, Digest Authentication
82                     man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
83                     auth = urllib.request.HTTPDigestAuthHandler(man)
84                     auth.handler_order = 490
85                     auth.add_password(None, domain, self.feed.username,
86                             self.feed.password)
87                     result = feedparser.parse(self.feed.URL, handlers=[auth],
88                             request_headers = extra_headers)
89
90             # No password
91             else:
92                 result = feedparser.parse(self.feed.URL,
93                         request_headers = extra_headers)
94
95             update_contents = result
96         except Exception as e:
97             log.error("ERROR: try to parse %s, got %s" % (self.feed.URL, e))
98             return
99
100         # Interpret feedparser's bozo_exception, if there was an
101         # error that resulted in no content, it's the same as
102         # any other broken feed.
103
104         if "bozo_exception" in update_contents:
105             if update_contents["bozo_exception"] == urllib.error.URLError:
106                 log.error("ERROR: couldn't grab %s : %s" %\
107                         (self.feed.URL,\
108                         update_contents["bozo_exception"].reason))
109                 return
110             elif len(update_contents["entries"]) == 0:
111                 log.error("No content in %s: %s" %\
112                         (self.feed.URL,\
113                         update_contents["bozo_exception"]))
114                 return
115
116             # Replace it if we ignore it, since exceptions
117             # are not pickle-able.
118
119             update_contents["bozo_exception"] = None
120
121         # Update timestamp
122         update_contents["canto_update"] = self.feed.last_update
123
124         update_contents = json.loads(json.dumps(update_contents))
125
126         log.debug("Parsed %s", self.feed.URL)
127
128         # Allow DaemonFetchThreadPlugins to do any sort of fetch stuff
129         # before the thread is marked as complete.
130
131         for attr in list(self.plugin_attrs.keys()):
132             if not attr.startswith("fetch_"):
133                 continue
134
135             try:
136                 a = getattr(self, attr)
137                 a(feed = self.feed, newcontent = update_contents)
138             except:
139                 log.error("Error running fetch thread plugin")
140                 log.error(traceback.format_exc())
141
142         log.debug("Plugins complete.")
143
144         # This handles it's own locking
145         self.feed.index(update_contents)
146
147 class CantoFetch():
148     def __init__(self, shelf):
149         self.shelf = shelf
150         self.deferred = []
151         self.threads = []
152         self.thread_limit = cpu_count()
153         log.debug("Thread Limit: %s", self.thread_limit)
154
155     def needs_update(self, feed):
156         passed = time.time() - feed.last_update
157         if passed < feed.rate * 60:
158             return False
159         return True
160
161     def still_working(self, URL):
162         for thread, workingURL in self.threads:
163             if URL == workingURL:
164                 return True
165         return False
166
167     def _start_one(self, feed, fromdisk):
168         if len(self.threads) >= self.thread_limit:
169             return False
170
171         # If feed is stopped/dead, pretend like we did the work but don't
172         # resurrect tags
173
174         if feed.stopped:
175             return True
176
177         thread = CantoFetchThread(feed, fromdisk)
178         thread.start()
179         log.debug("Started thread for feed %s", feed)
180         self.threads.append((thread, feed.URL))
181         return True
182
183     def fetch(self, force, fromdisk):
184         for feed, fd in self.deferred[:]:
185             if self._start_one(feed, fd):
186                 log.debug("No longer deferred")
187                 self.deferred = self.deferred[1:]
188             else:
189                 return
190
191         for feed in allfeeds.get_feeds():
192             if not force and not self.needs_update(feed):
193                 continue
194
195             if self.still_working(feed.URL):
196                 continue
197
198             if not self._start_one(feed, fromdisk):
199                 log.debug("Deferring %s %s", feed, fromdisk)
200                 self.deferred.append((feed, fromdisk))
201
202     def reap(self, force=False):
203         work_done = False
204         newthreads = []
205
206         for thread, URL in self.threads:
207             if not force and thread.isAlive():
208                 newthreads.append((thread, URL))
209                 continue
210             work_done = True
211             thread.join()
212
213         self.threads = newthreads
214
215         if work_done:
216             if self.threads == []:
217                 self.shelf.sync()
218             call_hook("daemon_work_done", [])