summaryrefslogtreecommitdiff
path: root/devel/utils.py
blob: bd57def929ff1bd5a6afc6c37905d348e16f26ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import re

from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.db import connection
from django.db.models import Count, Q

from devel.models import UserProfile
from main.utils import cache_function
from main.models import Package
from packages.models import PackageRelation

@cache_function(283)
def get_annotated_maintainers():
    '''
    Return a list of active users whose profile has at least one allowed
    repo, ordered by first then last name. Each returned User carries three
    extra attributes:
        package_count - rows joined for the user's MAINTAINER relations
        flagged_count - how many of those rows have a non-NULL flag_date
        updated_count - number of Package rows with this user as packager
    Users with no matching rows get 0 for the respective attribute.
    The result is wrapped by cache_function(283); see main.utils for what
    the argument means (presumably a cache lifetime).
    '''
    allowed_profiles = UserProfile.allowed_repos.through.objects.values(
            'userprofile_id')
    queryset = User.objects.filter(is_active=True,
            userprofile__id__in=allowed_profiles)
    queryset = queryset.order_by('first_name', 'last_name')

    # maintained/flagged totals per user come from one hand-written query
    pkg_count_sql = """
SELECT pr.user_id, COUNT(*), COUNT(p.flag_date)
    FROM packages_packagerelation pr
    JOIN packages p
    ON pr.pkgbase = p.pkgbase
    WHERE pr.type = %s
    GROUP BY pr.user_id
"""
    cursor = connection.cursor()
    cursor.execute(pkg_count_sql, [PackageRelation.MAINTAINER])

    # user id -> (total, flagged)
    relation_counts = {}
    for user_id, total, flagged in cursor.fetchall():
        relation_counts[user_id] = (total, flagged)

    # packager id -> number of packages last built by that packager
    packager_rows = Package.objects.values_list('packager').order_by(
            'packager').annotate(Count('packager'))
    updated_by_user = dict(packager_rows)

    # materialize to a plain list: the original notes that pickling a
    # QuerySet does not work here
    annotated = list(queryset)
    for dev in annotated:
        dev.package_count, dev.flagged_count = relation_counts.get(
                dev.id, (0, 0))
        dev.updated_count = updated_by_user.get(dev.id, 0)
    return annotated


def ignore_does_not_exist(func):
    '''
    Decorator that turns "no single match" lookup failures into None.

    The returned wrapper calls func with the same positional and keyword
    arguments and passes its return value through. If func raises
    ObjectDoesNotExist or MultipleObjectsReturned, the wrapper returns None
    instead; any other exception propagates unchanged.
    '''
    # local import keeps this block self-contained
    from functools import wraps

    # wraps() preserves func's __name__/__doc__ so decorated functions stay
    # identifiable in tracebacks and introspection
    @wraps(func)
    def new_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (ObjectDoesNotExist, MultipleObjectsReturned):
            return None
    return new_func


class UserFinder(object):
    '''
    Resolve packager strings, usernames, email addresses and PGP keys to
    User objects. Every answer, including a miss (None), is remembered in a
    per-instance dictionary so a repeated lookup does not query again; call
    clear_cache() to forget all remembered answers.
    '''

    def __init__(self):
        # one cache per lookup kind, keyed by the raw lookup argument
        self.cache = {}
        self.username_cache = {}
        self.email_cache = {}
        self.pgp_cache = {}

    @staticmethod
    @ignore_does_not_exist
    def user_email(name, email):
        '''Return the User whose email equals email, else None.'''
        if email:
            return User.objects.get(email=email)
        return None

    @staticmethod
    @ignore_does_not_exist
    def username_email(name, email):
        '''
        Return the User whose username is the local part of email, but only
        when the domain is archlinux.org or a subdomain of it; else None.
        '''
        if email and '@' in email:
            # split email addr at '@' symbol, ensure domain matches
            # or is a subdomain of archlinux.org
            # TODO: configurable domain/regex somewhere?
            username, domain = email.split('@', 1)
            # the dot before 'org' is escaped so it only matches a literal
            # '.', not an arbitrary character
            if re.match(r'^(.+\.)?archlinux\.org$', domain):
                return User.objects.get(username=username)
        return None

    @staticmethod
    @ignore_does_not_exist
    def profile_email(name, email):
        '''Return the User whose profile public_email equals email.'''
        if email:
            return User.objects.get(userprofile__public_email=email)
        return None

    @staticmethod
    @ignore_does_not_exist
    def user_name(name, email):
        '''
        Return the single User whose first/last name fields together contain
        every whitespace-separated token of name (case-insensitive), else
        None. Tokens wrapped in quotes are skipped.
        '''
        # yes, a bit odd but this is the easiest way since we can't always be
        # sure how to split the name. Ensure every 'token' appears in at least
        # one of the two name fields.
        if not name:
            return None
        # ignore quoted parts; e.g. nicknames in strings
        tokens = [token for token in name.split()
                if not re.match(r'^[\'"].*[\'"]$', token)]
        if not tokens:
            # nothing left to filter on; an empty Q() would match every user
            # and could return an unrelated account
            return None
        name_q = Q()
        for token in tokens:
            name_q &= (Q(first_name__icontains=token) |
                    Q(last_name__icontains=token))
        return User.objects.get(name_q)

    def find(self, userstring):
        '''
        Attempt to find the corresponding User object for a standard
        packager string, e.g. something like
            'A. U. Thor <author@example.com>'.
        We start by searching for a matching email address; we then move onto
        matching by first/last name. If we cannot find a user, then return None.
        The outcome is cached under the full userstring and, when the string
        contained an email address, under that address as well.
        '''
        if not userstring:
            return None
        if userstring in self.cache:
            return self.cache[userstring]

        name = email = None

        matches = re.match(r'^([^<]+)? ?<([^>]*)>?', userstring)
        if not matches:
            # no '<...>' part at all: treat the whole string as a name
            name = userstring.strip()
        else:
            name = matches.group(1)
            email = matches.group(2)

        user = None
        find_methods = (self.user_email, self.profile_email,
                self.username_email, self.user_name)
        for matcher in find_methods:
            user = matcher(name, email)
            if user is not None:
                break

        self.cache[userstring] = user
        if email:
            # skip None/empty keys; find_by_email() never reads them
            # NOTE(review): a user matched by name only is also stored under
            # this email, so find_by_email() may later return it -- confirm
            # this is intended
            self.email_cache[email] = user
        return user

    def find_by_username(self, username):
        '''Return the User with exactly this username, or None.'''
        if not username:
            return None
        if username in self.username_cache:
            return self.username_cache[username]

        try:
            user = User.objects.get(username=username)
        except User.DoesNotExist:
            user = None

        self.username_cache[username] = user
        return user

    def find_by_email(self, email):
        '''
        Return the User matching email, trying in order the account email,
        the profile public email and the username@archlinux.org form;
        None if nothing matches.
        '''
        if not email:
            return None
        if email in self.email_cache:
            return self.email_cache[email]

        user = None
        email_methods = (self.user_email, self.profile_email,
                self.username_email)
        for matcher in email_methods:
            user = matcher(None, email)
            if user is not None:
                break

        self.email_cache[email] = user
        return user

    def find_by_pgp_key(self, pgp_key):
        '''
        Return the User whose profile pgp_key ends with pgp_key, or None when
        no profile or more than one profile matches.
        '''
        if not pgp_key:
            return None
        if pgp_key in self.pgp_cache:
            return self.pgp_cache[pgp_key]

        try:
            user = User.objects.get(
                    userprofile__pgp_key__endswith=pgp_key)
        except (User.DoesNotExist, User.MultipleObjectsReturned):
            # a suffix match can hit several keys; treat that as "not found"
            # like the other matchers do rather than raising
            user = None

        self.pgp_cache[pgp_key] = user
        return user

    def clear_cache(self):
        '''Drop every remembered lookup result.'''
        self.cache = {}
        self.username_cache = {}
        self.email_cache = {}
        self.pgp_cache = {}

# vim: set ts=4 sw=4 et: