Simplify tag_updater reset/update
[canto-curses.git] / canto_curses / tagcore.py
1 # -*- coding: utf-8 -*-
2 #Canto-curses - ncurses RSS reader
3 #   Copyright (C) 2014 Jack Miller <jack@codezen.org>
4 #
5 #   This program is free software; you can redistribute it and/or modify
6 #   it under the terms of the GNU General Public License version 2 as 
7 #   published by the Free Software Foundation.
8
9 from canto_next.rwlock import RWLock
10 from canto_next.hooks import call_hook, on_hook
11
12 from .subthread import SubThread
13 from .locks import config_lock
14 from .config import config, story_needed_attrs
15
16 import traceback
17 import logging
18
19 log = logging.getLogger("TAGCORE")
20
21 alltagcores = []
22
23 class TagCore(list):
24     def __init__(self, tag):
25         list.__init__(self)
26         self.tag = tag
27
28         self.changes = False
29         self.was_reset = False
30
31         self.lock = RWLock("lock: %s" % tag)
32         alltagcores.append(self)
33
34     # change functions must be called holding lock
35
36     def ack_changes(self):
37         self.changes = False
38
39     def changed(self):
40         self.changes = True
41
42     def add_items(self, ids):
43         self.lock.acquire_write()
44
45         added = []
46         for id in ids:
47             self.append(id)
48             added.append(id)
49
50         call_hook("curses_items_added", [ self, added ] )
51
52         self.changed()
53         self.lock.release_write()
54
55     def remove_items(self, ids):
56         self.lock.acquire_write()
57
58         removed = []
59
60         # Copy self so we can remove from self
61         # without screwing up iteration.
62
63         for idx, id in enumerate(self[:]):
64             if id in ids:
65                 log.debug("removing: %s", id)
66
67                 list.remove(self, id)
68                 removed.append(id)
69
70         call_hook("curses_items_removed", [ self, removed ] )
71
72         self.changed()
73         self.lock.release_write()
74
75     # Remove all stories from this tag.
76
77     def reset(self):
78
79         # Tag should be sorted on sync if we were reset, regardless of whether
80         # a sync was done when the tag was empty, so keep track of this and
81         # the Tag object will clear it on sync.
82
83         self.was_reset = True
84
85         self.lock.acquire_write()
86
87         if len(self):
88             call_hook("curses_items_removed", [ self, self[:] ])
89         del self[:]
90
91         self.changed()
92         self.lock.release_write()
93
94 class TagUpdater(SubThread):
95     def init(self, backend):
96         SubThread.init(self, backend)
97
98         self.item_tag = None
99         self.item_buf = []
100         self.item_removes = []
101         self.item_adds = []
102
103         self.attributes = {}
104         self.lock = RWLock("tagupdater")
105
106         # Response counters
107         self.still_updating = 0
108
109         self.start_pthread()
110
111         # Setup automatic attributes.
112
113         # We know we're going to want at least these attributes for
114         # all stories, as they're part of the fallback format string.
115
116         self.needed_attrs = [ "title", "canto-state", "canto-tags", "link", "enclosures" ]
117
118         tsa = config.get_opt("taglist.search_attributes")
119
120         for attrlist in [ story_needed_attrs, tsa ]:
121             for sa in attrlist:
122                 if sa not in self.needed_attrs:
123                     self.needed_attrs.append(sa)
124
125         self.write("AUTOATTR", self.needed_attrs)
126
127         # Lock config_lock so that strtags doesn't change and we miss
128         # tags.
129
130         config_lock.acquire_read()
131
132         strtags = config.get_var("strtags")
133
134         # Request initial information, instantiate TagCores()
135
136         self.write("WATCHTAGS", strtags)
137         for tag in strtags:
138             self.on_new_tag(tag)
139
140         on_hook("curses_new_tag", self.on_new_tag)
141         on_hook("curses_del_tag", self.on_del_tag)
142         on_hook("curses_stories_removed", self.on_stories_removed)
143         on_hook("curses_def_opt_change", self.on_def_opt_change)
144
145         config_lock.release_read()
146
147     def on_new_tag(self, tag):
148         self.write("WATCHTAGS", [ tag ])
149         self.prot_tagchange(tag)
150         call_hook("curses_new_tagcore", [ TagCore(tag) ])
151
152     def on_del_tag(self, tag):
153         for tagcore in alltagcores:
154             if tagcore.tag == tag:
155                 tagcore.reset()
156                 call_hook("curses_del_tagcore", [ tagcore ])
157                 return
158
159     # Once they've been removed from the GUI, their attributes can be forgotten
160     def on_stories_removed(self, tag, items):
161         tagcore = None
162         for tc in alltagcores:
163             if tc.tag == tag.tag:
164                 tagcore = tc
165                 break
166         else:
167             log.warn("Couldn't find tagcore for removed story tag %s" % tag.tag)
168
169         self.lock.acquire_write()
170         for item in items:
171             if tagcore and item.id in tc:
172                 log.debug("%s still in tagcore, not removing", item.id)
173                 continue
174             if item.id in self.attributes:
175                 del self.attributes[item.id]
176         self.lock.release_write()
177
178     # Changes to global filters should force a full refresh.
179
180     def on_def_opt_change(self, defaults):
181         if 'global_transform' in defaults:
182             log.debug("global_transform changed, forcing reset + update")
183             self.reset(True)
184             self.update()
185
186     def prot_attributes(self, d):
187         if self.still_updating > 1:
188             return
189
190         # Update attributes, and then notify everyone to grab new content.
191         self.lock.acquire_write()
192
193         for key in d.keys():
194             if key in self.attributes:
195
196                 # If we're updating, we want to create a whole new dict object
197                 # so that our stories dicts don't get updated without a sync
198
199                 cp = self.attributes[key].copy()
200                 cp.update(d[key])
201                 self.attributes[key] = cp
202             else:
203                 self.attributes[key] = d[key]
204         self.lock.release_write()
205
206         call_hook("curses_attributes", [ self.attributes ])
207
208     def prot_items(self, updates):
209         if self.still_updating > 1:
210             return
211
212         # Daemon should now only return with one tag in an items response
213
214         tag = list(updates.keys())[0]
215
216         if self.item_tag == None or self.item_tag.tag != tag:
217             self.item_tag = None
218             self.item_buf = []
219             self.item_removes = []
220             self.item_adds = []
221             for have_tag in alltagcores:
222                 if have_tag.tag == tag:
223                     self.item_tag = have_tag
224                     break
225
226             # Shouldn't happen
227             else:
228                 return
229
230         self.item_buf.extend(updates[tag])
231
232         # Add new items.
233         for id in updates[tag]:
234             if id not in self.item_tag:
235                 self.item_adds.append(id)
236
237     def prot_itemsdone(self, empty):
238         if self.item_tag == None:
239             return
240
241         if self.still_updating > 1:
242             self.item_tag = None
243             self.item_buf = []
244             self.item_removes = []
245             self.item_adds = []
246             return
247
248         if self.item_adds:
249             self.item_tag.add_items(self.item_adds)
250
251         # Eliminate discarded items. This has to be done here, so we have
252         # access to all of the items given in the multiple ITEM responses.
253
254         for id in self.item_tag:
255             if id not in self.item_buf:
256                 self.item_removes.append(id)
257
258         if self.item_removes:
259             self.item_tag.remove_items(self.item_removes)
260
261         self.item_tag = None
262         self.item_buf = []
263         self.item_removes = []
264         self.item_adds = []
265
266     def prot_tagchange(self, tag):
267         self.write("ITEMS", [ tag ])
268
269     def prot_pong(self, args):
270         if self.still_updating:
271             self.still_updating -= 1
272             if not self.still_updating:
273                 log.debug("Calling curses_update_complete")
274                 call_hook("curses_update_complete", [])
275
276     # The following is the external interface to tagupdater.
277
278     def update(self):
279         strtags = config.get_var("strtags")
280         for tag in strtags:
281             self.write("ITEMS", [ tag ])
282         self.write("PING", [])
283         self.still_updating += 1
284
285     def reset(self, force=False):
286         if self.still_updating and not force:
287             log.debug("Not initiating refresh, update still in progress")
288             return False
289
290         for tag in alltagcores:
291             tag.reset()
292         return True
293
294     def transform(self, name, transform):
295         self.write("TRANSFORM", { name : transform })
296         self.reset(True)
297
298     # Writes are already serialized, so in the meantime, we protect
299     # self.attributes and self.needed_attrs with our lock.
300
301     def get_attributes(self, id):
302         r = {}
303         self.lock.acquire_read()
304         if id in self.attributes:
305             r = self.attributes[id]
306         self.lock.release_read()
307         return r
308
309     # This takes a fat argument because callers need to be able to curry
310     # together multiple sets so stuff like 'item-state read *' don't generate
311     # thousands of SETATTRIBUTES calls and take forever
312
313     def set_attributes(self, arg):
314         self.lock.acquire_write()
315         self.write("SETATTRIBUTES", arg)
316         self.lock.release_write()
317
318     def request_attributes(self, id, attrs):
319         self.write("ATTRIBUTES", { id : attrs })
320
321     def need_attributes(self, id, attrs):
322         self.lock.acquire_write()
323
324         needed = self.needed_attrs[:]
325         updated = False
326
327         for attr in attrs:
328             if attr not in needed:
329                 needed.append(attr)
330                 updated = True
331
332         if updated:
333             self.needed_attrs = needed
334             self.write("AUTOATTR", self.needed_attrs)
335
336         self.lock.release_write()
337
338         # Even if we didn't update this time, make sure we attempt to get this
339         # id's new needed attributes.
340
341         self.write("ATTRIBUTES", { id : needed })
342
343 tag_updater = TagUpdater()