1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import time
from twisted.trial import unittest
from twisted.names import dns, cache
from twisted.internet import task
class Caching(unittest.TestCase):
"""
Tests for L{cache.CacheResolver}.
"""
def test_lookup(self):
c = cache.CacheResolver({
dns.Query(name='example.com', type=dns.MX, cls=dns.IN):
(time.time(), ([], [], []))})
return c.lookupMailExchange('example.com').addCallback(
self.assertEqual, ([], [], []))
def test_constructorExpires(self):
"""
Cache entries passed into L{cache.CacheResolver.__init__} get
cancelled just like entries added with cacheResult
"""
r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
dns.Record_A("127.0.0.1", 60))],
[dns.RRHeader("example.com", dns.A, dns.IN, 50,
dns.Record_A("127.0.0.1", 50))],
[dns.RRHeader("example.com", dns.A, dns.IN, 40,
dns.Record_A("127.0.0.1", 40))])
clock = task.Clock()
query = dns.Query(name="example.com", type=dns.A, cls=dns.IN)
c = cache.CacheResolver({ query : (clock.seconds(), r)}, reactor=clock)
# 40 seconds is enough to expire the entry because expiration is based
# on the minimum TTL.
clock.advance(40)
self.assertNotIn(query, c.cache)
return self.assertFailure(
c.lookupAddress("example.com"), dns.DomainError)
def test_normalLookup(self):
"""
When a cache lookup finds a cached entry from 1 second ago, it is
returned with a TTL of original TTL minus the elapsed 1 second.
"""
r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
dns.Record_A("127.0.0.1", 60))],
[dns.RRHeader("example.com", dns.A, dns.IN, 50,
dns.Record_A("127.0.0.1", 50))],
[dns.RRHeader("example.com", dns.A, dns.IN, 40,
dns.Record_A("127.0.0.1", 40))])
clock = task.Clock()
c = cache.CacheResolver(reactor=clock)
c.cacheResult(dns.Query(name="example.com", type=dns.A, cls=dns.IN), r)
clock.advance(1)
def cbLookup(result):
self.assertEquals(result[0][0].ttl, 59)
self.assertEquals(result[1][0].ttl, 49)
self.assertEquals(result[2][0].ttl, 39)
self.assertEquals(result[0][0].name.name, "example.com")
return c.lookupAddress("example.com").addCallback(cbLookup)
def test_cachedResultExpires(self):
"""
Once the TTL has been exceeded, the result is removed from the cache.
"""
r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
dns.Record_A("127.0.0.1", 60))],
[dns.RRHeader("example.com", dns.A, dns.IN, 50,
dns.Record_A("127.0.0.1", 50))],
[dns.RRHeader("example.com", dns.A, dns.IN, 40,
dns.Record_A("127.0.0.1", 40))])
clock = task.Clock()
c = cache.CacheResolver(reactor=clock)
query = dns.Query(name="example.com", type=dns.A, cls=dns.IN)
c.cacheResult(query, r)
clock.advance(40)
self.assertNotIn(query, c.cache)
return self.assertFailure(
c.lookupAddress("example.com"), dns.DomainError)
def test_expiredTTLLookup(self):
"""
When the cache is queried exactly as the cached entry should expire but
before it has actually been cleared, the cache does not return the
expired entry.
"""
r = ([dns.RRHeader("example.com", dns.A, dns.IN, 60,
dns.Record_A("127.0.0.1", 60))],
[dns.RRHeader("example.com", dns.A, dns.IN, 50,
dns.Record_A("127.0.0.1", 50))],
[dns.RRHeader("example.com", dns.A, dns.IN, 40,
dns.Record_A("127.0.0.1", 40))])
clock = task.Clock()
# Make sure timeouts never happen, so entries won't get cleared:
clock.callLater = lambda *args, **kwargs: None
c = cache.CacheResolver({
dns.Query(name="example.com", type=dns.A, cls=dns.IN) :
(clock.seconds(), r)}, reactor=clock)
clock.advance(60.1)
return self.assertFailure(
c.lookupAddress("example.com"), dns.DomainError)
|