9bf55ad402da39700a6f73cbe0a032906971d486
[canto-next.git] / canto_next / feed.py
1 # -*- coding: utf-8 -*-
2
3 #Canto - RSS reader backend
4 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
5 #
6 #   This program is free software; you can redistribute it and/or modify
7 #   it under the terms of the GNU General Public License version 2 as 
8 #   published by the Free Software Foundation.
9
10 from .plugins import PluginHandler, Plugin
11 from .tag import alltags
12 from .rwlock import RWLock, read_lock, write_lock
13 from .locks import feed_lock, tag_lock
14 from .hooks import call_hook
15
16 import traceback
17 import logging
18 import json
19 import time
20
21 log = logging.getLogger("FEED")
22
def dict_id(i):
    """Return an item id as a dict.

    Item ids travel either as already-decoded dicts or as JSON-encoded
    strings.  A dict (or any dict subclass) is returned unchanged, the very
    same object; anything else is handed to ``json.loads``.

    Raises whatever ``json.loads`` raises when ``i`` is neither a dict nor
    valid JSON text.
    """
    # isinstance() rather than an exact type comparison so that dict
    # subclasses are not mistakenly pushed through the JSON decoder.
    if isinstance(i, dict):
        return i
    return json.loads(i)
27
class CantoFeeds:
    """Registry of feed objects keyed by URL.

    ``feeds`` holds the live feeds, ``order`` remembers the sequence in which
    they were added, and ``dead_feeds`` holds the feeds that were live before
    the last ``reset()`` and have not been re-added since.
    """

    def __init__(self):
        self.order = []
        self.feeds = {}
        self.dead_feeds = {}

    @write_lock(feed_lock)
    def add_feed(self, URL, feed):
        """Register ``feed`` under ``URL`` as a live feed."""
        self.order.append(URL)
        self.feeds[URL] = feed

        # A feed that is (re-)added is no longer dead; forget the stale
        # object that was parked by reset().
        if URL in self.dead_feeds:
            del self.dead_feeds[URL]

    @read_lock(feed_lock)
    def get_feed(self, URL):
        """Return the feed for ``URL``, live first, then dead, else None."""
        for pool in (self.feeds, self.dead_feeds):
            if URL in pool:
                return pool[URL]
        return None

    @read_lock(feed_lock)
    def get_feeds(self):
        """Return the feed objects in the order they were added."""
        return list(map(self.get_feed, self.order))

    @read_lock(feed_lock)
    def items_to_feeds(self, items):
        """Group item ids by the live feed they belong to.

        Returns a dict mapping feed object -> list of the ids (as given)
        whose "URL" names that feed.  Raises Exception when an id names a
        URL that is not a live feed.
        """
        grouped = {}
        for item in items:
            url = dict_id(item)["URL"]
            if url not in self.feeds:
                raise Exception("Can't find feed: %s" % url)
            grouped.setdefault(self.feeds[url], []).append(item)
        return grouped

    def all_parsed(self):
        """Destroy every feed still parked as dead and drop its main tag."""
        for feed in self.dead_feeds.values():
            call_hook("daemon_del_tag", [["maintag:" + feed.name]])
            feed.destroy()
        self.dead_feeds = {}

    @write_lock(feed_lock)
    def reset(self):
        """Park all live feeds as dead and start with an empty registry."""
        self.dead_feeds, self.feeds, self.order = self.feeds, {}, []
83
84 allfeeds = CantoFeeds()
85
86 # Lock helpers
87
def wlock_all():
    """Take the global feed write lock, then each live feed's write lock.

    Per-feed locks are taken in sorted URL order.
    """
    feed_lock.acquire_write()
    for url in sorted(allfeeds.feeds):
        target = allfeeds.feeds[url]
        target.lock.acquire_write()
92
def wunlock_all():
    """Release each live feed's write lock, then the global feed write lock.

    Per-feed locks are released in sorted URL order.
    """
    for url in sorted(allfeeds.feeds):
        target = allfeeds.feeds[url]
        target.lock.release_write()
    feed_lock.release_write()
97
def wlock_feeds(fn):
    """Decorator: run ``fn`` while holding every feed write lock.

    ``wlock_all()`` is called before ``fn`` and ``wunlock_all()`` afterwards,
    even when ``fn`` raises.  The wrapper forwards positional and keyword
    arguments and returns whatever ``fn`` returns.
    """
    # Imported locally so this helper does not depend on the module's
    # import block.
    import functools

    # wraps() keeps fn's __name__/__doc__ visible on the wrapper, which
    # matters for logging and introspection of decorated handlers.
    @functools.wraps(fn)
    def _fl(*args, **kwargs):
        wlock_all()
        try:
            return fn(*args, **kwargs)
        finally:
            wunlock_all()
    return _fl
106
def rlock_all():
    """Take the global feed read lock, then each live feed's read lock.

    Per-feed locks are taken in sorted URL order.
    """
    feed_lock.acquire_read()
    for url in sorted(allfeeds.feeds):
        target = allfeeds.feeds[url]
        target.lock.acquire_read()
111
def runlock_all():
    """Release each live feed's read lock, then the global feed read lock.

    Per-feed locks are released in sorted URL order.
    """
    for url in sorted(allfeeds.feeds):
        target = allfeeds.feeds[url]
        target.lock.release_read()
    feed_lock.release_read()
116
def rlock_feeds(fn):
    """Decorator: run ``fn`` while holding every feed read lock.

    ``rlock_all()`` is called before ``fn`` and ``runlock_all()`` afterwards,
    even when ``fn`` raises.  The wrapper forwards positional and keyword
    arguments and returns whatever ``fn`` returns.
    """
    # Imported locally so this helper does not depend on the module's
    # import block.
    import functools

    # wraps() keeps fn's __name__/__doc__ visible on the wrapper, which
    # matters for logging and introspection of decorated handlers.
    @functools.wraps(fn)
    def _fl(*args, **kwargs):
        rlock_all()
        try:
            return fn(*args, **kwargs)
        finally:
            runlock_all()
    return _fl
125
126 # feed_objs to enforce
def rlock_feed_objs(objs):
    """Read-lock the given feed objects, after the global feed read lock.

    Walks the live feed URLs in sorted order and, for each URL, read-locks
    the first object in ``objs`` whose ``URL`` matches.  Objects whose URL is
    not a live feed are left unlocked.
    """
    feed_lock.acquire_read()
    for url in sorted(allfeeds.feeds):
        match = next((o for o in objs if o.URL == url), None)
        if match is not None:
            match.lock.acquire_read()
134
def runlock_feed_objs(objs):
    """Release the read locks taken by ``rlock_feed_objs`` on ``objs``.

    Walks the live feed URLs in sorted order, releasing the read lock of the
    first object in ``objs`` matching each URL, and finally releases the
    global feed read lock.
    """
    for url in sorted(allfeeds.feeds):
        match = next((o for o in objs if o.URL == url), None)
        if match is not None:
            match.lock.release_read()
    feed_lock.release_read()
142
def stop_feeds():
    """Flag every live feed as stopped."""
    for live_feed in allfeeds.feeds.values():
        live_feed.stopped = True
146
class DaemonFeedPlugin(Plugin):
    """Plugin base class that CantoFeed registers as its ``plugin_class``."""
149
class CantoFeed(PluginHandler):
    """One feed: its settings, its stored entries and its tag bookkeeping.

    Stored content lives in ``shelf[URL]`` as a dict with an "entries" list.
    ``lock`` is a per-feed RWLock; this class takes it for writing in
    ``set_attributes`` and ``index``.  Creating an instance registers it in
    ``allfeeds``.
    """

    def __init__(self, shelf, name, URL, rate, keep_time, keep_unread, **kwargs):
        """Store the feed settings and register the feed in ``allfeeds``.

        Optional keyword arguments ``username`` and ``password`` are stored
        on the instance; both default to None.
        """
        PluginHandler.__init__(self)

        self.plugin_class = DaemonFeedPlugin
        self.update_plugin_lookups()

        self.shelf = shelf
        self.name = name
        self.URL = URL
        self.rate = rate
        self.keep_time = keep_time
        self.keep_unread = keep_unread
        self.stopped = False

        self.last_update = 0

        # This is held by the update thread, as well as any get / set
        # attribute threads.
        self.lock = RWLock()

        self.username = kwargs.get("username")
        self.password = kwargs.get("password")

        allfeeds.add_feed(URL, self)

    def __str__(self):
        return "CantoFeed: %s" % self.name

    def get_attributes(self, items, attributes):
        """Look up attributes of stored entries.

        ``items`` is an iterable of item ids and ``attributes`` maps each id
        to the attribute names wanted for it.  Returns
        ``{ id : { attribute : value, .. }, .. }``.  A requested
        "description" is read from the entry's "summary" key.  Attributes
        missing from an entry come back as "".  An id with no stored entry
        gets "" for every attribute and "???" as its title.

        This method does not take ``self.lock`` itself.
        """
        r = {}

        d = self.shelf[self.URL]

        args = [(dict_id(item)["ID"], item, attributes[item]) for item in items]
        args.sort()

        # Sort on the id alone: falling back to comparing the entry dicts on
        # an id tie raises TypeError on Python 3.
        got = [(entry["id"], entry) for entry in d["entries"]]
        got.sort(key=lambda pair: pair[0])

        # Both lists are sorted by id, so walk them in lockstep.
        for item_id, full_id, needed_attrs in args:
            while got and item_id > got[0][0]:
                got.pop(0)

            if got and got[0][0] == item_id:
                entry = got.pop(0)[1]
                attrs = {}
                for a in needed_attrs:
                    real = "summary" if a == "description" else a
                    attrs[a] = entry.get(real, "")
                r[full_id] = attrs
            else:
                log.warning("item not found: %s", item_id)
                placeholder = {a: "" for a in needed_attrs}
                placeholder["title"] = "???"
                r[full_id] = placeholder
        return r

    def set_attributes(self, items, attributes):
        """Write attribute changes to the stored entries and retag them.

        ``items`` is an iterable of item ids and ``attributes`` maps each id
        to a ``{ attribute : value }`` dict that is copied onto every stored
        entry with that id.  The shelf is rewritten under the feed's write
        lock; the lock is released (even on error) before the touched
        entries are retagged.
        """
        items_to_remove = []
        tags_to_add = []

        self.lock.acquire_write()
        try:
            d = self.shelf[self.URL]

            for item in items:
                d_id = dict_id(item)["ID"]

                for d_item in d["entries"]:
                    if d_id != d_item["id"]:
                        continue
                    for a in attributes[item]:
                        d_item[a] = attributes[item][a]

                    items_to_remove.append(d_item)
                    tags_to_add += self._tag([d_item])

            self.shelf[self.URL] = d
            self.shelf.update_umod()
        finally:
            # Never leave the feed locked if the shelf access blew up.
            self.lock.release_write()

        self._retag(items_to_remove, tags_to_add, [])

    def _item_id(self, item):
        """Return the JSON-encoded global id ({URL, ID}) for an entry."""
        return json.dumps({"URL": self.URL, "ID": item["id"]})

    def _tag(self, items):
        """Return the (entry, tag) pairs the given entries should carry.

        Every entry gets the feed's "maintag:<name>" plus each tag listed
        under its "canto-tags" key, if present.
        """
        tags_to_add = []

        for item in items:
            tags_to_add.append((item, "maintag:" + self.name))
            for user_tag in item.get("canto-tags", ()):
                log.debug("index adding user tag: %s - %s", user_tag, item["id"])
                tags_to_add.append((item, user_tag))

        return tags_to_add

    def _retag(self, items_to_remove, tags_to_add, tags_to_remove):
        """Apply tag changes to ``alltags``.

        Holds the feed read lock and the tag write lock for the duration.
        Entries in ``items_to_remove`` are removed from the tag index first,
        then the ``(entry, tag)`` pairs are added / removed and the changes
        flushed with ``do_tag_changes()``.  Both locks are released even if
        one of those steps raises.
        """
        feed_lock.acquire_read()
        try:
            tag_lock.acquire_write()
            try:
                for item in items_to_remove:
                    alltags.remove_id(self._item_id(item))

                for item, tag in tags_to_add:
                    alltags.add_tag(self._item_id(item), tag)

                for item, tag in tags_to_remove:
                    alltags.remove_tag(self._item_id(item), tag)

                alltags.do_tag_changes()
            finally:
                tag_lock.release_write()
        finally:
            feed_lock.release_read()

    def _keep_olditem(self, olditem):
        """Decide whether an entry missing from the latest update is kept.

        An entry is kept while it is younger than ``keep_time`` (measured
        from its "canto_update" stamp, which is set to now if absent), or
        when ``keep_unread`` is set and "read" is not in its "canto-state".
        Returns True to keep, False to discard.
        """
        ref_time = time.time()

        item_time = olditem.setdefault("canto_update", ref_time)
        item_state = olditem.get("canto-state", [])

        if (ref_time - item_time) < self.keep_time:
            log.debug("Item not over keep_time (%d): %s",
                    self.keep_time, olditem["id"])
            return True

        if self.keep_unread and "read" not in item_state:
            log.debug("Keeping unread item: %s\n", olditem["id"])
            return True

        log.debug("Discarding: %s", olditem["id"])
        return False

    def _run_content_plugins(self, prefix, failure_msg, **plugin_kwargs):
        """Call every plugin attribute whose name starts with ``prefix``.

        Each one is called with ``feed=self`` plus ``plugin_kwargs``.  A
        failing plugin is logged (``failure_msg`` and the traceback) and does
        not stop the remaining plugins.
        """
        for attr in list(self.plugin_attrs.keys()):
            if not attr.startswith(prefix):
                continue

            try:
                getattr(self, attr)(feed=self, **plugin_kwargs)
            except Exception:
                # Exception rather than a bare except, so that
                # KeyboardInterrupt / SystemExit are not swallowed here.
                log.error(failure_msg)
                log.error(traceback.format_exc())

    def index(self, update_contents):
        """Merge freshly fetched content with the stored content and commit.

        ``update_contents`` must carry an "entries" list and a
        "canto_update" stamp.  New entries are stamped and given an "id"
        (falling back to "link", then "title"; entries with none of these
        are logged and skipped), then de-duplicated by id.  Keys starting
        with "canto" (other than "canto_update") are carried over from the
        stored entry with the same id.  ``daemon_new_item`` is raised for
        entries with no stored counterpart.  Stored entries missing from the
        update are kept if ``_keep_olditem`` says so, or unconditionally when
        the update has no entries at all.

        The resulting list is the new entries in their fetched order
        followed by the kept old entries in their stored order.  After the
        ``additems_*`` and ``edit_*`` plugins have run it is written to the
        shelf and the tags are rebuilt.  Nothing is done if the feed is
        stopped, either on entry or at commit time.

        NOTE(review): an older comment here required ``self.items`` to stay
        in on-disk order; no ``self.items`` is assigned in this class as
        shown - confirm whether that requirement still applies.
        """

        # If the daemon is shutting down, discard this update.
        if self.stopped:
            return

        committed = False

        self.lock.acquire_write()
        try:
            if self.URL not in self.shelf:
                # Stub empty feed
                log.debug("Previous content not found for %s.", self.URL)
                old_contents = {"entries": []}
            else:
                old_contents = self.shelf[self.URL]
                log.debug("Fetched previous content for %s.", self.URL)

            # Entries are tracked as (position, id, entry) so they can be
            # sorted by id for merging and by position afterwards.
            new_entries = []

            for i, item in enumerate(update_contents["entries"]):

                # Stamp every fetched entry with this update's time.
                item["canto_update"] = update_contents["canto_update"]

                # Attempt to isolate a feed unique ID
                if "id" not in item:
                    if "link" in item:
                        item["id"] = item["link"]
                    elif "title" in item:
                        item["id"] = item["title"]
                    else:
                        log.error("Unable to uniquely ID item: %s", item)
                        continue

                new_entries.append((i, item["id"], item))

            # Sort by string id
            new_entries.sort(key=lambda x: x[1])

            # Remove duplicates, keeping the first occurrence of each id
            # (the sort is stable).  Comparing against the last kept entry
            # avoids a sentinel value that could collide with a real id.
            unique_entries = []
            for x in new_entries:
                if unique_entries and unique_entries[-1][1] == x[1]:
                    continue
                unique_entries.append(x)
            new_entries = unique_entries

            old_entries = [(i, item["id"], item)
                    for (i, item) in enumerate(old_contents["entries"])]
            old_entries.sort(key=lambda x: x[1])

            keep_all = not new_entries

            kept_entries = []

            for x in new_entries:

                # old_entry is really old, see if we should keep or discard
                while old_entries and x[1] > old_entries[0][1]:
                    stale = old_entries.pop(0)
                    if keep_all or self._keep_olditem(stale[2]):
                        kept_entries.append(stale)

                # new entry and old entry match, move content over
                if old_entries and x[1] == old_entries[0][1]:
                    olditem = old_entries.pop(0)[2]
                    for key in olditem:
                        if key == "canto_update":
                            continue
                        if key.startswith("canto"):
                            x[2][key] = olditem[key]

                # new entry is really new, tell everyone
                else:
                    call_hook("daemon_new_item", [self, x[2]])

            # Resort lists by place, instead of string id
            new_entries.sort()
            old_entries.sort()

            # Whatever is left in old_entries sorts after every new id.
            if keep_all:
                kept_entries += old_entries
            else:
                kept_entries += [x for x in old_entries
                        if self._keep_olditem(x[2])]

            kept_entries.sort()
            new_entries += kept_entries

            update_contents["entries"] = [x[2] for x in new_entries]

            tags_to_add = self._tag(update_contents["entries"])
            tags_to_remove = []
            remove_items = []

            # Allow plugins to add items prior to running the editing
            # functions so that the editing functions are guaranteed the
            # full list.
            self._run_content_plugins("additems_",
                    "Error running feed item adding plugin",
                    newcontent=update_contents,
                    tags_to_add=tags_to_add,
                    tags_to_remove=tags_to_remove,
                    remove_items=remove_items)

            # Allow DaemonFeedPlugins defining edit_* functions to have a
            # crack at the contents before we commit to disk.
            self._run_content_plugins("edit_",
                    "Error running feed editing plugin",
                    newcontent=update_contents,
                    tags_to_add=tags_to_add,
                    tags_to_remove=tags_to_remove,
                    remove_items=remove_items)

            if not self.stopped:
                # Commit the updates to disk.
                self.shelf[self.URL] = update_contents
                committed = True
        finally:
            # Always drop the feed lock, and do so before retagging, which
            # takes the global feed and tag locks.
            self.lock.release_write()

        if committed:
            self._retag(old_contents["entries"] + remove_items,
                    tags_to_add, tags_to_remove)

    def destroy(self):
        """Mark the feed stopped and delete its stored content, if any."""
        # Check for existence in case of delete quickly
        # after add.

        self.stopped = True
        if self.URL in self.shelf:
            del self.shelf[self.URL]