[PATCH] parsearchive: parallel processing
Daniel Axtens
dja at axtens.net
Fri Oct 18 22:32:00 AEDT 2019
Daniel Axtens <dja at axtens.net> writes:
> Allow parsing to be done in parallel with -jN. Handy for testing
> the code against parallelism problems, also faster!
>
> Issue: #241
> Signed-off-by: Daniel Axtens <dja at axtens.net>
>
> ---
>
> I wrote this ages ago when I was working on our racy series parsing.
> I wanted to pull it out again to test the patch that prevents split series.
> Not fussy about whether we actually merge it or not.
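For context, the interface is just a --jobs/-j flag on the existing
management command, so a parallel run looks something like this (the
archive path is only an example):

    ./manage.py parsearchive -j 4 /path/to/list.mbox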
On reflection, we probably don't want to merge this - or if we do, we
want some big noisy warning messages. Parsing in parallel is going to
lose comments: if the patch a comment replies to hasn't been parsed yet,
the comment is just dropped. Out-of-order parsing works fine for patches
- by design - but a comment is only saved if there's already a patch to
attach it to.
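To make the failure mode concrete, here's a minimal sketch - hypothetical
names, nothing like the real parser internals - of that ordering
dependency:

    # Sketch only: dicts stand in for the Patch and Comment tables.
    patches = {}
    comments = []

    def parse(msg):
        if msg['type'] == 'patch':
            patches[msg['msgid']] = msg
        else:  # comment
            parent = patches.get(msg['in_reply_to'])
            if parent is None:
                return None  # parent not parsed yet: comment is dropped
            comments.append(msg)

    # In-order parsing keeps the comment; with -j2, one worker can reach
    # the comment before any worker has stored the patch, and it's lost.
    parse({'type': 'patch', 'msgid': 'a@example.com'})
    parse({'type': 'comment', 'msgid': 'b@example.com',
           'in_reply_to': 'a@example.com'})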
Regards,
Daniel
> ---
> patchwork/management/commands/parsearchive.py | 132 +++++++++++++-----
> 1 file changed, 95 insertions(+), 37 deletions(-)
>
> diff --git patchwork/management/commands/parsearchive.py patchwork/management/commands/parsearchive.py
> index b7f1ea7313c2..7ffc23ec8507 100644
> --- patchwork/management/commands/parsearchive.py
> +++ patchwork/management/commands/parsearchive.py
> @@ -7,7 +7,9 @@ import logging
>  import mailbox
>  import os
>  import sys
> +import multiprocessing
>
> +from django import db
>  from django.core.management.base import BaseCommand
>
>  from patchwork import models
> @@ -16,6 +18,16 @@ from patchwork.parser import DuplicateMailError
>
>  logger = logging.getLogger(__name__)
>
> +TYPE_CONVERSION = {
> +    models.Patch: 0,
> +    models.CoverLetter: 1,
> +    models.Comment: 2,
> +}
> +DUPLICATE = 3
> +DROPPED = 4
> +ERROR = 5
> +NUM_TYPES = 6
> +
>
>  class Command(BaseCommand):
>      help = 'Parse an mbox archive file and store any patches/comments found.'
> @@ -28,17 +40,12 @@ class Command(BaseCommand):
>              '--list-id',
>              help='mailing list ID. If not supplied, this will be '
>                   'extracted from the mail headers.')
> +        parser.add_argument(
> +            '--jobs', '-j',
> +            help='process the archive in N parallel jobs',
> +            type=int, default=1)
>
>      def handle(self, *args, **options):
> -        results = {
> -            models.Patch: 0,
> -            models.CoverLetter: 0,
> -            models.Comment: 0,
> -        }
> -        duplicates = 0
> -        dropped = 0
> -        errors = 0
> -
>          verbosity = int(options['verbosity'])
>          if not verbosity:
>              level = logging.CRITICAL
> @@ -53,6 +60,11 @@ class Command(BaseCommand):
>              logger.setLevel(level)
>              logging.getLogger('patchwork.parser').setLevel(level)
>
> +        jobs = options['jobs']
> +        if jobs < 1:
> +            logger.error('Invalid number of jobs %d, must be at least 1', jobs)
> +            sys.exit(1)
> +
>          # TODO(stephenfin): Support passing via stdin
>          path = args and args[0] or options['infile']
>          if not os.path.exists(path):
> @@ -65,8 +77,6 @@ class Command(BaseCommand):
>          else:
>              mbox = mailbox.Maildir(path, create=False)
>
> -        count = len(mbox)
> -
>          # Iterate through the mbox. This will pick up exceptions that are only
>          # thrown when a broken email is found part way through. Without this
>          # block, we'd get the exception thrown in enumerate(mbox) below, which
> @@ -84,26 +94,35 @@ class Command(BaseCommand):
>              logger.error('Broken mbox/Maildir, aborting')
>              return
>
> -        logger.info('Parsing %d mails', count)
> -        for i, msg in enumerate(mbox):
> -            try:
> -                obj = parse_mail(msg, options['list_id'])
> -                if obj:
> -                    results[type(obj)] += 1
> -                else:
> -                    dropped += 1
> -            except DuplicateMailError as exc:
> -                duplicates += 1
> -                logger.warning('Duplicate mail for message ID %s', exc.msgid)
> -            except (ValueError, Exception) as exc:
> -                errors += 1
> -                logger.warning('Invalid mail: %s', repr(exc))
> -
> -            if verbosity < 3 and (i % 10) == 0:
> -                self.stdout.write('%06d/%06d\r' % (i, count), ending='')
> -                self.stdout.flush()
> +        # we need to close the db connection so each process gets its own
> +        # see e.g. https://stackoverflow.com/a/10684672
> +        db.connections.close_all()
> +
> +        processes = []
> +        processed = multiprocessing.Value('i')
> +        results = multiprocessing.Array('i', NUM_TYPES)
> +        for job in range(jobs):
> +            proc = multiprocessing.Process(target=self.parse_mbox,
> +                                           kwargs={
> +                                               'path': path,
> +                                               'list_id': options['list_id'],
> +                                               'job': job,
> +                                               'num_jobs': jobs,
> +                                               'processed': processed,
> +                                               'results': results,
> +                                           })
> +            logger.debug('starting %s', proc)
> +            proc.daemon = True  # this makes Ctrl-C work
> +            proc.start()
> +            processes.append(proc)
>
> -        mbox.close()
> +        count = len(mbox)
> +        for proc in processes:
> +            while proc.is_alive():
> +                proc.join(1)
> +                if verbosity < 3:
> +                    self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
> +                    self.stdout.flush()
>
>          if not verbosity:
>              return
> @@ -118,11 +137,50 @@ class Command(BaseCommand):
>              '  %(errors)4d errors\n'
>              'Total: %(new)s new entries' % {
>                  'total': count,
> -                'covers': results[models.CoverLetter],
> -                'patches': results[models.Patch],
> -                'comments': results[models.Comment],
> -                'duplicates': duplicates,
> -                'dropped': dropped,
> -                'errors': errors,
> -                'new': count - duplicates - dropped - errors,
> +                'covers': results[TYPE_CONVERSION[models.CoverLetter]],
> +                'patches': results[TYPE_CONVERSION[models.Patch]],
> +                'comments': results[TYPE_CONVERSION[models.Comment]],
> +                'duplicates': results[DUPLICATE],
> +                'dropped': results[DROPPED],
> +                'errors': results[ERROR],
> +                'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
>              })
> +
> +    def parse_mbox(self, path, list_id, job, num_jobs, processed, results):
> +        if os.path.isfile(path):
> +            mbox = mailbox.mbox(path, create=False)
> +        else:
> +            mbox = mailbox.Maildir(path, create=False)
> +
> +        count = len(mbox)
> +
> +        if num_jobs == 1:
> +            logger.info('Parsing %d mails', count)
> +        else:
> +            logger.info('Parsing %d total mails, job %d of %d',
> +                        count, job + 1, num_jobs)
> +        for i, msg in enumerate(mbox):
> +
> +            if i % num_jobs != job:
> +                continue
> +
> +            try:
> +                obj = parse_mail(msg, list_id)
> +                with results.get_lock():
> +                    if obj:
> +                        results[TYPE_CONVERSION[type(obj)]] += 1
> +                    else:
> +                        results[DROPPED] += 1
> +            except DuplicateMailError as exc:
> +                with results.get_lock():
> +                    results[DUPLICATE] += 1
> +                logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
> +            except (ValueError, Exception) as exc:
> +                with results.get_lock():
> +                    results[ERROR] += 1
> +                logger.warning('Invalid mail %d: %s', i, repr(exc))
> +
> +            with processed.get_lock():
> +                processed.value += 1
> +
> +        mbox.close()
> --
> 2.20.1
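P.S. Two notes for anyone who picks this up later. Each worker opens the
mbox itself and takes every num_jobs-th message (i % num_jobs == job), so
the split is round-robin by position. And the get_lock() calls are
load-bearing: += on a multiprocessing.Value or Array is a
read-modify-write, not an atomic operation, so unsynchronised increments
can lose counts. A standalone sketch (independent of patchwork) of the
pattern:

    import multiprocessing

    def bump(counter, n):
        # += on shared memory is read-modify-write; holding the
        # Value's own lock makes it safe across processes.
        for _ in range(n):
            with counter.get_lock():
                counter.value += 1

    if __name__ == '__main__':
        counter = multiprocessing.Value('i', 0)
        procs = [multiprocessing.Process(target=bump, args=(counter, 100000))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(counter.value)  # 400000 with the lock; usually less without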