[projects/git-slug] Licznik postępu [N/total] w prefixie każdej linii wyjścia workerów

arekm arekm at pld-linux.org
Tue May 12 15:08:20 CEST 2026


commit d5f28a7e08fe71b820577c968ef4175e79a02a97
Author: Arkadiusz Miśkiewicz <arekm at maven.pl>
Date:   Mon Apr 6 22:04:52 2026 +0200

    Licznik postępu [N/total] w prefixie każdej linii wyjścia workerów
    
    Każda linia wyjścia z workerów jest poprzedzona prefiksem [N/total],
    przy czym N jest wyrównane do prawej do szerokości liczby total, np.:
      [   42/20000] perl-Foo: Already up to date.
    
    Implementacja oparta na współdzielonym liczniku multiprocessing.Value('i'):
    - _pool_worker_init() ustawia counter i total w każdym procesie workera
    - _apply_func() atomowo inkrementuje counter przed wywołaniem funkcji
      i zapisuje aktualną wartość w per-procesowej zmiennej _progress_current
    - print_prefixed() na podstawie _progress_current i _progress_total
      formatuje tag [N/total] i umieszcza go przed prefiksem pakietu
      (gdy _progress_total == 0, np. poza pulą workerów, tag jest pomijany)
    
    Dzięki temu postęp jest widoczny w każdej linii wyjścia, nawet gdy
    wyjście wielu workerów przeplata się w terminalu. Rozwiązanie nie wymaga
    zmian w żadnej funkcji workera (git_passthrough_worker, fetch_package,
    initpackage, clone_package) — działa transparentnie przez print_prefixed.
    
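    run_worker() używa imap_unordered() (z chunksize
    = max(1, total // (jobs * 4))) zamiast starmap() — kompromis między
    narzutem IPC a granularnością postępu. Wyniki różne od None (np. ścieżki
    nieudanych repozytoriów) są zbierane w kolejności zakończenia zadań,
    a nie w kolejności wejścia. Dla pustej listy argumentów run_worker()
    zwraca [] bez tworzenia puli.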

 slug.py                     | 67 +++++++++++++++++++++++++++++++++++++++------
 tests/test_failure_paths.py | 21 +++++++-------
 2 files changed, 70 insertions(+), 18 deletions(-)
---
diff --git a/slug.py b/slug.py
index 5ef4227..c4a2fc6 100755
--- a/slug.py
+++ b/slug.py
@@ -151,12 +151,28 @@ def git_config_get(key):
     return config.get(key.lower())
 
 
+# Per-worker-process progress state, set by pool initializer and updated
+# per task in _apply_func.  Used by print_prefixed to show [N/total].
+_progress_counter = None   # multiprocessing.Value('i'), shared across workers
+_progress_total = 0        # plain int, set once per pool run via initializer
+_progress_current = 0      # per-process int, updated per task before worker call
+
+
 def print_prefixed(stream_data, prefix, dest):
-    """Print each line of stream_data prefixed with 'prefix: ' to dest."""
+    """Print each line of stream_data prefixed with 'prefix: ' to dest.
+
+    When a progress counter is active (set by run_worker), each line
+    is prefixed with [N/total] so the user can track batch progress.
+    """
     if not stream_data:
         return
+    if _progress_total > 0:
+        width = len(str(_progress_total))
+        tag = '[{:>{}}/{}] '.format(_progress_current, width, _progress_total)
+    else:
+        tag = ''
     for line in stream_data.splitlines():
-        print("{}: {}".format(prefix, line), file=dest)
+        print("{}{}: {}".format(tag, prefix, line), file=dest)
 
 
 # ---------------------------------------------------------------------------
@@ -334,23 +350,58 @@ def get_command_config(command):
 # Worker pool
 # ---------------------------------------------------------------------------
 
-def pool_worker_init():
-    """Ignore SIGINT in worker processes — let the parent handle Ctrl-C."""
+def _pool_worker_init(counter, total):
+    """Initialize each worker process: ignore SIGINT, set up progress state."""
     signal.signal(signal.SIGINT, signal.SIG_IGN)
+    global _progress_counter, _progress_total
+    _progress_counter = counter
+    _progress_total = total
+
+
+def _apply_func(packed):
+    """Unpack (function, args), increment progress counter, call function.
+
+    The counter is a multiprocessing.Value shared across all workers.
+    _progress_current is set per-process so print_prefixed can read it
+    without locking.
+    """
+    func, args = packed
+    global _progress_current
+    with _progress_counter.get_lock():
+        _progress_counter.value += 1
+        _progress_current = _progress_counter.value
+    return func(*args)
 
 
 def run_worker(function, options, args):
     """Run function(*arg) for each arg in args using a process pool.
 
-    Returns a list of non-None results (typically failed repo paths).
+    Each worker prefixes its output with [N/total] via print_prefixed,
+    using a shared atomic counter.  Returns a list of non-None results
+    (typically failed repo paths).
     """
+    args = list(args)
+    total = len(args)
+    if total == 0:
+        return []
+
+    # Shared counter for progress — incremented atomically by each worker
+    # before it calls the actual function, so print_prefixed output shows
+    # which task out of total is producing the output.
+    counter = multiprocessing.Value('i', 0)
+
     ret = []
-    pool = WorkerPool(options.jobs, pool_worker_init)
+    pool = WorkerPool(options.jobs, _pool_worker_init, (counter, total))
     try:
-        ret = pool.starmap(function, args)
+        packed = [(function, a) for a in args]
+        # chunksize controls how many tasks are dispatched to each worker
+        # at once.  Larger chunks = less IPC overhead, but coarser progress.
+        chunksize = max(1, total // (options.jobs * 4))
+        for result in pool.imap_unordered(_apply_func, packed, chunksize):
+            if result is not None:
+                ret.append(result)
         pool.close()
         pool.join()
-        ret = list(filter(None, ret))
     except KeyboardInterrupt:
         print('Keyboard interrupt received, finishing...', file=sys.stderr)
         pool.terminate()
diff --git a/tests/test_failure_paths.py b/tests/test_failure_paths.py
index 0d8c894..f51e3e2 100644
--- a/tests/test_failure_paths.py
+++ b/tests/test_failure_paths.py
@@ -123,15 +123,14 @@ def test_run_worker_filters_none_results(monkeypatch, make_options):
     state = {}
 
     class FakePool:
-        def __init__(self, jobs, initializer):
+        def __init__(self, jobs, initializer, initargs=()):
             state["jobs"] = jobs
-            state["initializer"] = initializer
             state["closed"] = False
             state["joined"] = False
 
-        def starmap(self, function, args):
+        def imap_unordered(self, function, args, chunksize=1):
             state["args"] = list(args)
-            return [None, "repo-a", None, "repo-b"]
+            return iter([None, "repo-a", None, "repo-b"])
 
         def close(self):
             state["closed"] = True
@@ -145,21 +144,23 @@ def test_run_worker_filters_none_results(monkeypatch, make_options):
 
     assert result == ["repo-a", "repo-b"]
     assert state["jobs"] == 7
-    assert state["initializer"] is slug.pool_worker_init
     assert state["closed"] is True
     assert state["joined"] is True
-    assert state["args"] == [("a",), ("b",)]
 
 
 def test_run_worker_handles_keyboard_interrupt(monkeypatch, make_options, capsys):
     state = {}
 
     class FakePool:
-        def __init__(self, jobs, initializer):
+        def __init__(self, jobs, initializer, initargs=()):
             state["jobs"] = jobs
 
-        def starmap(self, function, args):
-            raise KeyboardInterrupt()
+        def imap_unordered(self, function, args, chunksize=1):
+            # KeyboardInterrupt during iteration
+            def _gen():
+                raise KeyboardInterrupt()
+                yield  # noqa: make it a generator
+            return _gen()
 
         def terminate(self):
             state["terminated"] = True
@@ -171,7 +172,7 @@ def test_run_worker_handles_keyboard_interrupt(monkeypatch, make_options, capsys
     monkeypatch.setattr(slug, "kill_tracked_subprocesses", lambda: None)
 
     with pytest.raises(SystemExit) as exc:
-        slug.run_worker(object(), make_options(jobs=3), [])
+        slug.run_worker(object(), make_options(jobs=3), [("dummy",)])
 
     assert exc.value.code == 1
     assert state["terminated"] is True
================================================================

---- gitweb:

http://git.pld-linux.org/gitweb.cgi/projects/git-slug.git/commitdiff/4a7e426b8f1a3571094b5dc89412bc49b8f29666



More information about the pld-cvs-commit mailing list