aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/names/test/test_cache.py
blob: 7afafedf87503451ded1ea8cd636e2d01bc7dbc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

import time

from twisted.trial import unittest

from twisted.names import dns, cache
from twisted.internet import task

class Caching(unittest.TestCase):
    """
    Tests for L{cache.CacheResolver}.
    """

    def test_lookup(self):
        c = cache.CacheResolver({
            dns.Query(name='example.com', type=dns.MX, cls=dns.IN):
                (time.time(), ([], [], []))})
        return c.lookupMailExchange('example.com').addCallback(
            self.assertEqual, ([], [], []))


    def test_constructorExpires(self):
        """
        Cache entries passed into L{cache.CacheResolver.__init__} get
        cancelled just like entries added with cacheResult
        """

        r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
                           dns.Record_A("127.0.0.1", 60))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 50,
                           dns.Record_A("127.0.0.1", 50))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 40,
                           dns.Record_A("127.0.0.1", 40))])

        clock = task.Clock()
        query = dns.Query(name="example.com", type=dns.A, cls=dns.IN)

        c = cache.CacheResolver({ query : (clock.seconds(), r)}, reactor=clock)

        # 40 seconds is enough to expire the entry because expiration is based
        # on the minimum TTL.
        clock.advance(40)

        self.assertNotIn(query, c.cache)

        return self.assertFailure(
            c.lookupAddress("example.com"), dns.DomainError)


    def test_normalLookup(self):
        """
        When a cache lookup finds a cached entry from 1 second ago, it is
        returned with a TTL of original TTL minus the elapsed 1 second.
        """
        r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
                           dns.Record_A("127.0.0.1", 60))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 50,
                           dns.Record_A("127.0.0.1", 50))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 40,
                           dns.Record_A("127.0.0.1", 40))])

        clock = task.Clock()

        c = cache.CacheResolver(reactor=clock)
        c.cacheResult(dns.Query(name="example.com", type=dns.A, cls=dns.IN), r)

        clock.advance(1)

        def cbLookup(result):
            self.assertEquals(result[0][0].ttl, 59)
            self.assertEquals(result[1][0].ttl, 49)
            self.assertEquals(result[2][0].ttl, 39)
            self.assertEquals(result[0][0].name.name, "example.com")

        return c.lookupAddress("example.com").addCallback(cbLookup)


    def test_cachedResultExpires(self):
        """
        Once the TTL has been exceeded, the result is removed from the cache.
        """
        r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
                           dns.Record_A("127.0.0.1", 60))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 50,
                           dns.Record_A("127.0.0.1", 50))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 40,
                           dns.Record_A("127.0.0.1", 40))])

        clock = task.Clock()

        c = cache.CacheResolver(reactor=clock)
        query = dns.Query(name="example.com", type=dns.A, cls=dns.IN)
        c.cacheResult(query, r)

        clock.advance(40)

        self.assertNotIn(query, c.cache)

        return self.assertFailure(
            c.lookupAddress("example.com"), dns.DomainError)


    def test_expiredTTLLookup(self):
        """
        When the cache is queried exactly as the cached entry should expire but
        before it has actually been cleared, the cache does not return the
        expired entry.
        """
        r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
                           dns.Record_A("127.0.0.1", 60))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 50,
                           dns.Record_A("127.0.0.1", 50))],
             [dns.RRHeader("example.com", dns.A, dns.IN, 40,
                           dns.Record_A("127.0.0.1", 40))])

        clock = task.Clock()
        # Make sure timeouts never happen, so entries won't get cleared:
        clock.callLater = lambda *args, **kwargs: None

        c = cache.CacheResolver({
            dns.Query(name="example.com", type=dns.A, cls=dns.IN) :
                (clock.seconds(), r)}, reactor=clock)

        clock.advance(60.1)

        return self.assertFailure(
            c.lookupAddress("example.com"), dns.DomainError)