[PATCH v3] parsearchive: parallel processing
Daniel Axtens
dja at axtens.net
Fri Dec 6 12:59:53 AEDT 2019
Allow parsing to be done in parallel with -jN. Handy for testing
the code against parallelism problems, also faster!
Issue: #241
Signed-off-by: Daniel Axtens <dja at axtens.net>
---
v2: I was having problems with one thread 'running ahead' and merging
unrelated spins of poorly threaded series. This gives a 100 message
limit to how far any one thread can 'run ahead' of any other. 100 is
arbitrary, it's a speed/accuracy trade off.
v3: make the locking a bit clearer/more correct/avoid RMW issues, derp.
I still think it's probably not worth merging, but I want to have the
code out there so people can use it if they want, esp for development.
I'm still trying to hammer out a series parsing change that doesn't
involve arbitrary retries. I think I'm getting somewhere.
---
patchwork/management/commands/parsearchive.py | 147 +++++++++++++-----
1 file changed, 110 insertions(+), 37 deletions(-)
diff --git a/patchwork/management/commands/parsearchive.py b/patchwork/management/commands/parsearchive.py
index b7f1ea7313c2..45e942034812 100644
--- a/patchwork/management/commands/parsearchive.py
+++ b/patchwork/management/commands/parsearchive.py
@@ -7,7 +7,9 @@ import logging
import mailbox
import os
import sys
+import multiprocessing
+from django import db
from django.core.management.base import BaseCommand
from patchwork import models
@@ -16,6 +18,17 @@ from patchwork.parser import DuplicateMailError
logger = logging.getLogger(__name__)
+TYPE_CONVERSION = {
+ models.Patch: 0,
+ models.CoverLetter: 1,
+ models.Comment: 2,
+}
+DUPLICATE = 3
+DROPPED = 4
+ERROR = 5
+NUM_TYPES = 6
+
+RUN_AHEAD_LIMIT = 100
class Command(BaseCommand):
help = 'Parse an mbox archive file and store any patches/comments found.'
@@ -28,17 +41,12 @@ class Command(BaseCommand):
'--list-id',
help='mailing list ID. If not supplied, this will be '
'extracted from the mail headers.')
+ parser.add_argument(
+ '--jobs', '-j',
+ help='process the archive in N parallel jobs',
+ type=int, default=1)
def handle(self, *args, **options):
- results = {
- models.Patch: 0,
- models.CoverLetter: 0,
- models.Comment: 0,
- }
- duplicates = 0
- dropped = 0
- errors = 0
-
verbosity = int(options['verbosity'])
if not verbosity:
level = logging.CRITICAL
@@ -53,6 +61,11 @@ class Command(BaseCommand):
logger.setLevel(level)
logging.getLogger('patchwork.parser').setLevel(level)
+ jobs = options['jobs']
+ if jobs < 1:
+ logger.error('Invalid number of jobs %d, must be at least 1', jobs)
+ sys.exit(1)
+
# TODO(stephenfin): Support passing via stdin
path = args and args[0] or options['infile']
if not os.path.exists(path):
@@ -65,8 +78,6 @@ class Command(BaseCommand):
else:
mbox = mailbox.Maildir(path, create=False)
- count = len(mbox)
-
# Iterate through the mbox. This will pick up exceptions that are only
# thrown when a broken email is found part way through. Without this
# block, we'd get the exception thrown in enumerate(mbox) below, which
@@ -84,26 +95,39 @@ class Command(BaseCommand):
logger.error('Broken mbox/Maildir, aborting')
return
- logger.info('Parsing %d mails', count)
- for i, msg in enumerate(mbox):
- try:
- obj = parse_mail(msg, options['list_id'])
- if obj:
- results[type(obj)] += 1
- else:
- dropped += 1
- except DuplicateMailError as exc:
- duplicates += 1
- logger.warning('Duplicate mail for message ID %s', exc.msgid)
- except (ValueError, Exception) as exc:
- errors += 1
- logger.warning('Invalid mail: %s', repr(exc))
+ # we need to close the db connection so each process gets its own
+ # see e.g. https://stackoverflow.com/a/10684672
+ db.connections.close_all()
+
+ threads = []
+ processed = multiprocessing.Value('i')
+ results = multiprocessing.Array('i', NUM_TYPES)
+ run_ahead_barrier = multiprocessing.Condition()
+ latest_msg = multiprocessing.Value('i')
+ for job in range(jobs):
+ thread = multiprocessing.Process(target=self.parse_mbox,
+ kwargs={
+ 'path': path,
+ 'list_id': options['list_id'],
+ 'job': job,
+ 'num_jobs': jobs,
+ 'processed': processed,
+ 'results': results,
+ 'run_ahead_barrier': run_ahead_barrier,
+ 'latest_msg': latest_msg,
+ })
+ logger.debug('starting %s', thread)
+ thread.daemon = True # this makes Ctrl-C work
+ thread.start()
+ threads += [thread]
- if verbosity < 3 and (i % 10) == 0:
- self.stdout.write('%06d/%06d\r' % (i, count), ending='')
- self.stdout.flush()
-
- mbox.close()
+ count = len(mbox)
+ for thread in threads:
+ while thread.is_alive():
+ thread.join(1)
+ if verbosity < 3:
+ self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
+ self.stdout.flush()
if not verbosity:
return
@@ -118,11 +142,60 @@ class Command(BaseCommand):
' %(errors)4d errors\n'
'Total: %(new)s new entries' % {
'total': count,
- 'covers': results[models.CoverLetter],
- 'patches': results[models.Patch],
- 'comments': results[models.Comment],
- 'duplicates': duplicates,
- 'dropped': dropped,
- 'errors': errors,
- 'new': count - duplicates - dropped - errors,
+ 'covers': results[TYPE_CONVERSION[models.CoverLetter]],
+ 'patches': results[TYPE_CONVERSION[models.Patch]],
+ 'comments': results[TYPE_CONVERSION[models.Comment]],
+ 'duplicates': results[DUPLICATE],
+ 'dropped': results[DROPPED],
+ 'errors': results[ERROR],
+ 'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
})
+
+ def parse_mbox(self, path, list_id, job, num_jobs, processed, results,
+ run_ahead_barrier, latest_msg):
+ if os.path.isfile(path):
+ mbox = mailbox.mbox(path, create=False)
+ else:
+ mbox = mailbox.Maildir(path, create=False)
+
+ count = len(mbox)
+
+ if num_jobs == 1:
+ logger.info('Parsing %d mails', count)
+ else:
+ logger.info('Parsing %d total mails, job %d of %d',
+ count, job + 1, num_jobs)
+ for i, msg in enumerate(mbox):
+
+ if i % num_jobs != job:
+ continue
+
+ with run_ahead_barrier:
+ run_ahead_barrier.wait_for(lambda: i - latest_msg.value <= RUN_AHEAD_LIMIT)
+
+ try:
+ obj = parse_mail(msg, list_id)
+ with results.get_lock():
+ if obj:
+ results[TYPE_CONVERSION[type(obj)]] += 1
+ else:
+ results[DROPPED] += 1
+ except DuplicateMailError as exc:
+ with results.get_lock():
+ results[DUPLICATE] += 1
+ logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
+ except Exception as exc:
+ with results.get_lock():
+ results[ERROR] += 1
+ logger.warning('Invalid mail %d: %s', i, repr(exc))
+
+ with processed.get_lock():
+ processed.value += 1
+
+ with run_ahead_barrier:
+ with latest_msg.get_lock():
+ if i > latest_msg.value:
+ latest_msg.value = i
+ run_ahead_barrier.notify_all()
+
+ mbox.close()
--
2.20.1
More information about the Patchwork
mailing list