[projects/git-slug: 2/9] Parallelize fetching, checking out, cloning.

glen glen at pld-linux.org
Mon Sep 21 21:40:52 CEST 2015


commit b1096c634ea9b262bd791863d68e2aed3847078d
Author: Arkadiusz Miśkiewicz <arekm at maven.pl>
Date:   Sun Nov 23 00:18:38 2014 +0100

    Parallelize fetching, checking out, cloning.
    
    Parallelize fetching, checking out, and cloning using the
    multiprocessing module.
    
    By default, use a number of parallel processes equal to the number of
    system CPUs (falling back to the old default of 4 when the CPU count
    cannot be determined).
    
    Also replace the thread-based ThreadFetch() with the same
    multiprocessing mechanism for consistency.
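
For readers skimming the diff below, here is a minimal standalone sketch of
the pattern this commit adopts (the job function do_fetch and the sample job
list are illustrative only, not taken from slug.py): a multiprocessing Pool
whose size defaults to the CPU count (falling back to 4), a worker
initializer that makes the workers ignore SIGINT so Ctrl-C is handled only
in the parent, and starmap() to fan argument tuples out to the workers, with
terminate() on KeyboardInterrupt.

    import multiprocessing
    import signal
    from multiprocessing import Pool

    def cpu_count():
        # Fall back to the old default of 4 when the CPU count is unknown.
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            return 4

    def worker_init():
        # Workers ignore SIGINT; only the parent reacts to Ctrl-C.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def do_fetch(repo, refs):
        # Illustrative per-repository job; slug.py passes (gitrepo, ref2fetch, options).
        return '{}: {} refs'.format(repo, len(refs))

    if __name__ == '__main__':
        jobs = [('packages/foo.git', ['refs/heads/master']),
                ('packages/bar.git', ['refs/heads/master', 'refs/heads/devel'])]
        pool = Pool(cpu_count(), worker_init)
        try:
            results = pool.starmap(do_fetch, jobs)
        except KeyboardInterrupt:
            pool.terminate()
            results = []
        else:
            pool.close()
        pool.join()
        print(results)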

 slug.py | 153 ++++++++++++++++++++++++++++++++++++----------------------------
 1 file changed, 87 insertions(+), 66 deletions(-)
---
diff --git a/slug.py b/slug.py
index 69bd3b9..da9c050 100755
--- a/slug.py
+++ b/slug.py
@@ -7,26 +7,18 @@ import os
 import shutil
 import subprocess
 import queue
-import threading
-
+import multiprocessing
 import argparse
 
 import signal
 import configparser
 
+from multiprocessing import Pool as WorkerPool
+
 from git_slug.gitconst import GITLOGIN, GITSERVER, GIT_REPO, GIT_REPO_PUSH, REMOTE_NAME, REMOTEREFS
 from git_slug.gitrepo import GitRepo, GitRepoError
 from git_slug.refsdata import GitArchiveRefsData, NoMatchedRepos, RemoteRefsError
 
-class Store():
-    def __init__(self):
-        self.lock = threading.Lock()
-        self.items = []
-
-    def put(self, item):
-        with self.lock:
-            self.items.append(item)
-
 class UnquoteConfig(configparser.ConfigParser):
     def get(self, section, option, **kwargs):
         value = super().get(section, option, **kwargs)
@@ -43,25 +35,15 @@ class DelAppend(argparse._AppendAction):
         item.append(values)
         setattr(namespace, self.dest, item)
 
-class ThreadFetch(threading.Thread):
-    def __init__(self, queue, output, pkgdir, depth=0):
-        threading.Thread.__init__(self)
-        self.queue = queue
-        self.packagesdir = pkgdir
-        self.depth = depth
-        self.output = output
-
-    def run(self):
-        while True:
-            (gitrepo, ref2fetch) = self.queue.get()
-            try:
-                (stdout, stderr) = gitrepo.fetch(ref2fetch, self.depth)
-                if stderr != b'':
-                    print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
-                    self.output.put(gitrepo)
-            except GitRepoError as e:
-                print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
-            self.queue.task_done()
+def cpu_count():
+    try:
+        return multiprocessing.cpu_count()
+    except NotImplementedError:
+        pass
+    return 4
+
+def pool_worker_init():
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
 
 def readconfig(path):
     config = UnquoteConfig(delimiters='=', interpolation=None, strict=False)
@@ -114,18 +96,19 @@ def getrefs(*args):
         sys.exit(2)
     return refs
 
+def fetch_package(gitrepo, ref2fetch, options):
+    try:
+        (stdout, stderr) = gitrepo.fetch(ref2fetch, options.depth)
+        if stderr != b'':
+            print('------', gitrepo.gdir[:-len('.git')], '------\n' + stderr.decode('utf-8'))
+            return gitrepo
+    except GitRepoError as e:
+        print('------', gitrepo.gdir[:-len('.git')], '------\n', e)
+         
 def fetch_packages(options, return_all=False):
-    fetch_queue = queue.Queue()
-    updated_repos = Store()
-    for i in range(options.jobs):
-        t = ThreadFetch(fetch_queue, updated_repos, options.packagesdir, options.depth)
-        t.setDaemon(True)
-        t.start()
-
-    signal.signal(signal.SIGINT, signal.SIG_DFL)
-
     refs = getrefs(options.branch, options.repopattern)
     print('Read remotes data')
+    args = []
     for pkgdir in sorted(refs.heads):
         gitdir = os.path.join(options.packagesdir, pkgdir, '.git')
         if not os.path.isdir(gitdir):
@@ -143,9 +126,18 @@ def fetch_packages(options, return_all=False):
                 ref2fetch.append('+{}:{}/{}'.format(ref, REMOTEREFS, ref[len('refs/heads/'):]))
         if ref2fetch:
             ref2fetch.append('refs/notes/*:refs/notes/*')
-            fetch_queue.put((gitrepo, ref2fetch))
+            args.append((gitrepo, ref2fetch, options))
 
-    fetch_queue.join()
+    pool = WorkerPool(options.jobs, pool_worker_init)
+    try:
+        updated_repos = pool.starmap(fetch_package, args)
+    except KeyboardInterrupt:
+        pool.terminate()
+    else:
+        pool.close()
+    pool.join()
+
+    updated_repos = list(filter(None, updated_repos))
 
     if options.prune:
         refs = getrefs('*')
@@ -158,26 +150,60 @@ def fetch_packages(options, return_all=False):
     if return_all:
         return refs.heads
     else:
-        return updated_repos.items
+        return updated_repos
+
+def checkout_package(repo, options):
+    try:
+        repo.checkout(options.checkout)
+    except GitRepoError as e:
+        print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
 
 def checkout_packages(options):
     if options.checkout is None:
         options.checkout = "/".join([REMOTE_NAME, options.branch[0]])
     fetch_packages(options)
     refs = getrefs(options.branch, options.repopattern)
+    repos = []
     for pkgdir in sorted(refs.heads):
-        repo = GitRepo(os.path.join(options.packagesdir, pkgdir))
-        try:
-            repo.checkout(options.checkout)
-        except GitRepoError as e:
-            print('Problem with checking branch {} in repo {}: {}'.format(options.checkout, repo.gdir, e), file=sys.stderr)
+        repos.append(GitRepo(os.path.join(options.packagesdir, pkgdir)))
+    pool = WorkerPool(options.jobs)
+    try:
+        pool.starmap(checkout_package, zip(repos, [options] * len(repos)))
+    except KeyboardInterrupt:
+        pool.terminate()
+    else:
+        pool.close()
+    pool.join()
+
+def clone_package(repo, options):
+    try:
+        repo.checkout('master')
+    except GitRepoError as e:
+        print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
 
 def clone_packages(options):
-    for repo in fetch_packages(options):
-        try:
-            repo.checkout('master')
-        except GitRepoError as e:
-            print('Problem with checking branch master in repo {}: {}'.format(repo.gdir, e), file=sys.stderr)
+    repos = fetch_packages(options)
+    pool = WorkerPool(options.jobs)
+    try:
+        pool.starmap(clone_package, zip(repos, [options] * len(repos)))
+    except KeyboardInterrupt:
+        pool.terminate()
+    else:
+        pool.close()
+    pool.join()
+
+def pull_package(gitrepo, options):
+    directory = os.path.basename(gitrepo.wtree)
+    try:
+        (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
+        sha1 = out.decode().strip()
+        (out, err) = gitrepo.commandexc(['rebase', sha1])
+        for line in out.decode().splitlines():
+            print(directory,":",line)
+    except GitRepoError as e:
+        for line in e.args[0].splitlines():
+            print("{}: {}".format(directory,line))
+        pass
 
 def pull_packages(options):
     repolist = []
@@ -189,19 +215,14 @@ def pull_packages(options):
     else:
         repolist = fetch_packages(options, False)
     print('--------Pulling------------')
-    for gitrepo in repolist:
-        directory = os.path.basename(gitrepo.wtree)
-        try:
-            (out, err) = gitrepo.commandexc(['rev-parse', '-q', '--verify', '@{u}'])
-            sha1 = out.decode().strip()
-            (out, err) = gitrepo.commandexc(['rebase', sha1])
-            for line in out.decode().splitlines():
-                print(directory,":",line)
-        except GitRepoError as e:
-            for line in e.args[0].splitlines():
-                print("{}: {}".format(directory,line))
-            pass
-
+    pool = WorkerPool(options.jobs, pool_worker_init)
+    try:
+        pool.starmap(pull_package, zip(repolist, [options] * len(repolist)))
+    except KeyboardInterrupt:
+        pool.terminate()
+    else:
+        pool.close()
+    pool.join()
 
 def list_packages(options):
     refs = getrefs(options.branch, options.repopattern)
@@ -213,7 +234,7 @@ common_options.add_argument('-d', '--packagesdir', help='local directory with gi
     default=os.path.expanduser('~/rpm/packages'))
 
 common_fetchoptions = argparse.ArgumentParser(add_help=False, parents=[common_options])
-common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=4, type=int)
+common_fetchoptions.add_argument('-j', '--jobs', help='number of threads to use', default=cpu_count(), type=int)
 common_fetchoptions.add_argument('repopattern', nargs='*', default = ['*'])
 common_fetchoptions.add_argument('--depth', help='depth of fetch', default=0)
 
================================================================
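
A note on the argument plumbing above: Pool.starmap() consumes an iterable of
argument tuples, so the checkout, clone and pull paths pair every repository
with one shared options object via zip(repos, [options] * len(repos)), while
fetch_packages() keeps only the repositories whose fetch produced output by
filtering the None results returned by fetch_package(). A small sketch of
that zip/starmap idiom (the repository names and the checkout stand-in are
made up, not part of slug.py):

    from multiprocessing import Pool

    def checkout(repo, options):
        # Stand-in for GitRepo.checkout(); just reports what would be done.
        return 'checkout {} in {}'.format(options['branch'], repo)

    if __name__ == '__main__':
        repos = ['packages/foo', 'packages/bar', 'packages/baz']
        options = {'branch': 'master'}
        with Pool(2) as pool:
            # Each call receives (repo, options); options is the same for all.
            print(pool.starmap(checkout, zip(repos, [options] * len(repos))))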

---- gitweb:

http://git.pld-linux.org/gitweb.cgi/projects/git-slug.git/commitdiff/d9fdf7d60e5aea39f9ac7065a09a645261fc712b


