Protection is eliminated.
authorJack Miller <jack@codezen.org>
Tue, 9 Sep 2014 22:09:31 +0000 (17:09 -0500)
committerJack Miller <jack@codezen.org>
Thu, 11 Sep 2014 00:56:30 +0000 (19:56 -0500)
The idea behind the protection system was that items that were sitting
in the client, but protected, would never have requests for information
fail. A noble goal. However, I've decided that it's just not worth the
effort. Here's why:

- 99.99% of the time items are protected without need. Items are already
  kept for 24 hours *after* they disappear from the source feed. So,
assuming that a client gets the item immediately before it disappears,
it still has to sit there for a full day before protection makes one bit
of difference.

- Keeping in mind that's it's extremely rare that protection would
  matter, I can't justify all of this:

    - Having logic to keep dead feeds around when they have protected
      items

    - Having to hold a lock in feed.index to check protections

    - Having to automatically protect every item sent to a client to
      keep the whole thing from being racy.

    - Forcing the sync plugin to try and reconcile protected old items
      that no longer exist in new content.

- In addition, protection doesn't mean that attribute requests will
  never fail because attribute requests can always fail. Items might not
even have a "title" entry for Chrissakes, so it's not really saving any
complexity on the client side

- Most importantly for this argument, if you really want to make sure
  you never lose an item there's keep_unread which will keep every
single item that you haven't explicitly marked read (and works naturally
without any of the bullshit from point 2).

Filter-immune protection was eliminated as well, which caused an API
change to transforms. This system was inherently racy with
multi-threading anyway (i.e. items received before filter-immunity set)

I came up with a few solutions that kept this system alive, like keeping
protected items in memory instead of on disk (which would avoid the
technical shortcomings) but when every item in a client has to be
protected by default to keep the system from being racy the memory cost
was too great just for the 00.01% case.

canto_next/canto_backend.py
canto_next/feed.py
canto_next/locks.py
canto_next/protect.py [deleted file]
canto_next/transform.py
plugins/reddit.py

index 130bb15..2a7250e 100644 (file)
@@ -8,16 +8,10 @@
 
 # This Backend class is the core of the daemon's specific protocol.
 
-# PROTOCOL_VERSION History:
-# 0.1 - Initial versioned commit.
-# 0.2 - Modified tags to escape the : separator such that tags handed out are
-#       immediaely read to be used as [ Tag -whatever- ] config headers.
-
-CANTO_PROTOCOL_VERSION = 0.4
+CANTO_PROTOCOL_VERSION = 0.9
 
 from .feed import allfeeds, wlock_feeds, rlock_feeds, wlock_all, wunlock_all, rlock_all, runlock_all, stop_feeds
 from .encoding import encoder
-from .protect import protection
 from .server import CantoServer
 from .config import CantoConfig
 from .storage import CantoShelf
@@ -51,30 +45,14 @@ log = logging.getLogger("CANTO-DAEMON")
 
 FETCH_CHECK_INTERVAL = 60
 
-# x.lock is a specific feed's lock
-# x.locks are all feed's locks
-#
-# Index threads take
-#   x.lock (w) -> protect_lock (r)
-#              \> tag_lock (w)
-#
-# (meaning that it holds x.lock, but takes protect and tag locks serially)
-#
-# So, if any command (in this file) needs to take feed_lock and x.locks first
-#
-# Every other threaded lock taker is going to come through CantoBackend, so the
-# lock order must be the same. Between all cmd_ functions.
-#
-# Fortunately, this means we can do locks alphabetically.
-#
-# Another caveat is that commands that call each other need to hold all the
-# locks at the outset. For example, cmd_items calls cmd_attributes (to handle
-# automatic attributes), so it needs to hold the feed and protect locks, even
-# if it didn't have to otherwise.
-
 class DaemonBackendPlugin(Plugin):
     pass
 
+# Index threads and the main thread no longer take multiple locks at once. The
+# cmd_* functions in CantoBackend only need to worry about deadlocking with
+# each other. By convention, they take locks in alphabetical order and all on
+# start of command, **except feed_lock which is always first**.
+
 class CantoBackend(PluginHandler, CantoServer):
     def __init__(self):
 
@@ -243,11 +221,8 @@ class CantoBackend(PluginHandler, CantoServer):
         for socket in self.watches["del_tags"]:
             self.write(socket, "DELTAGS", tags)
 
-    # If a socket dies, it's not longer watching any events and
-    # revoke any protection associated with it
+    # If a socket dies, it's no longer watching any events.
 
-    @wlock_feeds
-    @write_lock(protect_lock)
     @write_lock(socktran_lock)
     @write_lock(watch_lock)
     def on_kill_socket(self, socket):
@@ -267,8 +242,6 @@ class CantoBackend(PluginHandler, CantoServer):
         if socket in list(self.socket_transforms.keys()):
             del self.socket_transforms[socket]
 
-        protection.unprotect((socket, "auto"))
-
     # We need to be alerted on certain events, ensure
     # we get notified about them.
 
@@ -284,43 +257,27 @@ class CantoBackend(PluginHandler, CantoServer):
         on_hook("daemon_del_configs", lambda x, y : self.internal_command(x, self.in_delconfigs, y))
         on_hook("daemon_get_configs", lambda x, y : self.internal_command(x, self.in_configs, y))
 
-    # Return list of item tuples after global transforms have
-    # been performed on them.
+    # Return list of item tuples after global transforms have been performed on
+    # them.
 
     def apply_transforms(self, socket, tag):
-
-        # Lambda up a function that, given an id, can tell a filter if it's
-        # protected in this circumstance without allowing filters access to the
-        # socket, or requiring them to know anything about the protection
-        # scheme.
-
-        filter_immune = lambda x :\
-                protection.protected_by(x, (socket, "filter-immune"))
-
-        # Lock the feeds so we don't lose any items. We don't want transforms
-        # to have to deal with ids disappearing from feeds.
-
-        # Because we hold tag / protect write, we know that no more feeds can
-        # start indexing, so taking the lock just means we're making sure none
-        # of them are in progress.
-
         tagobj = alltags.get_tag(tag)
 
         f = allfeeds.items_to_feeds(tagobj)
 
         # Global transform
         if self.conf.global_transform:
-            tagobj = self.conf.global_transform(tagobj, filter_immune)
+            tagobj = self.conf.global_transform(tagobj)
 
         # Tag level transform
         if tag in alltags.tag_transforms and\
                 alltags.tag_transforms[tag]:
-            tagobj = alltags.tag_transforms[tag](tagobj, filter_immune)
+            tagobj = alltags.tag_transforms[tag](tagobj)
 
         # Socket transforms ANDed together.
         if socket in self.socket_transforms:
             for filt in self.socket_transforms[socket]:
-                tagobj = self.socket_transforms[socket][filt](tagobj, filter_immune)
+                tagobj = self.socket_transforms[socket][filt](tagobj)
 
         return tagobj
 
@@ -360,7 +317,6 @@ class CantoBackend(PluginHandler, CantoServer):
             transforms.append({"name" : transform["name"]})
         self.write(socket, "LISTTRANSFORMS", transforms)
 
-
     # TRANSFORM {} -> return current socket transform, with names instead of
     # actual filt objects.
     # TRANSFORM {"string":"transform"} -> set a socket transform
@@ -421,10 +377,9 @@ class CantoBackend(PluginHandler, CantoServer):
 
     # ITEMS [tags] -> { tag : [ ids ], tag2 : ... }
 
-    @rlock_feeds # For _cmd_attributes
+    @read_lock(feed_lock)
     @read_lock(attr_lock)
     @read_lock(config_lock)
-    @write_lock(protect_lock)
     @read_lock(socktran_lock)
     @read_lock(tag_lock)
     def cmd_items(self, socket, args):
@@ -435,12 +390,6 @@ class CantoBackend(PluginHandler, CantoServer):
             # get_tag returns a list invariably, but may be empty.
             items = self.apply_transforms(socket, tag)
 
-            # ITEMS must protect all given items automatically to
-            # avoid instances where an item disappears before a PROTECT
-            # call can be made by the client.
-
-            protection.protect((socket, "auto"), items)
-
             # Divide each response into 100 items or less and dispatch them
 
             attr_list = []
@@ -460,27 +409,14 @@ class CantoBackend(PluginHandler, CantoServer):
             self.write(socket, "ITEMSDONE", {})
 
             for attr_req in attr_list:
-                self._cmd_attributes(socket, attr_req)
-
-    # FEEDATTRIBUTES { 'url' : [ attribs .. ] .. } ->
-    # { url : { attribute : value } ... }
-
-    @rlock_feeds
-    def cmd_feedattributes(self, socket, args):
-        r = {}
-        for url in list(args.keys()):
-            feed = allfeeds.get_feed(url)
-            if not feed:
-                continue
-            r.update({ url : feed.get_feedattributes(args[url])})
-        self.write(socket, "FEEDATTRIBUTES", r)
+                self.cmd_attributes(socket, attr_req)
 
     # ATTRIBUTES { id : [ attribs .. ] .. } ->
     # { id : { attribute : value } ... }
 
     # This is called with appropriate locks from cmd_items
 
-    def _cmd_attributes(self, socket, args):
+    def cmd_attributes(self, socket, args):
         ret = {}
         feeds = allfeeds.items_to_feeds(list(args.keys()))
         for f in feeds:
@@ -488,14 +424,9 @@ class CantoBackend(PluginHandler, CantoServer):
 
         self.write(socket, "ATTRIBUTES", ret)
 
-    @rlock_feeds
-    def cmd_attributes(self, socket, args):
-        self._cmd_attributes(socket, args)
-
     # SETATTRIBUTES { id : { attribute : value } ... } -> None
 
-    @wlock_feeds
-    @write_lock(attr_lock)
+    @read_lock(feed_lock)
     @write_lock(tag_lock)
     def cmd_setattributes(self, socket, args):
 
@@ -539,7 +470,6 @@ class CantoBackend(PluginHandler, CantoServer):
 
     @write_lock(feed_lock)
     @write_lock(config_lock)
-    @read_lock(protect_lock)
     @write_lock(tag_lock)
     @read_lock(watch_lock)
     def cmd_setconfigs(self, socket, args):
@@ -557,7 +487,6 @@ class CantoBackend(PluginHandler, CantoServer):
 
     @write_lock(feed_lock)
     @write_lock(config_lock)
-    @read_lock(protect_lock)
     @write_lock(tag_lock)
     @read_lock(watch_lock)
     def cmd_delconfigs(self, socket, args):
@@ -596,21 +525,6 @@ class CantoBackend(PluginHandler, CantoServer):
             else:
                 self.watches["tags"][tag] = [socket]
 
-    # PROTECT { "reason" : [ id, ... ], ... }
-
-    @write_lock(protect_lock)
-    def cmd_protect(self, socket, args):
-        for reason in args:
-            protection.protect((socket, reason), args[reason])
-
-    # UNPROTECT { "reason" : [ id, ... ], ... }
-
-    @write_lock(protect_lock)
-    def cmd_unprotect(self, socket, args):
-        for reason in args:
-            for id in args[reason]:
-                protection.unprotect_one((socket, reason), id)
-
     # UPDATE {}
 
     # Note that this is intended to allow clients to take manual
index 1c3499a..6f243ff 100644 (file)
@@ -8,11 +8,10 @@
 #   published by the Free Software Foundation.
 
 from .plugins import PluginHandler, Plugin
-from .protect import protection
 from .encoding import encoder
 from .tag import alltags
 from .rwlock import RWLock, read_lock, write_lock
-from .locks import feed_lock, protect_lock, tag_lock
+from .locks import feed_lock, tag_lock
 from .hooks import call_hook
 
 import traceback
@@ -53,9 +52,7 @@ class CantoFeeds():
     def get_feeds(self):
         return [ self.get_feed(URL) for URL in self.order]
 
-    # Interestingly, don't need to get the read lock on feed because the URL is
-    # part of the ID.
-
+    @read_lock(feed_lock)
     def items_to_feeds(self, items):
         f = {}
         for i in items:
@@ -63,8 +60,6 @@ class CantoFeeds():
 
             if d_i["URL"] in self.feeds:
                 feed = self.feeds[d_i["URL"]]
-            elif d_i["URL"] in self.dead_feeds:
-                feed = self.dead_feeds[d_i["URL"]]
             else:
                 raise Exception("Can't find feed: %s" % d_i["URL"])
 
@@ -81,6 +76,7 @@ class CantoFeeds():
             feed.destroy()
         self.dead_feeds = {}
 
+    @write_lock(feed_lock)
     def reset(self):
         self.dead_feeds = self.feeds
         self.feeds = {}
@@ -163,32 +159,17 @@ class CantoFeed(PluginHandler):
 
         allfeeds.add_feed(URL, self)
 
-    # Remove old items from all tags. Called with self.lock read
+    # Identify items that are no longer being recorded.
 
-    def sweep_tags(self, olditems):
+    def old_ids(self, olditems):
+        r = []
         for olditem in olditems:
             for item in self.shelf[self.URL]["entries"]:
-                # Same ID exists in new items
                 if item["id"] == olditem["id"]:
                     break
             else:
-                # Will lock
                 cache_id = self._cacheitem(olditem)["id"]
-                alltags.remove_id(cache_id)
-
-    # Return { attribute : value ... }
-
-    def get_feedattributes(self, attributes):
-
-        d = self.shelf[self.URL]
-
-        r = {}
-        for attr in attributes:
-            if attr in d:
-                r[attr] = d[attr]
-            else:
-                r[attr] = ""
-
+                r.append(cache_id)
         return r
 
     # Return { id : { attribute : value .. } .. }
@@ -222,6 +203,8 @@ class CantoFeed(PluginHandler):
     # Given an ID and a dict of attributes, update the disk.
     def set_attributes(self, items, attributes):
 
+        self.lock.acquire_write()
+
         d = self.shelf[self.URL]
 
         for item in items:
@@ -249,23 +232,10 @@ class CantoFeed(PluginHandler):
                     d_item[a] = attributes[item][a]
 
         self.shelf[self.URL] = d
-
-        # Allow DaemonFeed plugins to define set_attribute_* functions
-        # to receive notifications of changed attributes
-
-        for attr in list(self.plugin_attrs.keys()):
-            if not attr.startswith("set_attributes_"):
-                continue
-
-            try:
-                a = getattr(self, attr)
-                a(feed = self, items = items, attributes = attributes, content = d)
-            except:
-                log.error("Error running feed set_attr plugin")
-                log.error(traceback.format_exc())
-
         self.shelf.update_umod()
 
+        self.lock.release_write()
+
     def _cacheitem(self, item):
         cacheitem = {}
         cacheitem["id"] = json.dumps(\
@@ -320,16 +290,23 @@ class CantoFeed(PluginHandler):
                     log.error("Unable to uniquely ID item: %s" % item)
                     continue
 
-            if item in to_add:
+            # Make sure that this item can be uniquely IDed.
+            found = False
+            for seen_item in to_add:
+                if seen_item["id"] == item["id"]:
+                    found = True
+                    break
+
+            if found:
                 continue
+
             to_add.append(item)
 
             cacheitem = self._cacheitem(item)
             tags_to_add.append((cacheitem["id"], "maintag:" + self.name))
 
-            # Move over custom content from item.
-            # Custom content is denoted with a key that
-            # starts with "canto", but not "canto_update",
+            # Move over custom content from item.  Custom content is denoted
+            # with a key that starts with "canto", but not "canto_update",
             # which changes invariably.
 
             for olditem in old_contents["entries"]:
@@ -350,36 +327,15 @@ class CantoFeed(PluginHandler):
 
         update_contents["entries"] = to_add
 
-        # STEP 2: Keep items that have been given to clients from disappearing
-        # from the disk. This ensures that even if an item has been sitting in
-        # an active client for days requests for more information won't fail.
-
-        # While we're looping through the olditems, we also make a list of
-        # unprotected items for the next step (increasing the number of
-        # remembered feed items).
-
-        unprotected_old = []
-
-        protect_lock.acquire_read()
+        # STEP 2: Keep all items that have been seen in the feed in the last
+        # day (keep_time default).
 
+        ref_time = time.time()
         for olditem in old_contents["entries"]:
-            for item in update_contents["entries"]:
+            for item in to_add:
                 if olditem["id"] == item["id"]:
-                    log.debug("still in self.items")
-                    break
-            else:
-                if protection.protected(olditem["id"]):
-                    log.debug("Saving committed item: %s" % olditem["id"])
-                    update_contents["entries"].append(olditem)
-                else:
-                    unprotected_old.append(olditem)
-
-        protect_lock.release_read()
-
-        # Keep all items that have been seen in the feed in the last day.
+                    continue
 
-        ref_time = time.time()
-        for olditem in unprotected_old:
             if "canto_update" not in olditem:
                 olditem["canto_update"] = ref_time
 
@@ -417,19 +373,28 @@ class CantoFeed(PluginHandler):
 
         if not self.stopped:
             # Commit the updates to disk.
+
             self.shelf[self.URL] = update_contents
 
+            to_remove = self.old_ids(old_contents["entries"])
+
+            self.lock.release_write()
+
             tag_lock.acquire_write()
+
+            for item in to_remove:
+                alltags.remove_id(item)
+
             for item, tag in tags_to_add:
                 alltags.add_tag(item, tag)
 
             # Go through and take items in old_contents that didn't make it
             # into update_contents / self.items and remove them from all tags.
 
-            self.sweep_tags(old_contents["entries"])
-            tag_lock.release_write()
 
-        self.lock.release_write()
+            tag_lock.release_write()
+        else:
+            self.lock.release_write()
 
     def destroy(self):
         # Check for existence in case of delete quickly
index 4d881c2..16846e4 100644 (file)
@@ -21,7 +21,6 @@ tag_lock = RWLock('tag_lock')
 config_lock = RWLock('config_lock')
 
 # The rest of these are independent.
-protect_lock = RWLock('protect_lock')
 watch_lock = RWLock('watch_lock')
 attr_lock = RWLock('attr_lock')
 socktran_lock = RWLock('socktran_lock')
diff --git a/canto_next/protect.py b/canto_next/protect.py
deleted file mode 100644 (file)
index 1e89c63..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-# -*- coding: utf-8 -*-
-
-#Canto - RSS reader backend
-#   Copyright (C) 2010 Jack Miller <jack@codezen.org>
-#
-#   This program is free software; you can redistribute it and/or modify
-#   it under the terms of the GNU General Public License version 2 as 
-#   published by the Free Software Foundation.
-
-import logging
-
-log = logging.getLogger("PROTECT")
-
-# The Protection class keeps a list of IDs whose information should
-# never leave the disk. The primary utility of keeping these items around is
-# that clients who have outdated (even by just a minute or so) information can
-# count on the fact that all of the data associated with those items is still
-# retrievable.
-
-class Protection():
-    def __init__(self):
-        self.prot = {}
-
-    # Return whether a single item tuple is protected.
-
-    def protected(self, item):
-        for key in list(self.prot.keys()):
-            if item in self.prot[key]:
-                log.debug("item %s is protected." % (item,))
-                return True
-
-        log.debug("item %s is not protected." % (item,))
-        return False
-
-    # Return whether a single item tuple is protected by
-    # a particular key.
-
-    def protected_by(self, item, reason):
-        if reason not in self.prot:
-            log.debug("No reason \"%s\" known." % (reason,))
-            return False
-
-        if item in self.prot[reason]:
-            log.debug("item %s is protected by %s" % (item, reason))
-            return True
-
-        log.debug("item %s is not protected by %s" % (item, reason))
-        return False
-
-    # Put a set of items under the protection of key.
-
-    def protect(self, key, items):
-        if key in self.prot:
-            self.prot[key] += items[:]
-        else:
-            self.prot[key] = items[:]
-
-    # Unprotect a single item under key.
-
-    def unprotect_one(self, key, item):
-        if key in self.prot and item in self.prot[key]:
-            self.prot[key].remove(item)
-
-    # Unprotect all items under the protection of key.
-
-    def unprotect(self, key):
-        if key in self.prot:
-            del self.prot[key]
-
-protection = Protection()
index 0051d85..156cbda 100644 (file)
@@ -32,7 +32,7 @@ class CantoTransform():
 
     # This is called with the feeds already read locked.
 
-    def __call__(self, tag, immune):
+    def __call__(self, tag):
         a = {}
         f = allfeeds.items_to_feeds(tag)
         needed = self.needed_attributes(tag)
@@ -48,12 +48,12 @@ class CantoTransform():
                 log.warn("Missing attributes for %s" % item)
                 tag.remove(item)
 
-        return self.transform(tag, a, immune)
+        return self.transform(tag, a)
 
     def needed_attributes(self, tag):
         return []
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         return items
 
 # A StateFilter will filter out items that match a particular state. Supports
@@ -67,7 +67,7 @@ class StateFilter(CantoTransform):
     def needed_attributes(self, tag):
         return ["canto-state"]
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         if self.state[0] == "-":
             state = self.state[1:]
             keep = True
@@ -76,7 +76,7 @@ class StateFilter(CantoTransform):
             keep = False
 
         return [ i for i in items if \
-                (state in attrs[i]["canto-state"]) == keep or immune(i)]
+                (state in attrs[i]["canto-state"]) == keep]
 
 # Filter out items whose [attribute] content matches an arbitrary regex.
 
@@ -95,16 +95,12 @@ class ContentFilterRegex(CantoTransform):
             return []
         return [ self.attribute ]
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         if not self.match:
             return items
 
         r = []
         for item in items:
-            if immune(item):
-                r.append(item)
-                continue
-
             a = attrs[item]
             if self.attribute not in a:
                 r.append(item)
@@ -132,7 +128,7 @@ class SortTransform(CantoTransform):
     def needed_attributes(self, tag):
         return [ self.attr ]
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         r = [ ( attrs[item][self.attr], item ) for item in items ]
         r.sort()
         return [ item[1] for item in r ]
@@ -158,10 +154,10 @@ class AllTransform(CantoTransform):
                     needed.append(a)
         return needed
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         good_items = items[:]
         for t in self.transforms:
-            good_items = t.transform(good_items, attrs, immune)
+            good_items = t.transform(good_items, attrs)
             if not good_items:
                 break
         return good_items
@@ -186,12 +182,12 @@ class AnyTransform(CantoTransform):
                     needed.append(a)
         return needed
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         good_items = []
         per_transform = []
 
         for t in self.transforms:
-            per_transform.append(t.transform(items, attrs, immune))
+            per_transform.append(t.transform(items, attrs))
 
         for pt in per_transform:
             for item in pt:
@@ -209,14 +205,10 @@ class InTags(CantoTransform):
     def needed_attributes(self, tag):
         return []
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         good = []
 
         for item in items:
-            if immune(item):
-                good.append(item)
-                continue
-
             for itag in alltags.items_to_tags([item]):
                 if itag in self.tags:
                     good.append(item)
index 5185f75..e6ad4f7 100644 (file)
@@ -84,7 +84,7 @@ class RedditScoreSort(CantoTransform):
     def needed_attributes(self, tag):
         return [ "reddit-score" ]
 
-    def transform(self, items, attrs, immune):
+    def transform(self, items, attrs):
         scored = []
         unscored = []