Rework tag_updater -> tagcore interaction
authorJack Miller <jack@codezen.org>
Mon, 13 Jul 2015 17:11:08 +0000 (12:11 -0500)
committerJack Miller <jack@codezen.org>
Wed, 15 Jul 2015 02:34:19 +0000 (21:34 -0500)
- ITEMS is now guaranteed to be a single response, so no need to buffer
  content waiting on ITEMSDONE (which is now issued, but ignored).

- The function uses a similar sorted-compare to sync() and the daemon's
  index(), which should make integrating new items take less time and
produce new/removed item hook calls directly.

- The Tagcore object itself has been stripped down.

- Tag_updater "reset" is now basically a no-op, it marks tags as reset,
  but doesn't actually clear their items. Then, "reset" tags are
automatically synchronized and re-sorted the next time items come in.
This saves quite a bit of time when refreshing.

- Tag_updater "update" now works in tandem with reset code, and merely
  sends out ITEMS requests directly.

canto_curses/gui.py
canto_curses/tag.py
canto_curses/tagcore.py
canto_curses/taglist.py
tests/test-tagcore-function.py

index 94eed05..9e6d97c 100644 (file)
@@ -268,9 +268,10 @@ class CantoCursesGui(CommandHandler):
             if self.sync_requested:
                 self.tags_to_sync = alltags[:]
                 self.sync_requested = False
-            elif not self.tags_to_sync:
+            else:
                 for tag in alltags:
-                    if (len(tag) == 0 and len(tag.tagcore) != 0):
+                    if (tag not in self.tags_to_sync) and (tag.tagcore.was_reset or\
+                            (len(tag) == 0 and len(tag.tagcore) != 0)):
                         self.tags_to_sync.append(tag)
 
             if self.tags_to_sync:
@@ -278,7 +279,6 @@ class CantoCursesGui(CommandHandler):
                 self.tags_to_sync = self.tags_to_sync[1:]
                 partial_sync = True
 
-
             needs_resize = self.callbacks["get_var"]("needs_resize") or self.winched
             needs_refresh = self.callbacks["get_var"]("needs_refresh")
             needs_redraw = self.callbacks["get_var"]("needs_redraw")
index 186e05b..fa998b8 100644 (file)
@@ -359,7 +359,7 @@ class Tag(PluginHandler, list):
             sorted_ids = [ (x, x.id, i) for (i, x) in enumerate(self) ]
             sorted_ids.sort(key=lambda x: x[1])
 
-            tagcore_sorted_ids = [ x for x in enumerate(self.tagcore[:]) ]
+            tagcore_sorted_ids = list(enumerate(self.tagcore))
             tagcore_sorted_ids.sort(key=lambda x: x[1])
 
             new_ids = []
@@ -371,7 +371,7 @@ class Tag(PluginHandler, list):
                     new_ids.append(tagcore_sorted_ids.pop(0))
 
                 if not tagcore_sorted_ids or s_id < tagcore_sorted_ids[0][1]:
-                    if s_id == sel.id:
+                    if sel and (not sel.is_tag) and (s_id == sel.id):
 
                         # If we preserve the selection in an "undead" state, then
                         # we keep set tagcore changed so that the next sync operation
index 7ad5aaa..2687ff5 100644 (file)
@@ -39,73 +39,24 @@ class TagCore(list):
     def changed(self):
         self.changes = True
 
-    def add_items(self, ids):
+    def set_items(self, ids):
         self.lock.acquire_write()
 
-        added = []
-        for id in ids:
-            self.append(id)
-            added.append(id)
-
-        call_hook("curses_items_added", [ self, added ] )
-
-        self.changed()
-        self.lock.release_write()
-
-    def remove_items(self, ids):
-        self.lock.acquire_write()
-
-        removed = []
-
-        # Copy self so we can remove from self
-        # without screwing up iteration.
-
-        for idx, id in enumerate(self[:]):
-            if id in ids:
-                log.debug("removing: %s", id)
-
-                list.remove(self, id)
-                removed.append(id)
-
-        call_hook("curses_items_removed", [ self, removed ] )
-
-        self.changed()
-        self.lock.release_write()
-
-    # Remove all stories from this tag.
-
-    def reset(self):
-
-        # Tag should be sorted on sync if we were reset, regardless of whether
-        # a sync was done when the tag was empty, so keep track of this and
-        # the Tag object will clear it on sync.
-
-        self.was_reset = True
-
-        self.lock.acquire_write()
-
-        if len(self):
-            call_hook("curses_items_removed", [ self, self[:] ])
         del self[:]
-
+        self.extend(ids)
         self.changed()
+
         self.lock.release_write()
 
 class TagUpdater(SubThread):
     def init(self, backend):
         SubThread.init(self, backend)
 
-        self.item_tag = None
-        self.item_buf = []
-        self.item_removes = []
-        self.item_adds = []
+        self.updating = []
 
         self.attributes = {}
         self.lock = RWLock("tagupdater")
 
-        # Response counters
-        self.still_updating = 0
-
         self.start_pthread()
 
         # Setup automatic attributes.
@@ -150,10 +101,15 @@ class TagUpdater(SubThread):
         call_hook("curses_new_tagcore", [ TagCore(tag) ])
 
     def on_del_tag(self, tag):
-        for tagcore in alltagcores:
+        for tagcore in alltagcores[:]:
             if tagcore.tag == tag:
-                tagcore.reset()
+                if len(tagcore):
+                    call_hook("curses_items_removed", [ tagcore, tagcore ] )
+                    tagcore.set_items([])
                 call_hook("curses_del_tagcore", [ tagcore ])
+                alltagcores.remove(tagcore)
+                while tagcore in self.updating:
+                    self.updating.remove(tagcore)
                 return
 
     # Once they've been removed from the GUI, their attributes can be forgotten
@@ -180,13 +136,9 @@ class TagUpdater(SubThread):
     def on_def_opt_change(self, defaults):
         if 'global_transform' in defaults:
             log.debug("global_transform changed, forcing reset + update")
-            self.reset(True)
             self.update()
 
     def prot_attributes(self, d):
-        if self.still_updating > 1:
-            return
-
         # Update attributes, and then notify everyone to grab new content.
         self.lock.acquire_write()
 
@@ -206,94 +158,77 @@ class TagUpdater(SubThread):
         call_hook("curses_attributes", [ self.attributes ])
 
     def prot_items(self, updates):
-        if self.still_updating > 1:
-            return
-
         # Daemon should now only return with one tag in an items response
 
         tag = list(updates.keys())[0]
 
-        if self.item_tag == None or self.item_tag.tag != tag:
-            self.item_tag = None
-            self.item_buf = []
-            self.item_removes = []
-            self.item_adds = []
-            for have_tag in alltagcores:
-                if have_tag.tag == tag:
-                    self.item_tag = have_tag
-                    break
-
-            # Shouldn't happen
-            else:
-                return
+        for have_tag in alltagcores:
+            if have_tag.tag == tag:
+                break
+        else:
+            return
 
-        self.item_buf.extend(updates[tag])
+        sorted_updated_ids = list(enumerate(updates[tag]))
+        sorted_updated_ids.sort(key=lambda x : x[1])
 
-        # Add new items.
-        for id in updates[tag]:
-            if id not in self.item_tag:
-                self.item_adds.append(id)
+        sorted_current_ids = list(enumerate(have_tag))
+        sorted_updated_ids.sort(key=lambda x : x[1])
 
-    def prot_itemsdone(self, empty):
-        if self.item_tag == None:
-            return
+        new_ids = []
+        cur_ids = []
+        old_ids = []
 
-        if self.still_updating > 1:
-            self.item_tag = None
-            self.item_buf = []
-            self.item_removes = []
-            self.item_adds = []
-            return
+        for c_place, c_id in sorted_current_ids:
+            while sorted_updated_ids and c_id > sorted_updated_ids[0][1]:
+                new_ids.append(sorted_updated_ids.pop(0))
 
-        if self.item_adds:
-            self.item_tag.add_items(self.item_adds)
+            if not sorted_updated_ids or c_id < sorted_updated_ids[0][1]:
+                old_ids.append(c_id)
+            else:
+                place = sorted_updated_ids.pop(0)[0]
+                cur_ids.append((place, c_id))
 
-        # Eliminate discarded items. This has to be done here, so we have
-        # access to all of the items given in the multiple ITEM responses.
+        new_ids += sorted_updated_ids
 
-        for id in self.item_tag:
-            if id not in self.item_buf:
-                self.item_removes.append(id)
+        all_ids = new_ids + cur_ids
+        all_ids.sort()
 
-        if self.item_removes:
-            self.item_tag.remove_items(self.item_removes)
+        have_tag.set_items([x[1] for x in all_ids])
 
-        self.item_tag = None
-        self.item_buf = []
-        self.item_removes = []
-        self.item_adds = []
+        if new_ids:
+            call_hook("curses_items_added", [ have_tag, [x[1] for x in new_ids] ] )
 
-    def prot_tagchange(self, tag):
-        self.write("ITEMS", [ tag ])
+        if old_ids:
+            call_hook("curses_items_removed", [ have_tag, old_ids ] )
 
-    def prot_pong(self, args):
-        if self.still_updating:
-            self.still_updating -= 1
-            if not self.still_updating:
-                log.debug("Calling curses_update_complete")
+        if have_tag in self.updating:
+            have_tag.was_reset = True
+            call_hook("curses_tag_updated", [ have_tag ])
+            self.updating.remove(have_tag)
+            if self.updating == []:
                 call_hook("curses_update_complete", [])
 
+    def prot_itemsdone(self, tag):
+        pass
+
+    def prot_tagchange(self, tag):
+        self.write("ITEMS", [ tag ])
+
     # The following is the external interface to tagupdater.
 
     def update(self):
+        self.reset()
         strtags = config.get_var("strtags")
         for tag in strtags:
             self.write("ITEMS", [ tag ])
-        self.write("PING", [])
-        self.still_updating += 1
 
-    def reset(self, force=False):
-        if self.still_updating and not force:
-            log.debug("Not initiating refresh, update still in progress")
-            return False
-
-        for tag in alltagcores:
-            tag.reset()
+    def reset(self):
+        self.updating += alltagcores
         return True
 
     def transform(self, name, transform):
         self.write("TRANSFORM", { name : transform })
-        self.reset(True)
+        self.reset()
 
     # Writes are already serialized, so in the meantime, we protect
     # self.attributes and self.needed_attrs with our lock.
index 1c5d37d..99a0584 100644 (file)
@@ -58,11 +58,12 @@ class TagList(GuiBase):
 
         on_hook("curses_eval_tags_changed", self.on_eval_tags_changed, self)
         on_hook("curses_items_added", self.on_items_added, self)
+        on_hook("curses_items_removed", self.on_items_removed, self)
+        on_hook("curses_tag_updated", self.on_tag_updated, self)
         on_hook("curses_stories_added", self.on_stories_added, self)
         on_hook("curses_stories_removed", self.on_stories_removed, self)
         on_hook("curses_opt_change", self.on_opt_change, self)
         on_hook("curses_new_tagcore", self.on_new_tagcore, self)
-        on_hook("curses_update_complete", self.on_update_complete, self)
         on_hook("curses_del_tagcore", self.on_del_tagcore, self)
 
         args = {
@@ -290,11 +291,13 @@ class TagList(GuiBase):
     def on_items_added(self, tagcore, items):
         self.callbacks["release_gui"]()
 
-    def on_eval_tags_changed(self):
-        self.callbacks["force_sync"]()
+    def on_items_removed(self, tagcore, items):
         self.callbacks["release_gui"]()
 
-    def on_update_complete(self):
+    def on_tag_updated(self, tagcore):
+        self.callbacks["release_gui"]()
+
+    def on_eval_tags_changed(self):
         self.callbacks["force_sync"]()
         self.callbacks["release_gui"]()
 
index 9df3908..62af727 100755 (executable)
@@ -15,6 +15,7 @@ NEW_TC = 4
 DEL_TC = 8
 ATTRIBUTES = 16
 UPDATE_COMPLETE = 32
+TAG_UPDATED = 64
 
 class FakeTag(object):
     def __init__(self, tag):
@@ -34,17 +35,18 @@ class TestTagCoreFunction(Test):
         self.oia_tcids = None
         self.new_tc = None
         self.del_tc = None
+        self.otu_tag = None
         self.attributes = None
 
     def on_items_removed(self, tagcore, removed):
         self.flags |= ITEMS_REMOVED
         self.oir_tctag = tagcore.tag
-        self.oir_tcids = removed
+        self.oir_tcids = removed[:]
 
     def on_items_added(self, tagcore, added):
         self.flags |= ITEMS_ADDED
         self.oia_tctag = tagcore.tag
-        self.oia_tcids = added
+        self.oia_tcids = added[:]
 
     def on_new_tagcore(self, tagcore):
         self.flags |= NEW_TC
@@ -61,6 +63,10 @@ class TestTagCoreFunction(Test):
     def on_update_complete(self):
         self.flags |= UPDATE_COMPLETE
 
+    def on_tag_updated(self, tag):
+        self.flags |= TAG_UPDATED
+        self.otu_tag = tag.tag
+
     def check(self):
         config_script = {
             'VERSION' : { '*' : [('VERSION', CANTO_PROTOCOL_COMPATIBLE)] },
@@ -84,6 +90,7 @@ class TestTagCoreFunction(Test):
         on_hook("curses_del_tagcore", self.on_del_tagcore)
         on_hook("curses_attributes", self.on_attributes)
         on_hook("curses_update_complete", self.on_update_complete)
+        on_hook("curses_tag_updated", self.on_tag_updated)
 
         # 1. Previously existing tags in config should be populated on init
 
@@ -213,73 +220,21 @@ class TestTagCoreFunction(Test):
         self.compare_var("oir_tctag", "maintag:Slashdot")
         self.compare_var("oir_tcids", [ "id1" ])
 
-        # 12. Reset should empty all tagcores and ignore all traffic
-        # until it receives a PONG for every reset() PING
+        # 12. Update should cause all tags to generate a tag_update
+        # hook call on ITEMS, and update_complete when all done.
 
         self.reset_flags()
 
-        tag_updater.reset()
-
-        for tc in alltagcores:
-            if len(tc) > 0:
-                raise Exception("TC %s not empty!" % tc.tag)
-
-        tag_backend.inject("ITEMS", { "maintag:Test1" : [ "id3", "id4" ] })
-        tag_backend.inject("ITEMSDONE", {})
-        tag_backend.inject("ATTRIBUTES", { "id3" : { "test" : "test" }, "id4" : { "test" : "test" }})
-
-        self.compare_flags(0)
-        if "id3" in tag_updater.attributes:
-            raise Exception("Shouldn't have gotten id3!")
-        if "id4" in tag_updater.attributes:
-            raise Exception("Shouldn't have gotten id4!")
-
-        tag_updater.reset()
-        
-        tag_backend.inject("PONG", {})
-
-        tag_backend.inject("ITEMS", { "maintag:Test1" : [ "id3", "id4" ] })
-        tag_backend.inject("ITEMSDONE", {})
-        tag_backend.inject("ATTRIBUTES", { "id3" : { "test" : "test" }, "id4" : { "test" : "test" }})
-
-        self.compare_flags(0)
-        if "id3" in tag_updater.attributes:
-            raise Exception("Shouldn't have gotten id3!")
-        if "id4" in tag_updater.attributes:
-            raise Exception("Shouldn't have gotten id4!")
-
-        tag_backend.inject("PONG", {})
-        tag_backend.inject("ITEMS", { "maintag:Test1" : [ "id3", "id4" ] })
-        tag_backend.inject("ITEMSDONE", {})
-        tag_backend.inject("ATTRIBUTES", { "id3" : { "test" : "test" }, "id4" : { "test" : "test" }})
-
-        self.compare_flags(ITEMS_ADDED | ATTRIBUTES)
-
-        if "id3" not in tag_updater.attributes:
-            raise Exception("Should have gotten id3!")
-        if "id4" not in tag_updater.attributes:
-            raise Exception("Should have gotten id4!")
-
-        # 13. Non-force reset not allowed during update
-
         tag_updater.update()
 
-        if tag_updater.reset() != False:
-            raise Exception("Should have rejected reset()")
-
-        # 14. Update complete should trigger on receiving items from update
-        # and a subsequent reset() should work
-
-        self.reset_flags()
         tag_backend.inject("ITEMS", { "maintag:Test1" : [ "id3", "id4" ] })
         tag_backend.inject("ITEMSDONE", {})
+        tag_backend.inject("ATTRIBUTES", { "id3" : { "test" : "test" }, "id4" : { "test" : "test" }})
 
-        self.compare_flags(UPDATE_COMPLETE)
-
-        if tag_updater.reset() != True:
-            raise Exception("Shouldn't have rejected reset()!")
+        print(tag_updater.updating)
+        self.compare_flags(TAG_UPDATED | UPDATE_COMPLETE | ITEMS_ADDED | ATTRIBUTES)
+        self.compare_var("otu_tag", "maintag:Test1")
 
-        tag_backend.inject("PONG", {})
 
         return True