67e99063dc6345cd1da3ab2984b433079d8785fd
[canto-next.git] / canto_next / fetch.py
1 # -*- coding: utf-8 -*-
2 #Canto - RSS reader backend
3 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
4 #
5 #   This program is free software; you can redistribute it and/or modify
6 #   it under the terms of the GNU General Public License version 2 as 
7 #   published by the Free Software Foundation.
8
9 from .plugins import PluginHandler, Plugin
10 from .feed import allfeeds
11 from .hooks import call_hook
12
13 from threading import Thread
14 import feedparser
15 import traceback
16 import urllib.parse
17 import urllib.request, urllib.error, urllib.parse
18 import logging
19 import socket
20 import json
21 import time
22
23 log = logging.getLogger("CANTO-FETCH")
24
25 class DaemonFetchThreadPlugin(Plugin):
26     pass
27
28 # This is the first time I've ever had a need for multiple inheritance.
29 # I'm not sure if that's a good thing or not =)
30
31 class CantoFetchThread(PluginHandler, Thread):
32     def __init__(self, feed, fromdisk):
33         PluginHandler.__init__(self)
34         Thread.__init__(self, name="Fetch: %s" % feed.URL)
35         self.daemon = True
36
37         self.plugin_class = DaemonFetchThreadPlugin
38         self.update_plugin_lookups()
39
40         # feedparser honors this value, want to avoid hung feeds when the
41         # internet connection is flaky
42
43         socket.setdefaulttimeout(30)
44
45         self.feed = feed
46         self.fromdisk = fromdisk
47
48     def run(self):
49
50         # Initial load, just feed.index grab from disk.
51
52         if self.fromdisk:
53             self.feed.index({"entries" : []})
54             return
55
56         self.feed.last_update = time.time()
57
58         # Otherwise, actually try to get an update.
59
60         extra_headers = { 'User-Agent' :\
61                 'Canto/0.9.0 + http://codezen.org/canto-ng'}
62
63         try:
64             result = None
65             # Passworded Feed
66             if self.feed.username or self.feed.password:
67                 domain = urllib.parse.urlparse(self.feed.URL)[1]
68                 man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
69                 auth = urllib.request.HTTPBasicAuthHandler(man)
70                 auth.handler_order = 490
71                 auth.add_password(None, domain, self.feed.username,
72                         self.feed.password)
73
74                 try:
75                     result = feedparser.parse(self.feed.URL, handlers=[auth],
76                             request_headers = extra_headers)
77                 except:
78                     # And, failing that, Digest Authentication
79                     man = urllib.request.HTTPPasswordMgrWithDefaultRealm()
80                     auth = urllib.request.HTTPDigestAuthHandler(man)
81                     auth.handler_order = 490
82                     auth.add_password(None, domain, self.feed.username,
83                             self.feed.password)
84                     result = feedparser.parse(self.feed.URL, handlers=[auth],
85                             request_headers = extra_headers)
86
87             # No password
88             else:
89                 result = feedparser.parse(self.feed.URL,
90                         request_headers = extra_headers)
91
92             update_contents = result
93         except Exception as e:
94             log.error("ERROR: try to parse %s, got %s" % (self.feed.URL, e))
95             return
96
97         # Interpret feedparser's bozo_exception, if there was an
98         # error that resulted in no content, it's the same as
99         # any other broken feed.
100
101         if "bozo_exception" in update_contents:
102             if update_contents["bozo_exception"] == urllib.error.URLError:
103                 log.error("ERROR: couldn't grab %s : %s" %\
104                         (self.feed.URL,\
105                         update_contents["bozo_exception"].reason))
106                 return
107             elif len(update_contents["entries"]) == 0:
108                 log.error("No content in %s: %s" %\
109                         (self.feed.URL,\
110                         update_contents["bozo_exception"]))
111                 return
112
113             # Replace it if we ignore it, since exceptions
114             # are not pickle-able.
115
116             update_contents["bozo_exception"] = None
117
118         # Update timestamp
119         update_contents["canto_update"] = self.feed.last_update
120
121         update_contents = json.loads(json.dumps(update_contents))
122
123         log.debug("Parsed %s", self.feed.URL)
124
125         # Allow DaemonFetchThreadPlugins to do any sort of fetch stuff
126         # before the thread is marked as complete.
127
128         for attr in list(self.plugin_attrs.keys()):
129             if not attr.startswith("fetch_"):
130                 continue
131
132             try:
133                 a = getattr(self, attr)
134                 a(feed = self.feed, newcontent = update_contents)
135             except:
136                 log.error("Error running fetch thread plugin")
137                 log.error(traceback.format_exc())
138
139         log.debug("Plugins complete.")
140
141         # This handles it's own locking
142         self.feed.index(update_contents)
143
144
145 class CantoFetch():
146     def __init__(self, shelf):
147         self.shelf = shelf
148         self.threads = []
149
150     def needs_update(self, feed):
151         passed = time.time() - feed.last_update
152         if passed < feed.rate * 60:
153             return False
154         return True
155
156     def still_working(self, URL):
157         for thread, workingURL in self.threads:
158             if URL == workingURL:
159                 return True
160         return False
161
162     def fetch(self, force, fromdisk):
163         for feed in allfeeds.get_feeds():
164             if not force and not self.needs_update(feed):
165                 continue
166
167             if self.still_working(feed.URL):
168                 continue
169
170             thread = CantoFetchThread(feed, fromdisk)
171             thread.start()
172             log.debug("Started thread for feed %s", feed.URL)
173             self.threads.append((thread, feed.URL))
174
175     def reap(self, force=False):
176         work_done = False
177         newthreads = []
178
179         for thread, URL in self.threads:
180             if not force and thread.isAlive():
181                 newthreads.append((thread, URL))
182                 continue
183             work_done = True
184             thread.join()
185
186         self.threads = newthreads
187
188         if work_done:
189             if self.threads == []:
190                 self.shelf.sync()
191             call_hook("daemon_work_done", [])