1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# -*- coding: utf-8 -*-
"""
rhodecode.model.comment
~~~~~~~~~~~~~~~~~~~~~~~
comments model for RhodeCode
:created_on: Nov 11, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
from pylons.i18n.translation import _
from sqlalchemy.util.compat import defaultdict
from rhodecode.lib import extract_mentioned_users
from rhodecode.lib import helpers as h
from rhodecode.model import BaseModel
from rhodecode.model.db import ChangesetComment, User, Repository, Notification
from rhodecode.model.notification import NotificationModel
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class ChangesetCommentsModel(BaseModel):
    """
    Model for creating, deleting and querying ``ChangesetComment`` rows,
    including the notifications sent when a new comment is created.
    """

    def __get_changeset_comment(self, changeset_comment):
        """
        Resolve ``changeset_comment`` to a ``ChangesetComment`` through
        ``self._get_instance``.

        :param changeset_comment: value handed to ``_get_instance``
        """
        return self._get_instance(ChangesetComment, changeset_comment)

    def _extract_mentions(self, s):
        """
        Return the list of ``User`` objects mentioned in text ``s``.

        Usernames are taken from ``extract_mentioned_users`` and looked up
        case-insensitively; names without a matching user are skipped.

        :param s: text to scan for mentions
        """
        user_objects = []
        for username in extract_mentioned_users(s):
            user_obj = User.get_by_username(username, case_insensitive=True)
            if user_obj:
                user_objects.append(user_obj)
        return user_objects

    def create(self, text, repo_id, user_id, revision, f_path=None,
               line_no=None):
        """
        Creates new comment for changeset and notifies the participants.

        Two notifications may be created: one for the users returned by
        ``ChangesetComment.get_users`` plus the changeset author, and a
        second ``[Mention]`` one for mentioned users not already covered.

        :param text: comment body; nothing is created when it is empty
        :param repo_id: id passed to ``Repository.get``
        :param user_id: stored on the comment and used as the
            notification's ``created_by``
        :param revision: changeset revision the comment belongs to
        :param f_path: stored on the comment as ``f_path``
        :param line_no: stored on the comment as ``line_no``; when truthy
            it is also mentioned in the notification subject
        :returns: the new ``ChangesetComment``, or ``None`` for empty text
        """
        if not text:
            return None

        repo = Repository.get(repo_id)
        cs = repo.scm_instance.get_changeset(revision)
        desc = cs.message
        author_email = cs.author_email

        comment = ChangesetComment()
        comment.repo = repo
        comment.user_id = user_id
        comment.revision = revision
        comment.text = text
        comment.f_path = f_path
        comment.line_no = line_no

        self.sa.add(comment)
        # flush before building the link: the anchor below uses
        # comment.comment_id (presumably assigned by the database on flush)
        self.sa.flush()

        # make notification
        line = ''
        if line_no:
            line = _('on line %s') % line_no
        subj = h.link_to(
            'Re commit: %(commit_desc)s %(line)s' %
            {'commit_desc': desc, 'line': line},
            h.url('changeset_home', repo_name=repo.repo_name,
                  revision=revision,
                  anchor='comment-%s' % comment.comment_id,
                  qualified=True,
                  )
        )
        body = text

        # get the current participants of this changeset
        recipients = ChangesetComment.get_users(revision=revision)

        # add changeset author, but only if it's in rhodecode system;
        # an unknown email must not put None into the recipients
        cs_author = User.get_by_email(author_email)
        if cs_author is not None:
            recipients += [cs_author]

        NotificationModel().create(
            created_by=user_id, subject=subj, body=body,
            recipients=recipients, type_=Notification.TYPE_CHANGESET_COMMENT
        )

        # mentioned users who were not notified above get a separate,
        # specially marked notification
        mention_recipients = set(self._extract_mentions(body))\
            .difference(recipients)
        if mention_recipients:
            subj = _('[Mention]') + ' ' + subj
            NotificationModel().create(
                created_by=user_id, subject=subj, body=body,
                recipients=mention_recipients,
                type_=Notification.TYPE_CHANGESET_COMMENT
            )

        return comment

    def delete(self, comment):
        """
        Deletes given comment

        :param comment: value resolved to a ``ChangesetComment`` via
            ``_get_instance``
        :returns: the resolved comment that was passed to ``self.sa.delete``
        """
        comment = self.__get_changeset_comment(comment)
        self.sa.delete(comment)
        return comment

    def get_comments(self, repo_id, revision):
        """
        Return all comments of ``revision`` in repository ``repo_id`` that
        have neither a line number nor a file path set.

        :param repo_id: repository id to filter on
        :param revision: revision to filter on
        """
        # ``== None`` is intentional: these are SQLAlchemy column
        # expressions, ``is None`` would not build a query condition
        return ChangesetComment.query()\
            .filter(ChangesetComment.repo_id == repo_id)\
            .filter(ChangesetComment.revision == revision)\
            .filter(ChangesetComment.line_no == None)\
            .filter(ChangesetComment.f_path == None).all()  # noqa: E711

    def get_inline_comments(self, repo_id, revision):
        """
        Return comments of ``revision`` in repository ``repo_id`` that have
        both a line number and a file path, grouped by path and line.

        :param repo_id: repository id to filter on
        :param revision: revision to filter on
        :returns: ``items()`` of a mapping ``f_path -> {line_no: [comments]}``
        """
        # ``!= None`` is intentional: these are SQLAlchemy column
        # expressions, ``is not None`` would not build a query condition
        comments = self.sa.query(ChangesetComment)\
            .filter(ChangesetComment.repo_id == repo_id)\
            .filter(ChangesetComment.revision == revision)\
            .filter(ChangesetComment.line_no != None)\
            .filter(ChangesetComment.f_path != None).all()  # noqa: E711

        paths = defaultdict(lambda: defaultdict(list))
        for co in comments:
            paths[co.f_path][co.line_no].append(co)
        return paths.items()
|