[projects/git-slug] Licznik postępu [N/total] w prefixie każdej linii wyjścia workerów
arekm
arekm at pld-linux.org
Tue May 12 15:08:20 CEST 2026
commit d5f28a7e08fe71b820577c968ef4175e79a02a97
Author: Arkadiusz Miśkiewicz <arekm at maven.pl>
Date: Mon Apr 6 22:04:52 2026 +0200
Licznik postępu [N/total] w prefixie każdej linii wyjścia workerów
Każda linia wyjścia z workerów jest poprzedzona [N/total], np.:
[42/20000] perl-Foo: Already up to date.
Implementacja przez współdzielony multiprocessing.Value('i') counter:
- _pool_worker_init() ustawia counter i total w każdym procesie workera
- _apply_func() atomowo inkrementuje counter przed wywołaniem funkcji
i zapisuje aktualną wartość w per-procesowej zmiennej _progress_current
- print_prefixed() odczytuje _progress_current i _progress_total do
formatowania [N/total] przed prefixem pakietu
Dzięki temu postęp jest widoczny na każdej linii wyjścia, nawet gdy
output z wielu workerów przeplata się na terminalu. Nie wymaga zmian
w żadnej funkcji workera (git_passthrough_worker, fetch_package,
initpackage, clone_package) — działa transparentnie przez print_prefixed.
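run_worker() używa imap_unordered() (zamiast starmap()) z chunksize
= max(1, total // (jobs * 4)) — balans między narzutem IPC a granularnością
rozdziału zadań między workery (licznik postępu i tak rośnie przy każdym
zadaniu, niezależnie od chunksize).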
slug.py | 67 +++++++++++++++++++++++++++++++++++++++------
tests/test_failure_paths.py | 21 +++++++-------
2 files changed, 70 insertions(+), 18 deletions(-)
---
diff --git a/slug.py b/slug.py
index 5ef4227..c4a2fc6 100755
--- a/slug.py
+++ b/slug.py
@@ -151,12 +151,28 @@ def git_config_get(key):
return config.get(key.lower())
+# Per-worker-process progress state, set by the pool initializer
+# (_pool_worker_init) and updated per task in _apply_func. Read by
+# print_prefixed to render the "[N/total] " tag; while _progress_total keeps
+# its default of 0 (a process where the initializer never ran) no tag is shown.
+_progress_counter = None  # multiprocessing.Value('i'), shared across workers
+_progress_total = 0  # plain int, set once per pool run via initializer
+_progress_current = 0  # per-process int, updated per task before worker call;
+                       # 1-based counter value claimed by this process's task
+
+
 def print_prefixed(stream_data, prefix, dest):
-    """Print each line of stream_data prefixed with 'prefix: ' to dest."""
+    """Print each line of stream_data prefixed with 'prefix: ' to dest.
+
+    When a progress counter is active (set by run_worker), each line
+    is prefixed with [N/total] so the user can track batch progress.
+    The tag is omitted while _progress_total is 0, i.e. in a process where
+    _pool_worker_init has not stored a total. N is _progress_current, the
+    number _apply_func recorded for the task this process is running.
+
+    Args:
+        stream_data: text to print line by line (presumably str, e.g.
+            captured subprocess output); nothing is printed if it is
+            empty or None.
+        prefix: label written before each line, followed by ': '.
+        dest: file object handed to print(file=...).
+    """
     if not stream_data:
         return
+    if _progress_total > 0:
+        # Right-align N to the width of total so tags line up across lines.
+        width = len(str(_progress_total))
+        tag = '[{:>{}}/{}] '.format(_progress_current, width, _progress_total)
+    else:
+        tag = ''
     for line in stream_data.splitlines():
-        print("{}: {}".format(prefix, line), file=dest)
+        print("{}{}: {}".format(tag, prefix, line), file=dest)
# ---------------------------------------------------------------------------
@@ -334,23 +350,58 @@ def get_command_config(command):
# Worker pool
# ---------------------------------------------------------------------------
-def pool_worker_init():
-    """Ignore SIGINT in worker processes — let the parent handle Ctrl-C."""
+def _pool_worker_init(counter, total):
+    """Initialize each worker process: ignore SIGINT, set up progress state.
+
+    run_worker passes this as the WorkerPool initializer with
+    (counter, total) as its arguments (presumably run once in every worker
+    process before its first task, as with multiprocessing.Pool's
+    initializer/initargs — confirm against WorkerPool).
+
+    Args:
+        counter: shared multiprocessing.Value('i') that _apply_func
+            increments once per task.
+        total: number of tasks in this run; print_prefixed shows it as the
+            denominator of [N/total].
+    """
+    # Ignore Ctrl-C in workers so only the parent handles KeyboardInterrupt.
     signal.signal(signal.SIGINT, signal.SIG_IGN)
+    global _progress_counter, _progress_total
+    _progress_counter = counter
+    _progress_total = total
+
+
+def _apply_func(packed):
+    """Unpack (function, args), increment progress counter, call function.
+
+    run_worker maps this over (function, args) pairs with
+    pool.imap_unordered(), so the counter is bumped right before each
+    real call.
+
+    The counter is a multiprocessing.Value shared across all workers.
+    _progress_current is set per-process so print_prefixed can read it
+    without locking.
+
+    Args:
+        packed: (func, args) tuple; func is called as func(*args).
+
+    Returns:
+        Whatever func(*args) returns.
+
+    Requires _pool_worker_init to have run in this process first; otherwise
+    _progress_counter is still None and get_lock() raises AttributeError.
+    """
+    func, args = packed
+    global _progress_current
+    # Read the value while still holding the lock, so the number recorded is
+    # the one produced by this task's own increment, not a later increment
+    # from another worker. The counter starts at 0, so numbering is 1-based
+    # and counts tasks started so far (including ones still running).
+    with _progress_counter.get_lock():
+        _progress_counter.value += 1
+        _progress_current = _progress_counter.value
+    return func(*args)
def run_worker(function, options, args):
"""Run function(*arg) for each arg in args using a process pool.
- Returns a list of non-None results (typically failed repo paths).
+ Each worker prefixes its output with [N/total] via print_prefixed,
+ using a shared atomic counter. Returns a list of non-None results
+ (typically failed repo paths).
"""
+ args = list(args)
+ total = len(args)
+ if total == 0:
+ return []
+
+ # Shared counter for progress — incremented atomically by each worker
+ # before it calls the actual function, so print_prefixed output shows
+ # which task out of total is producing the output.
+ counter = multiprocessing.Value('i', 0)
+
ret = []
- pool = WorkerPool(options.jobs, pool_worker_init)
+ pool = WorkerPool(options.jobs, _pool_worker_init, (counter, total))
try:
- ret = pool.starmap(function, args)
+ packed = [(function, a) for a in args]
+ # chunksize controls how many tasks are dispatched to each worker
+ # at once. Larger chunks = less IPC overhead, but coarser progress.
+ chunksize = max(1, total // (options.jobs * 4))
+ for result in pool.imap_unordered(_apply_func, packed, chunksize):
+ if result is not None:
+ ret.append(result)
pool.close()
pool.join()
- ret = list(filter(None, ret))
except KeyboardInterrupt:
print('Keyboard interrupt received, finishing...', file=sys.stderr)
pool.terminate()
diff --git a/tests/test_failure_paths.py b/tests/test_failure_paths.py
index 0d8c894..f51e3e2 100644
--- a/tests/test_failure_paths.py
+++ b/tests/test_failure_paths.py
@@ -123,15 +123,14 @@ def test_run_worker_filters_none_results(monkeypatch, make_options):
state = {}
class FakePool:
- def __init__(self, jobs, initializer):
+ def __init__(self, jobs, initializer, initargs=()):
state["jobs"] = jobs
- state["initializer"] = initializer
state["closed"] = False
state["joined"] = False
- def starmap(self, function, args):
+ def imap_unordered(self, function, args, chunksize=1):
state["args"] = list(args)
- return [None, "repo-a", None, "repo-b"]
+ return iter([None, "repo-a", None, "repo-b"])
def close(self):
state["closed"] = True
@@ -145,21 +144,23 @@ def test_run_worker_filters_none_results(monkeypatch, make_options):
assert result == ["repo-a", "repo-b"]
assert state["jobs"] == 7
- assert state["initializer"] is slug.pool_worker_init
assert state["closed"] is True
assert state["joined"] is True
- assert state["args"] == [("a",), ("b",)]
def test_run_worker_handles_keyboard_interrupt(monkeypatch, make_options, capsys):
state = {}
class FakePool:
- def __init__(self, jobs, initializer):
+ def __init__(self, jobs, initializer, initargs=()):
state["jobs"] = jobs
- def starmap(self, function, args):
- raise KeyboardInterrupt()
+ def imap_unordered(self, function, args, chunksize=1):
+ # KeyboardInterrupt during iteration
+ def _gen():
+ raise KeyboardInterrupt()
+ yield # noqa: make it a generator
+ return _gen()
def terminate(self):
state["terminated"] = True
@@ -171,7 +172,7 @@ def test_run_worker_handles_keyboard_interrupt(monkeypatch, make_options, capsys
monkeypatch.setattr(slug, "kill_tracked_subprocesses", lambda: None)
with pytest.raises(SystemExit) as exc:
- slug.run_worker(object(), make_options(jobs=3), [])
+ slug.run_worker(object(), make_options(jobs=3), [("dummy",)])
assert exc.value.code == 1
assert state["terminated"] is True
================================================================
---- gitweb:
http://git.pld-linux.org/gitweb.cgi/projects/git-slug.git/commitdiff/4a7e426b8f1a3571094b5dc89412bc49b8f29666
More information about the pld-cvs-commit
mailing list