Thread limit fetching
author Jack Miller <jack@codezen.org>
Tue, 16 Jun 2015 23:21:07 +0000 (18:21 -0500)
committer Jack Miller <jack@codezen.org>
Wed, 17 Jun 2015 21:39:23 +0000 (16:39 -0500)
- Defaults to the number of CPUs (as reported by multiprocessing.cpu_count)

canto_next/fetch.py

index 67e9906..cda3b43 100644 (file)
@@ -10,7 +10,9 @@ from .plugins import PluginHandler, Plugin
 from .feed import allfeeds
 from .hooks import call_hook
 
+from multiprocessing import cpu_count
 from threading import Thread
+
 import feedparser
 import traceback
 import urllib.parse
@@ -141,11 +143,13 @@ class CantoFetchThread(PluginHandler, Thread):
         # This handles it's own locking
         self.feed.index(update_contents)
 
-
 class CantoFetch():
     def __init__(self, shelf):
         self.shelf = shelf
+        self.deferred = []
         self.threads = []
+        self.thread_limit = cpu_count()
+        log.debug("Thread Limit: %s", self.thread_limit)
 
     def needs_update(self, feed):
         passed = time.time() - feed.last_update
@@ -159,7 +163,24 @@ class CantoFetch():
                 return True
         return False
 
+    def _start_one(self, feed, fromdisk):
+        if len(self.threads) >= self.thread_limit:
+            return False
+
+        thread = CantoFetchThread(feed, fromdisk)
+        thread.start()
+        log.debug("Started thread for feed %s", feed)
+        self.threads.append((thread, feed.URL))
+        return True
+
     def fetch(self, force, fromdisk):
+        for feed, fd in self.deferred[:]:
+            if self._start_one(feed, fd):
+                log.debug("No longer deferred")
+                self.deferred = self.deferred[1:]
+            else:
+                return
+
         for feed in allfeeds.get_feeds():
             if not force and not self.needs_update(feed):
                 continue
@@ -167,10 +188,9 @@ class CantoFetch():
             if self.still_working(feed.URL):
                 continue
 
-            thread = CantoFetchThread(feed, fromdisk)
-            thread.start()
-            log.debug("Started thread for feed %s", feed.URL)
-            self.threads.append((thread, feed.URL))
+            if not self._start_one(feed, fromdisk):
+                log.debug("Deferring %s %s", feed, fromdisk)
+                self.deferred.append((feed, fromdisk))
 
     def reap(self, force=False):
         work_done = False