aboutsummaryrefslogtreecommitdiff
path: root/rhodecode/lib/caching_query.py
blob: ffa21c0235a210997314e1d01f8f063eec0cae09 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""caching_query.py

Represent persistence structures which allow the usage of
Beaker caching with SQLAlchemy.

The four new concepts introduced here are:

 * CachingQuery - a Query subclass that caches and
   retrieves results in/from Beaker.
 * FromCache - a query option that establishes caching
   parameters on a Query
 * RelationshipCache - a variant of FromCache which is specific
   to a query invoked during a lazy load.
 * _params_from_query - extracts value parameters from
   a Query.

The rest of what's here are standard SQLAlchemy and
Beaker constructs.

"""
import beaker
from beaker.exceptions import BeakerException

from sqlalchemy.orm.interfaces import MapperOption
from sqlalchemy.orm.query import Query
from sqlalchemy.sql import visitors
from rhodecode.lib.utils2 import safe_str


class CachingQuery(Query):
    """Query subclass able to serve complete result lists from Beaker.

    Caching is driven by a ``_cache_parameters`` attribute holding a
    ``(region, namespace, cache_key)`` triple:

    * the *region* names a Beaker cache region, i.e. one particular cache
      configuration (backend implementation, expiration times, etc.);
    * the *namespace* is a qualifying name identifying a group of keys
      inside the cache, e.g. "by_name" for a query filtering on a name.

    When that attribute is present, the namespace is combined with a
    string built from the entities/columns the query requests, the Beaker
    cache is looked up from the region and the composed namespace, and the
    key inside that cache is derived from the bind parameter values of the
    query (usually the literals of the WHERE clause).

    The attribute is not meant to be set by hand; the FromCache and
    RelationshipCache mapper options are the public way to configure it.

    """

    def __init__(self, manager, *args, **kw):
        """Remember the cache ``manager`` and initialise the plain Query.

        All remaining positional and keyword arguments are handed to
        ``Query.__init__`` untouched.
        """
        self.cache_manager = manager
        Query.__init__(self, *args, **kw)

    def __iter__(self):
        """Iterate over the results, going through Beaker when configured.

        Without caching parameters this is exactly ``Query.__iter__``.

        Note that the loaded objects are *not* detached from the current
        session. With an in-process backend (like "memory") that outlives
        the session's transaction, those objects may be expired. If that
        matters, this method can be changed to expunge() every loaded item
        from the session before handing the list to the cache, so that the
        cached items are not the ones living in the current Session.

        """
        if not hasattr(self, '_cache_parameters'):
            return Query.__iter__(self)

        def load_from_database():
            # Materialise the rows so that a plain list is what gets stored.
            return list(Query.__iter__(self))

        return self.get_value(createfunc=load_from_database)

    def invalidate(self):
        """Drop the cached value that belongs to this Query."""
        region_cache, key = _get_cache_parameters(self)
        region_cache.remove(key)

    def get_value(self, merge=True, createfunc=None):
        """Fetch this query's value from the cache.

        :param merge: when true, the cached value is passed through
            ``merge_result(..., load=False)`` before being returned.
        :param createfunc: optional callable handed to the cache to
            produce the value when it is missing.

        Raise KeyError if no value is present and no createfunc was given.

        """
        region_cache, key = _get_cache_parameters(self)
        cached = region_cache.get_value(key, createfunc=createfunc)
        if not merge:
            return cached
        return self.merge_result(cached, load=False)

    def set_value(self, value):
        """Store ``value`` in the cache under this query's key."""
        region_cache, key = _get_cache_parameters(self)
        region_cache.put(key, value)


def query_callable(manager, query_cls=CachingQuery):
    """Build a query factory that always passes ``manager`` first.

    :param manager: object handed as first argument to ``query_cls``.
    :param query_cls: class (or callable) to instantiate; defaults to
        CachingQuery.
    :return: a callable forwarding its own positional and keyword
        arguments to ``query_cls`` after ``manager``.
    """
    def query(*args, **kwargs):
        instance = query_cls(manager, *args, **kwargs)
        return instance

    return query


def get_cache_region(name, region):
    """Return the Beaker cache for namespace ``name`` inside ``region``.

    :param name: cache namespace passed on to Beaker.
    :param region: name of a region registered in
        ``beaker.cache.cache_regions``.
    :raises BeakerException: if ``region`` has not been configured.
    """
    # ``import beaker`` on its own does not load the ``beaker.cache``
    # submodule; import it explicitly so this function does not depend on
    # some other module having imported it first.
    import beaker.cache

    regions = beaker.cache.cache_regions
    if region not in regions:
        raise BeakerException(
            'Cache region `%s` not configured. '
            'Check if proper cache settings are in the .ini files' % region)

    # The region's stored settings are used as the cache keyword arguments.
    return beaker.cache.Cache._get_cache(name, regions[region])


def _get_cache_parameters(query):
    """Return ``(cache, cache_key)`` for a query configured for caching.

    The query must carry a ``_cache_parameters`` triple of
    ``(region, namespace, cache_key)``.  The namespace is expanded with the
    entities being queried; when no explicit cache key was configured, one
    is derived from the query's current bind parameter values plus its
    LIMIT and OFFSET.

    :param query: the query to compute cache parameters for.
    :return: tuple of the Cache instance and the cache key string.
    :raises ValueError: if the query has no caching parameters configured.
    """
    if not hasattr(query, '_cache_parameters'):
        raise ValueError("This Query does not have caching "
                         "parameters configured.")

    region, namespace, cache_key = query._cache_parameters

    namespace = _namespace_from_query(namespace, query)

    if cache_key is None:
        # cache key - the value arguments from this query's parameters.
        key_parts = [safe_str(x) for x in _params_from_query(query)]

        # LIMIT and OFFSET are labelled: appended as bare numbers,
        # ``limit(10)`` and ``offset(10)`` would both yield the key "10"
        # and return each other's cached rows.
        if query._limit is not None:
            key_parts.append('limit:' + str(query._limit))
        if query._offset is not None:
            key_parts.append('offset:' + str(query._offset))

        # Always a string from here on, so no further None check is needed.
        cache_key = " ".join(key_parts)

    # The module-level helper is used here rather than the query's own
    # ``cache_manager``.
    cache = get_cache_region(namespace, region)

    # Optionally the key could be hashed here (e.g. with uuid.uuid5) to
    # give it a consistent length; it is currently used verbatim.
    return cache, cache_key


def _namespace_from_query(namespace, query):
    """Compose the full cache namespace for ``query``.

    The token supplied by the caching option comes first, followed by the
    string form of every entity the query selects.  All spaces are turned
    into underscores, which memcached wants.
    """
    tokens = [namespace]
    tokens.extend(str(entity) for entity in query._entities)
    return " ".join(tokens).replace(' ', '_')


def _set_cache_parameters(query, region, namespace, cache_key):
    """Attach ``(region, namespace, cache_key)`` to ``query``.

    :raises ValueError: if the query already carries caching parameters;
        the message reports the region and namespace already in place,
        not the ones passed to this call.
    """
    if not hasattr(query, '_cache_parameters'):
        query._cache_parameters = region, namespace, cache_key
        return

    current_region, current_namespace, _ = query._cache_parameters
    details = (current_region, current_namespace)
    raise ValueError("This query is already configured "
                     "for region %r namespace %r" % details)


class FromCache(MapperOption):
    """Mapper option asking a Query to load its results from a cache."""

    # NOTE(review): presumably SQLAlchemy's flag deciding whether the option
    # is handed on to relationship loaders - confirm against MapperOption.
    propagate_to_loaders = False

    def __init__(self, region, namespace, cache_key=None):
        """Create the option.

        :param region: the cache region.  Should be a
        region configured in the Beaker CacheManager.

        :param namespace: the cache namespace.  Should
        be a name that uniquely describes the lexical
        structure of the target Query.

        :param cache_key: optional.  A string used as the key
        for the query inside the cache.  Handy when the query
        has a huge amount of parameters (such as with in_())
        that map more simply to some other identifier.

        """
        self.region, self.namespace = region, namespace
        self.cache_key = cache_key

    def process_query(self, query):
        """Configure ``query`` for caching during normal loading."""
        settings = (self.region, self.namespace, self.cache_key)
        _set_cache_parameters(query, *settings)


class RelationshipCache(MapperOption):
    """Specifies that a Query as called within a "lazy load"
       should load results from a cache."""

    propagate_to_loaders = True

    def __init__(self, region, namespace, attribute):
        """Construct a new RelationshipCache.

        :param region: the cache region.  Should be a
        region configured in the Beaker CacheManager.

        :param namespace: the cache namespace.  Should
        be a name uniquely describing the target Query's
        lexical structure.

        :param attribute: A Class.attribute which
        indicates a particular class relationship() whose
        lazy loader should be pulled from the cache.

        """
        self.region = region
        self.namespace = namespace
        # Lookup table keyed by (owning class, relationship key); and_()
        # merges the tables of chained options into this one.
        self._relationship_options = {
            (attribute.property.parent.class_, attribute.property.key): self
        }

    def process_query_conditionally(self, query):
        """Process a Query that is used within a lazy loader.

        (the process_query_conditionally() method is a SQLAlchemy
        hook invoked only within lazyload.)

        The mapped class' MRO is walked from the most derived class
        upwards and the first registered (class, key) option configures
        the query.  Nothing happens when the query has no current path or
        when no option matches.

        """
        if not query._current_path:
            return

        mapper, key = query._current_path[-2:]

        for cls in mapper.class_.__mro__:
            if (cls, key) in self._relationship_options:
                relationship_option = self._relationship_options[(cls, key)]
                _set_cache_parameters(
                    query,
                    relationship_option.region,
                    relationship_option.namespace,
                    None)
                # Stop at the first match: a second match further up the
                # MRO would try to configure the same query again, which
                # _set_cache_parameters rejects with ValueError.
                break

    def and_(self, option):
        """Chain another RelationshipCache option to this one.

        While many RelationshipCache objects can be specified on a single
        Query separately, chaining them together allows for a more efficient
        lookup during load.

        :param option: another RelationshipCache whose registered
            relationships are merged into this one.
        :return: this option, so calls can be chained.

        """
        self._relationship_options.update(option._relationship_options)
        return self


def _params_from_query(query):
    """Collect the bind parameter values used by ``query``.

    Values are gathered in traversal order, first from the query's
    criterion (when it has one) and then from each of its FROM objects.
    Scalar attribute bindparams are taken into account.

    E.g. _params_from_query(query.filter(Cls.foo==5).filter(Cls.bar==7))
    would return [5, 7].

    """
    collected = []

    def _record(bind):
        if bind.key in query._params:
            # An explicitly supplied parameter value wins.
            collected.append(query._params[bind.key])
        elif bind.callable:
            # The lazyloader may place a callable here, meant to
            # late-evaluate the parameter once autoflush has run;
            # call it to obtain the scalar value.
            collected.append(bind.callable())
        else:
            collected.append(bind.value)

    handlers = {'bindparam': _record}

    criterion = query._criterion
    if criterion is not None:
        visitors.traverse(criterion, {}, handlers)

    for from_clause in query._from_obj:
        visitors.traverse(from_clause, {}, handlers)

    return collected