[PATCH] parsearchive: parallel processing

Daniel Axtens dja at axtens.net
Fri Oct 18 22:15:47 AEDT 2019


Allow parsing to be done in parallel with -jN (--jobs N). This is handy
for testing the code against parallelism problems, and it is also faster!

Issue: #241
Signed-off-by: Daniel Axtens <dja at axtens.net>

---

I wrote this ages ago when I was working on our racy series parsing.
I wanted to pull it out to test the patch to prevent split series.
Not fussy about whether we actually merge it or not.
---
 patchwork/management/commands/parsearchive.py | 132 +++++++++++++-----
 1 file changed, 95 insertions(+), 37 deletions(-)

diff --git patchwork/management/commands/parsearchive.py patchwork/management/commands/parsearchive.py
index b7f1ea7313c2..7ffc23ec8507 100644
--- patchwork/management/commands/parsearchive.py
+++ patchwork/management/commands/parsearchive.py
@@ -7,7 +7,9 @@ import logging
 import mailbox
 import os
 import sys
+import multiprocessing
 
+from django import db
 from django.core.management.base import BaseCommand
 
 from patchwork import models
@@ -16,6 +18,16 @@ from patchwork.parser import DuplicateMailError
 
 logger = logging.getLogger(__name__)
 
+TYPE_CONVERSION = {  # model class -> its slot in the shared results array
+    models.Patch: 0,
+    models.CoverLetter: 1,
+    models.Comment: 2,
+}
+DUPLICATE = 3  # results slot counting duplicate mails
+DROPPED = 4  # results slot counting mails for which parse_mail returned nothing
+ERROR = 5  # results slot counting mails whose parsing raised
+NUM_TYPES = 6  # length of the results array
+
 
 class Command(BaseCommand):
     help = 'Parse an mbox archive file and store any patches/comments found.'
@@ -28,17 +40,12 @@ class Command(BaseCommand):
             '--list-id',
             help='mailing list ID. If not supplied, this will be '
             'extracted from the mail headers.')
+        parser.add_argument(
+            '--jobs', '-j',
+            help='process the archive in N parallel jobs',
+            type=int, default=1)
 
     def handle(self, *args, **options):
-        results = {
-            models.Patch: 0,
-            models.CoverLetter: 0,
-            models.Comment: 0,
-        }
-        duplicates = 0
-        dropped = 0
-        errors = 0
-
         verbosity = int(options['verbosity'])
         if not verbosity:
             level = logging.CRITICAL
@@ -53,6 +60,11 @@ class Command(BaseCommand):
             logger.setLevel(level)
             logging.getLogger('patchwork.parser').setLevel(level)
 
+        jobs = options['jobs']
+        if jobs < 1:
+            logger.error('Invalid number of jobs %d, must be at least 1')
+            sys.exit(1)
+
         # TODO(stephenfin): Support passing via stdin
         path = args and args[0] or options['infile']
         if not os.path.exists(path):
@@ -65,8 +77,6 @@ class Command(BaseCommand):
         else:
             mbox = mailbox.Maildir(path, create=False)
 
-        count = len(mbox)
-
         # Iterate through the mbox. This will pick up exceptions that are only
         # thrown when a broken email is found part way through. Without this
         # block, we'd get the exception thrown in enumerate(mbox) below, which
@@ -84,26 +94,35 @@ class Command(BaseCommand):
             logger.error('Broken mbox/Maildir, aborting')
             return
 
-        logger.info('Parsing %d mails', count)
-        for i, msg in enumerate(mbox):
-            try:
-                obj = parse_mail(msg, options['list_id'])
-                if obj:
-                    results[type(obj)] += 1
-                else:
-                    dropped += 1
-            except DuplicateMailError as exc:
-                duplicates += 1
-                logger.warning('Duplicate mail for message ID %s', exc.msgid)
-            except (ValueError, Exception) as exc:
-                errors += 1
-                logger.warning('Invalid mail: %s', repr(exc))
-
-            if verbosity < 3 and (i % 10) == 0:
-                self.stdout.write('%06d/%06d\r' % (i, count), ending='')
-                self.stdout.flush()
+        # we need to close the db connection so each process gets its own
+        # see e.g. https://stackoverflow.com/a/10684672
+        db.connections.close_all()
+
+        threads = []
+        processed = multiprocessing.Value('i')
+        results = multiprocessing.Array('i', NUM_TYPES)
+        for job in range(jobs):
+            thread = multiprocessing.Process(target=self.parse_mbox,
+                                             kwargs={
+                                                 'path': path,
+                                                 'list_id': options['list_id'],
+                                                 'job': job,
+                                                 'num_jobs': jobs,
+                                                 'processed': processed,
+                                                 'results': results
+                                             })
+            print("starting", thread)
+            thread.daemon = True  # this makes Ctrl-C work
+            thread.start()
+            threads += [thread]
 
-        mbox.close()
+        count = len(mbox)
+        for thread in threads:
+            while thread.is_alive():
+                thread.join(1)
+                if True or verbosity < 3:
+                    self.stdout.write('%06d/%06d\r' % (processed.value, count), ending='')
+                    self.stdout.flush()
 
         if not verbosity:
             return
@@ -118,11 +137,50 @@ class Command(BaseCommand):
             '  %(errors)4d errors\n'
             'Total: %(new)s new entries' % {
                 'total': count,
-                'covers': results[models.CoverLetter],
-                'patches': results[models.Patch],
-                'comments': results[models.Comment],
-                'duplicates': duplicates,
-                'dropped': dropped,
-                'errors': errors,
-                'new': count - duplicates - dropped - errors,
+                'covers': results[TYPE_CONVERSION[models.CoverLetter]],
+                'patches': results[TYPE_CONVERSION[models.Patch]],
+                'comments': results[TYPE_CONVERSION[models.Comment]],
+                'duplicates': results[DUPLICATE],
+                'dropped': results[DROPPED],
+                'errors': results[ERROR],
+                'new': count - results[DUPLICATE] - results[DROPPED] - results[ERROR],
             })
+
+    def parse_mbox(self, path, list_id, job, num_jobs, processed, results):
+        if os.path.isfile(path):
+            mbox = mailbox.mbox(path, create=False)
+        else:
+            mbox = mailbox.Maildir(path, create=False)
+
+        count = len(mbox)
+
+        if num_jobs == 1:
+            logger.info('Parsing %d mails', count)
+        else:
+            logger.info('Parsing %d total mails, job %d of %d',
+                        count, job + 1, num_jobs)
+        for i, msg in enumerate(mbox):
+
+            if i % num_jobs != job:
+                continue
+
+            try:
+                obj = parse_mail(msg, list_id)
+                with results.get_lock():
+                    if obj:
+                        results[TYPE_CONVERSION[type(obj)]] += 1
+                    else:
+                        results[DROPPED] += 1
+            except DuplicateMailError as exc:
+                with results.get_lock():
+                    results[DUPLICATE] += 1
+                logger.warning('Duplicate mail %d for message ID %s', i, exc.msgid)
+            except (ValueError, Exception) as exc:
+                with results.get_lock():
+                    results[ERROR] += 1
+                logger.warning('Invalid mail %d: %s', i, repr(exc))
+
+            with processed.get_lock():
+                processed.value += 1
+
+        mbox.close()
-- 
2.20.1



More information about the Patchwork mailing list