芝麻web文件管理V1.00
编辑当前文件:/home/conskgoa/doughi.co.uk/dns.zip
PK g\ grange.pynu [ # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """DNS GENERATE range conversion.""" import dns def from_text(text): """Convert the text form of a range in a GENERATE statement to an integer. @param text: the textual range @type text: string @return: The start, stop and step values. @rtype: tuple """ # TODO, figure out the bounds on start, stop and step. step = 1 cur = '' state = 0 # state 0 1 2 3 4 # x - y / z if text and text[0] == '-': raise dns.exception.SyntaxError("Start cannot be a negative number") for c in text: if c == '-' and state == 0: start = int(cur) cur = '' state = 2 elif c == '/': stop = int(cur) cur = '' state = 4 elif c.isdigit(): cur += c else: raise dns.exception.SyntaxError("Could not parse %s" % (c)) if state in (1, 3): raise dns.exception.SyntaxError() if state == 2: stop = int(cur) if state == 4: step = int(cur) assert step >= 1 assert start >= 0 assert start <= stop # TODO, can start == stop? return (start, stop, step) PK g\`oL L hash.pynu [ # Copyright (C) 2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """Hashing backwards compatibility wrapper""" import hashlib hashes = {} hashes['MD5'] = hashlib.md5 hashes['SHA1'] = hashlib.sha1 hashes['SHA224'] = hashlib.sha224 hashes['SHA256'] = hashlib.sha256 hashes['SHA384'] = hashlib.sha384 hashes['SHA512'] = hashlib.sha512 def get(algorithm): return hashes[algorithm.upper()] PK g\2Y9 inet.pynu [ # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """Generic Internet address helper functions.""" import socket import dns.ipv4 import dns.ipv6 # We assume that AF_INET is always defined. AF_INET = socket.AF_INET # AF_INET6 might not be defined in the socket module, but we need it. # We'll try to use the socket module's value, and if it doesn't work, # we'll use our own value. try: AF_INET6 = socket.AF_INET6 except AttributeError: AF_INET6 = 9999 def inet_pton(family, text): """Convert the textual form of a network address into its binary form. @param family: the address family @type family: int @param text: the textual address @type text: string @raises NotImplementedError: the address family specified is not implemented. @rtype: string """ if family == AF_INET: return dns.ipv4.inet_aton(text) elif family == AF_INET6: return dns.ipv6.inet_aton(text) else: raise NotImplementedError def inet_ntop(family, address): """Convert the binary form of a network address into its textual form. @param family: the address family @type family: int @param address: the binary address @type address: string @raises NotImplementedError: the address family specified is not implemented. @rtype: string """ if family == AF_INET: return dns.ipv4.inet_ntoa(address) elif family == AF_INET6: return dns.ipv6.inet_ntoa(address) else: raise NotImplementedError def af_for_address(text): """Determine the address family of a textual-form network address. @param text: the textual address @type text: string @raises ValueError: the address family cannot be determined from the input. @rtype: int """ try: dns.ipv4.inet_aton(text) return AF_INET except Exception: try: dns.ipv6.inet_aton(text) return AF_INET6 except: raise ValueError def is_multicast(text): """Is the textual-form network address a multicast address? @param text: the textual address @raises ValueError: the address family cannot be determined from the input. @rtype: bool """ try: first = ord(dns.ipv4.inet_aton(text)[0]) return first >= 224 and first <= 239 except Exception: try: first = ord(dns.ipv6.inet_aton(text)[0]) return first == 255 except Exception: raise ValueError PK g\E!o& & tsig.pynu [ # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """DNS TSIG support.""" import hmac import struct import dns.exception import dns.hash import dns.rdataclass import dns.name from ._compat import long, string_types, text_type class BadTime(dns.exception.DNSException): """The current time is not within the TSIG's validity time.""" class BadSignature(dns.exception.DNSException): """The TSIG signature fails to verify.""" class PeerError(dns.exception.DNSException): """Base class for all TSIG errors generated by the remote peer""" class PeerBadKey(PeerError): """The peer didn't know the key we used""" class PeerBadSignature(PeerError): """The peer didn't like the signature we sent""" class PeerBadTime(PeerError): """The peer didn't like the time we sent""" class PeerBadTruncation(PeerError): """The peer didn't like amount of truncation in the TSIG we sent""" # TSIG Algorithms HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT") HMAC_SHA1 = dns.name.from_text("hmac-sha1") HMAC_SHA224 = dns.name.from_text("hmac-sha224") HMAC_SHA256 = dns.name.from_text("hmac-sha256") HMAC_SHA384 = dns.name.from_text("hmac-sha384") HMAC_SHA512 = dns.name.from_text("hmac-sha512") _hashes = { HMAC_SHA224: 'SHA224', HMAC_SHA256: 'SHA256', HMAC_SHA384: 'SHA384', HMAC_SHA512: 'SHA512', HMAC_SHA1: 'SHA1', HMAC_MD5: 'MD5', } default_algorithm = HMAC_MD5 BADSIG = 16 BADKEY = 17 BADTIME = 18 BADTRUNC = 22 def sign(wire, keyname, secret, time, fudge, original_id, error, other_data, request_mac, ctx=None, multi=False, first=True, algorithm=default_algorithm): """Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata for the input parameters, the HMAC MAC calculated by applying the TSIG signature algorithm, and the TSIG digest context. @rtype: (string, string, hmac.HMAC object) @raises ValueError: I{other_data} is too long @raises NotImplementedError: I{algorithm} is not supported """ if isinstance(other_data, text_type): other_data = other_data.encode() (algorithm_name, digestmod) = get_algorithm(algorithm) if first: ctx = hmac.new(secret, digestmod=digestmod) ml = len(request_mac) if ml > 0: ctx.update(struct.pack('!H', ml)) ctx.update(request_mac) id = struct.pack('!H', original_id) ctx.update(id) ctx.update(wire[2:]) if first: ctx.update(keyname.to_digestable()) ctx.update(struct.pack('!H', dns.rdataclass.ANY)) ctx.update(struct.pack('!I', 0)) long_time = time + long(0) upper_time = (long_time >> 32) & long(0xffff) lower_time = long_time & long(0xffffffff) time_mac = struct.pack('!HIH', upper_time, lower_time, fudge) pre_mac = algorithm_name + time_mac ol = len(other_data) if ol > 65535: raise ValueError('TSIG Other Data is > 65535 bytes') post_mac = struct.pack('!HH', error, ol) + other_data if first: ctx.update(pre_mac) ctx.update(post_mac) else: ctx.update(time_mac) mac = ctx.digest() mpack = struct.pack('!H', len(mac)) tsig_rdata = pre_mac + mpack + mac + id + post_mac if multi: ctx = hmac.new(secret, digestmod=digestmod) ml = len(mac) ctx.update(struct.pack('!H', ml)) ctx.update(mac) else: ctx = None return (tsig_rdata, mac, ctx) def hmac_md5(wire, keyname, secret, time, fudge, original_id, error, other_data, request_mac, ctx=None, multi=False, first=True, algorithm=default_algorithm): return sign(wire, keyname, secret, time, fudge, original_id, error, other_data, request_mac, ctx, multi, first, algorithm) def validate(wire, keyname, secret, now, request_mac, tsig_start, tsig_rdata, tsig_rdlen, ctx=None, multi=False, first=True): """Validate the specified TSIG rdata against the other input parameters. @raises FormError: The TSIG is badly formed. @raises BadTime: There is too much time skew between the client and the server. @raises BadSignature: The TSIG signature did not validate @rtype: hmac.HMAC object""" (adcount,) = struct.unpack("!H", wire[10:12]) if adcount == 0: raise dns.exception.FormError adcount -= 1 new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start] current = tsig_rdata (aname, used) = dns.name.from_wire(wire, current) current = current + used (upper_time, lower_time, fudge, mac_size) = \ struct.unpack("!HIHH", wire[current:current + 10]) time = ((upper_time + long(0)) << 32) + (lower_time + long(0)) current += 10 mac = wire[current:current + mac_size] current += mac_size (original_id, error, other_size) = \ struct.unpack("!HHH", wire[current:current + 6]) current += 6 other_data = wire[current:current + other_size] current += other_size if current != tsig_rdata + tsig_rdlen: raise dns.exception.FormError if error != 0: if error == BADSIG: raise PeerBadSignature elif error == BADKEY: raise PeerBadKey elif error == BADTIME: raise PeerBadTime elif error == BADTRUNC: raise PeerBadTruncation else: raise PeerError('unknown TSIG error code %d' % error) time_low = time - fudge time_high = time + fudge if now < time_low or now > time_high: raise BadTime (junk, our_mac, ctx) = sign(new_wire, keyname, secret, time, fudge, original_id, error, other_data, request_mac, ctx, multi, first, aname) if our_mac != mac: raise BadSignature return ctx def get_algorithm(algorithm): """Returns the wire format string and the hash module to use for the specified TSIG algorithm @rtype: (string, hash constructor) @raises NotImplementedError: I{algorithm} is not supported """ if isinstance(algorithm, string_types): algorithm = dns.name.from_text(algorithm) try: return (algorithm.to_digestable(), dns.hash.hashes[_hashes[algorithm]]) except KeyError: raise NotImplementedError("TSIG algorithm " + str(algorithm) + " is not supported") def get_algorithm_and_mac(wire, tsig_rdata, tsig_rdlen): """Return the tsig algorithm for the specified tsig_rdata @raises FormError: The TSIG is badly formed. """ current = tsig_rdata (aname, used) = dns.name.from_wire(wire, current) current = current + used (upper_time, lower_time, fudge, mac_size) = \ struct.unpack("!HIHH", wire[current:current + 10]) current += 10 mac = wire[current:current + mac_size] current += mac_size if current > tsig_rdata + tsig_rdlen: raise dns.exception.FormError return (aname, mac) PK g\ӉG G tokenizer.pynu [ # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """Tokenize DNS master file format""" from io import StringIO import sys import dns.exception import dns.name import dns.ttl from ._compat import long, text_type, binary_type _DELIMITERS = { ' ': True, '\t': True, '\n': True, ';': True, '(': True, ')': True, '"': True} _QUOTING_DELIMITERS = {'"': True} EOF = 0 EOL = 1 WHITESPACE = 2 IDENTIFIER = 3 QUOTED_STRING = 4 COMMENT = 5 DELIMITER = 6 class UngetBufferFull(dns.exception.DNSException): """An attempt was made to unget a token when the unget buffer was full.""" class Token(object): """A DNS master file format token. @ivar ttype: The token type @type ttype: int @ivar value: The token value @type value: string @ivar has_escape: Does the token value contain escapes? @type has_escape: bool """ def __init__(self, ttype, value='', has_escape=False): """Initialize a token instance. @param ttype: The token type @type ttype: int @param value: The token value @type value: string @param has_escape: Does the token value contain escapes? @type has_escape: bool """ self.ttype = ttype self.value = value self.has_escape = has_escape def is_eof(self): return self.ttype == EOF def is_eol(self): return self.ttype == EOL def is_whitespace(self): return self.ttype == WHITESPACE def is_identifier(self): return self.ttype == IDENTIFIER def is_quoted_string(self): return self.ttype == QUOTED_STRING def is_comment(self): return self.ttype == COMMENT def is_delimiter(self): return self.ttype == DELIMITER def is_eol_or_eof(self): return self.ttype == EOL or self.ttype == EOF def __eq__(self, other): if not isinstance(other, Token): return False return (self.ttype == other.ttype and self.value == other.value) def __ne__(self, other): if not isinstance(other, Token): return True return (self.ttype != other.ttype or self.value != other.value) def __str__(self): return '%d "%s"' % (self.ttype, self.value) def unescape(self): if not self.has_escape: return self unescaped = '' l = len(self.value) i = 0 while i < l: c = self.value[i] i += 1 if c == '\\': if i >= l: raise dns.exception.UnexpectedEnd c = self.value[i] i += 1 if c.isdigit(): if i >= l: raise dns.exception.UnexpectedEnd c2 = self.value[i] i += 1 if i >= l: raise dns.exception.UnexpectedEnd c3 = self.value[i] i += 1 if not (c2.isdigit() and c3.isdigit()): raise dns.exception.SyntaxError c = chr(int(c) * 100 + int(c2) * 10 + int(c3)) unescaped += c return Token(self.ttype, unescaped) # compatibility for old-style tuple tokens def __len__(self): return 2 def __iter__(self): return iter((self.ttype, self.value)) def __getitem__(self, i): if i == 0: return self.ttype elif i == 1: return self.value else: raise IndexError class Tokenizer(object): """A DNS master file format tokenizer. A token is a (type, value) tuple, where I{type} is an int, and I{value} is a string. The valid types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING, COMMENT, and DELIMITER. @ivar file: The file to tokenize @type file: file @ivar ungotten_char: The most recently ungotten character, or None. @type ungotten_char: string @ivar ungotten_token: The most recently ungotten token, or None. @type ungotten_token: (int, string) token tuple @ivar multiline: The current multiline level. This value is increased by one every time a '(' delimiter is read, and decreased by one every time a ')' delimiter is read. @type multiline: int @ivar quoting: This variable is true if the tokenizer is currently reading a quoted string. @type quoting: bool @ivar eof: This variable is true if the tokenizer has encountered EOF. @type eof: bool @ivar delimiters: The current delimiter dictionary. @type delimiters: dict @ivar line_number: The current line number @type line_number: int @ivar filename: A filename that will be returned by the L{where} method. @type filename: string """ def __init__(self, f=sys.stdin, filename=None): """Initialize a tokenizer instance. @param f: The file to tokenize. The default is sys.stdin. This parameter may also be a string, in which case the tokenizer will take its input from the contents of the string. @type f: file or string @param filename: the name of the filename that the L{where} method will return. @type filename: string """ if isinstance(f, text_type): f = StringIO(f) if filename is None: filename = '
' elif isinstance(f, binary_type): f = StringIO(f.decode()) if filename is None: filename = '
' else: if filename is None: if f is sys.stdin: filename = '
' else: filename = '
' self.file = f self.ungotten_char = None self.ungotten_token = None self.multiline = 0 self.quoting = False self.eof = False self.delimiters = _DELIMITERS self.line_number = 1 self.filename = filename def _get_char(self): """Read a character from input. @rtype: string """ if self.ungotten_char is None: if self.eof: c = '' else: c = self.file.read(1) if c == '': self.eof = True elif c == '\n': self.line_number += 1 else: c = self.ungotten_char self.ungotten_char = None return c def where(self): """Return the current location in the input. @rtype: (string, int) tuple. The first item is the filename of the input, the second is the current line number. """ return (self.filename, self.line_number) def _unget_char(self, c): """Unget a character. The unget buffer for characters is only one character large; it is an error to try to unget a character when the unget buffer is not empty. @param c: the character to unget @type c: string @raises UngetBufferFull: there is already an ungotten char """ if self.ungotten_char is not None: raise UngetBufferFull self.ungotten_char = c def skip_whitespace(self): """Consume input until a non-whitespace character is encountered. The non-whitespace character is then ungotten, and the number of whitespace characters consumed is returned. If the tokenizer is in multiline mode, then newlines are whitespace. @rtype: int """ skipped = 0 while True: c = self._get_char() if c != ' ' and c != '\t': if (c != '\n') or not self.multiline: self._unget_char(c) return skipped skipped += 1 def get(self, want_leading=False, want_comment=False): """Get the next token. @param want_leading: If True, return a WHITESPACE token if the first character read is whitespace. The default is False. @type want_leading: bool @param want_comment: If True, return a COMMENT token if the first token read is a comment. The default is False. @type want_comment: bool @rtype: Token object @raises dns.exception.UnexpectedEnd: input ended prematurely @raises dns.exception.SyntaxError: input was badly formed """ if self.ungotten_token is not None: token = self.ungotten_token self.ungotten_token = None if token.is_whitespace(): if want_leading: return token elif token.is_comment(): if want_comment: return token else: return token skipped = self.skip_whitespace() if want_leading and skipped > 0: return Token(WHITESPACE, ' ') token = '' ttype = IDENTIFIER has_escape = False while True: c = self._get_char() if c == '' or c in self.delimiters: if c == '' and self.quoting: raise dns.exception.UnexpectedEnd if token == '' and ttype != QUOTED_STRING: if c == '(': self.multiline += 1 self.skip_whitespace() continue elif c == ')': if self.multiline <= 0: raise dns.exception.SyntaxError self.multiline -= 1 self.skip_whitespace() continue elif c == '"': if not self.quoting: self.quoting = True self.delimiters = _QUOTING_DELIMITERS ttype = QUOTED_STRING continue else: self.quoting = False self.delimiters = _DELIMITERS self.skip_whitespace() continue elif c == '\n': return Token(EOL, '\n') elif c == ';': while 1: c = self._get_char() if c == '\n' or c == '': break token += c if want_comment: self._unget_char(c) return Token(COMMENT, token) elif c == '': if self.multiline: raise dns.exception.SyntaxError( 'unbalanced parentheses') return Token(EOF) elif self.multiline: self.skip_whitespace() token = '' continue else: return Token(EOL, '\n') else: # This code exists in case we ever want a # delimiter to be returned. It never produces # a token currently. token = c ttype = DELIMITER else: self._unget_char(c) break elif self.quoting: if c == '\\': c = self._get_char() if c == '': raise dns.exception.UnexpectedEnd if c.isdigit(): c2 = self._get_char() if c2 == '': raise dns.exception.UnexpectedEnd c3 = self._get_char() if c == '': raise dns.exception.UnexpectedEnd if not (c2.isdigit() and c3.isdigit()): raise dns.exception.SyntaxError c = chr(int(c) * 100 + int(c2) * 10 + int(c3)) elif c == '\n': raise dns.exception.SyntaxError('newline in quoted string') elif c == '\\': # # It's an escape. Put it and the next character into # the token; it will be checked later for goodness. # token += c has_escape = True c = self._get_char() if c == '' or c == '\n': raise dns.exception.UnexpectedEnd token += c if token == '' and ttype != QUOTED_STRING: if self.multiline: raise dns.exception.SyntaxError('unbalanced parentheses') ttype = EOF return Token(ttype, token, has_escape) def unget(self, token): """Unget a token. The unget buffer for tokens is only one token large; it is an error to try to unget a token when the unget buffer is not empty. @param token: the token to unget @type token: Token object @raises UngetBufferFull: there is already an ungotten token """ if self.ungotten_token is not None: raise UngetBufferFull self.ungotten_token = token def next(self): """Return the next item in an iteration. @rtype: (int, string) """ token = self.get() if token.is_eof(): raise StopIteration return token __next__ = next def __iter__(self): return self # Helpers def get_int(self): """Read the next token and interpret it as an integer. @raises dns.exception.SyntaxError: @rtype: int """ token = self.get().unescape() if not token.is_identifier(): raise dns.exception.SyntaxError('expecting an identifier') if not token.value.isdigit(): raise dns.exception.SyntaxError('expecting an integer') return int(token.value) def get_uint8(self): """Read the next token and interpret it as an 8-bit unsigned integer. @raises dns.exception.SyntaxError: @rtype: int """ value = self.get_int() if value < 0 or value > 255: raise dns.exception.SyntaxError( '%d is not an unsigned 8-bit integer' % value) return value def get_uint16(self): """Read the next token and interpret it as a 16-bit unsigned integer. @raises dns.exception.SyntaxError: @rtype: int """ value = self.get_int() if value < 0 or value > 65535: raise dns.exception.SyntaxError( '%d is not an unsigned 16-bit integer' % value) return value def get_uint32(self): """Read the next token and interpret it as a 32-bit unsigned integer. @raises dns.exception.SyntaxError: @rtype: int """ token = self.get().unescape() if not token.is_identifier(): raise dns.exception.SyntaxError('expecting an identifier') if not token.value.isdigit(): raise dns.exception.SyntaxError('expecting an integer') value = long(token.value) if value < 0 or value > long(4294967296): raise dns.exception.SyntaxError( '%d is not an unsigned 32-bit integer' % value) return value def get_string(self, origin=None): """Read the next token and interpret it as a string. @raises dns.exception.SyntaxError: @rtype: string """ token = self.get().unescape() if not (token.is_identifier() or token.is_quoted_string()): raise dns.exception.SyntaxError('expecting a string') return token.value def get_identifier(self, origin=None): """Read the next token and raise an exception if it is not an identifier. @raises dns.exception.SyntaxError: @rtype: string """ token = self.get().unescape() if not token.is_identifier(): raise dns.exception.SyntaxError('expecting an identifier') return token.value def get_name(self, origin=None): """Read the next token and interpret it as a DNS name. @raises dns.exception.SyntaxError: @rtype: dns.name.Name object""" token = self.get() if not token.is_identifier(): raise dns.exception.SyntaxError('expecting an identifier') return dns.name.from_text(token.value, origin) def get_eol(self): """Read the next token and raise an exception if it isn't EOL or EOF. @raises dns.exception.SyntaxError: @rtype: string """ token = self.get() if not token.is_eol_or_eof(): raise dns.exception.SyntaxError( 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value)) return token.value def get_ttl(self): token = self.get().unescape() if not token.is_identifier(): raise dns.exception.SyntaxError('expecting an identifier') return dns.ttl.from_text(token.value) PK g\%. . renderer.pynu [ # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """Help for building DNS wire format messages""" from io import BytesIO import struct import random import time import dns.exception import dns.tsig from ._compat import long QUESTION = 0 ANSWER = 1 AUTHORITY = 2 ADDITIONAL = 3 class Renderer(object): """Helper class for building DNS wire-format messages. Most applications can use the higher-level L{dns.message.Message} class and its to_wire() method to generate wire-format messages. This class is for those applications which need finer control over the generation of messages. Typical use:: r = dns.renderer.Renderer(id=1, flags=0x80, max_size=512) r.add_question(qname, qtype, qclass) r.add_rrset(dns.renderer.ANSWER, rrset_1) r.add_rrset(dns.renderer.ANSWER, rrset_2) r.add_rrset(dns.renderer.AUTHORITY, ns_rrset) r.add_edns(0, 0, 4096) r.add_rrset(dns.renderer.ADDTIONAL, ad_rrset_1) r.add_rrset(dns.renderer.ADDTIONAL, ad_rrset_2) r.write_header() r.add_tsig(keyname, secret, 300, 1, 0, '', request_mac) wire = r.get_wire() @ivar output: where rendering is written @type output: BytesIO object @ivar id: the message id @type id: int @ivar flags: the message flags @type flags: int @ivar max_size: the maximum size of the message @type max_size: int @ivar origin: the origin to use when rendering relative names @type origin: dns.name.Name object @ivar compress: the compression table @type compress: dict @ivar section: the section currently being rendered @type section: int (dns.renderer.QUESTION, dns.renderer.ANSWER, dns.renderer.AUTHORITY, or dns.renderer.ADDITIONAL) @ivar counts: list of the number of RRs in each section @type counts: int list of length 4 @ivar mac: the MAC of the rendered message (if TSIG was used) @type mac: string """ def __init__(self, id=None, flags=0, max_size=65535, origin=None): """Initialize a new renderer. @param id: the message id @type id: int @param flags: the DNS message flags @type flags: int @param max_size: the maximum message size; the default is 65535. If rendering results in a message greater than I{max_size}, then L{dns.exception.TooBig} will be raised. @type max_size: int @param origin: the origin to use when rendering relative names @type origin: dns.name.Name or None. """ self.output = BytesIO() if id is None: self.id = random.randint(0, 65535) else: self.id = id self.flags = flags self.max_size = max_size self.origin = origin self.compress = {} self.section = QUESTION self.counts = [0, 0, 0, 0] self.output.write(b'\x00' * 12) self.mac = '' def _rollback(self, where): """Truncate the output buffer at offset I{where}, and remove any compression table entries that pointed beyond the truncation point. @param where: the offset @type where: int """ self.output.seek(where) self.output.truncate() keys_to_delete = [] for k, v in self.compress.items(): if v >= where: keys_to_delete.append(k) for k in keys_to_delete: del self.compress[k] def _set_section(self, section): """Set the renderer's current section. Sections must be rendered order: QUESTION, ANSWER, AUTHORITY, ADDITIONAL. Sections may be empty. @param section: the section @type section: int @raises dns.exception.FormError: an attempt was made to set a section value less than the current section. """ if self.section != section: if self.section > section: raise dns.exception.FormError self.section = section def add_question(self, qname, rdtype, rdclass=dns.rdataclass.IN): """Add a question to the message. @param qname: the question name @type qname: dns.name.Name @param rdtype: the question rdata type @type rdtype: int @param rdclass: the question rdata class @type rdclass: int """ self._set_section(QUESTION) before = self.output.tell() qname.to_wire(self.output, self.compress, self.origin) self.output.write(struct.pack("!HH", rdtype, rdclass)) after = self.output.tell() if after >= self.max_size: self._rollback(before) raise dns.exception.TooBig self.counts[QUESTION] += 1 def add_rrset(self, section, rrset, **kw): """Add the rrset to the specified section. Any keyword arguments are passed on to the rdataset's to_wire() routine. @param section: the section @type section: int @param rrset: the rrset @type rrset: dns.rrset.RRset object """ self._set_section(section) before = self.output.tell() n = rrset.to_wire(self.output, self.compress, self.origin, **kw) after = self.output.tell() if after >= self.max_size: self._rollback(before) raise dns.exception.TooBig self.counts[section] += n def add_rdataset(self, section, name, rdataset, **kw): """Add the rdataset to the specified section, using the specified name as the owner name. Any keyword arguments are passed on to the rdataset's to_wire() routine. @param section: the section @type section: int @param name: the owner name @type name: dns.name.Name object @param rdataset: the rdataset @type rdataset: dns.rdataset.Rdataset object """ self._set_section(section) before = self.output.tell() n = rdataset.to_wire(name, self.output, self.compress, self.origin, **kw) after = self.output.tell() if after >= self.max_size: self._rollback(before) raise dns.exception.TooBig self.counts[section] += n def add_edns(self, edns, ednsflags, payload, options=None): """Add an EDNS OPT record to the message. @param edns: The EDNS level to use. @type edns: int @param ednsflags: EDNS flag values. @type ednsflags: int @param payload: The EDNS sender's payload field, which is the maximum size of UDP datagram the sender can handle. @type payload: int @param options: The EDNS options list @type options: list of dns.edns.Option instances @see: RFC 2671 """ # make sure the EDNS version in ednsflags agrees with edns ednsflags &= long(0xFF00FFFF) ednsflags |= (edns << 16) self._set_section(ADDITIONAL) before = self.output.tell() self.output.write(struct.pack('!BHHIH', 0, dns.rdatatype.OPT, payload, ednsflags, 0)) if options is not None: lstart = self.output.tell() for opt in options: stuff = struct.pack("!HH", opt.otype, 0) self.output.write(stuff) start = self.output.tell() opt.to_wire(self.output) end = self.output.tell() assert end - start < 65536 self.output.seek(start - 2) stuff = struct.pack("!H", end - start) self.output.write(stuff) self.output.seek(0, 2) lend = self.output.tell() assert lend - lstart < 65536 self.output.seek(lstart - 2) stuff = struct.pack("!H", lend - lstart) self.output.write(stuff) self.output.seek(0, 2) after = self.output.tell() if after >= self.max_size: self._rollback(before) raise dns.exception.TooBig self.counts[ADDITIONAL] += 1 def add_tsig(self, keyname, secret, fudge, id, tsig_error, other_data, request_mac, algorithm=dns.tsig.default_algorithm): """Add a TSIG signature to the message. @param keyname: the TSIG key name @type keyname: dns.name.Name object @param secret: the secret to use @type secret: string @param fudge: TSIG time fudge @type fudge: int @param id: the message id to encode in the tsig signature @type id: int @param tsig_error: TSIG error code; default is 0. @type tsig_error: int @param other_data: TSIG other data. @type other_data: string @param request_mac: This message is a response to the request which had the specified MAC. @type request_mac: string @param algorithm: the TSIG algorithm to use @type algorithm: dns.name.Name object """ self._set_section(ADDITIONAL) before = self.output.tell() s = self.output.getvalue() (tsig_rdata, self.mac, ctx) = dns.tsig.sign(s, keyname, secret, int(time.time()), fudge, id, tsig_error, other_data, request_mac, algorithm=algorithm) keyname.to_wire(self.output, self.compress, self.origin) self.output.write(struct.pack('!HHIH', dns.rdatatype.TSIG, dns.rdataclass.ANY, 0, 0)) rdata_start = self.output.tell() self.output.write(tsig_rdata) after = self.output.tell() assert after - rdata_start < 65536 if after >= self.max_size: self._rollback(before) raise dns.exception.TooBig self.output.seek(rdata_start - 2) self.output.write(struct.pack('!H', after - rdata_start)) self.counts[ADDITIONAL] += 1 self.output.seek(10) self.output.write(struct.pack('!H', self.counts[ADDITIONAL])) self.output.seek(0, 2) def write_header(self): """Write the DNS message header. Writing the DNS message header is done after all sections have been rendered, but before the optional TSIG signature is added. """ self.output.seek(0) self.output.write(struct.pack('!HHHHHH', self.id, self.flags, self.counts[0], self.counts[1], self.counts[2], self.counts[3])) self.output.seek(0, 2) def get_wire(self): """Return the wire format message. @rtype: string """ return self.output.getvalue() PK g\EG exception.pynu [ # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. # # Permission to use, copy, modify, and distribute this software and its # documentation for any purpose with or without fee is hereby granted, # provided that the above copyright notice and this permission notice # appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. """Common DNS Exceptions.""" class DNSException(Exception): """Abstract base class shared by all dnspython exceptions. It supports two basic modes of operation: a) Old/compatible mode is used if __init__ was called with empty **kwargs. In compatible mode all *args are passed to standard Python Exception class as before and all *args are printed by standard __str__ implementation. Class variable msg (or doc string if msg is None) is returned from str() if *args is empty. b) New/parametrized mode is used if __init__ was called with non-empty **kwargs. In the new mode *args has to be empty and all kwargs has to exactly match set in class variable self.supp_kwargs. All kwargs are stored inside self.kwargs and used in new __str__ implementation to construct formatted message based on self.fmt string. In the simplest case it is enough to override supp_kwargs and fmt class variables to get nice parametrized messages. """ msg = None # non-parametrized message supp_kwargs = set() # accepted parameters for _fmt_kwargs (sanity check) fmt = None # message parametrized with results from _fmt_kwargs def __init__(self, *args, **kwargs): self._check_params(*args, **kwargs) if kwargs: self.kwargs = self._check_kwargs(**kwargs) self.msg = str(self) else: self.kwargs = dict() # defined but empty for old mode exceptions if self.msg is None: # doc string is better implicit message than empty string self.msg = self.__doc__ if args: super(DNSException, self).__init__(*args) else: super(DNSException, self).__init__(self.msg) def _check_params(self, *args, **kwargs): """Old exceptions supported only args and not kwargs. For sanity we do not allow to mix old and new behavior.""" if args or kwargs: assert bool(args) != bool(kwargs), \ 'keyword arguments are mutually exclusive with positional args' def _check_kwargs(self, **kwargs): if kwargs: assert set(kwargs.keys()) == self.supp_kwargs, \ 'following set of keyword args is required: %s' % ( self.supp_kwargs) return kwargs def _fmt_kwargs(self, **kwargs): """Format kwargs before printing them. Resulting dictionary has to have keys necessary for str.format call on fmt class variable. """ fmtargs = {} for kw, data in kwargs.items(): if isinstance(data, (list, set)): # convert list of
to list of str(
) fmtargs[kw] = list(map(str, data)) if len(fmtargs[kw]) == 1: # remove list brackets [] from single-item lists fmtargs[kw] = fmtargs[kw].pop() else: fmtargs[kw] = data return fmtargs def __str__(self): if self.kwargs and self.fmt: # provide custom message constructed from keyword arguments fmtargs = self._fmt_kwargs(**self.kwargs) return self.fmt.format(**fmtargs) else: # print *args directly in the same way as old DNSException return super(DNSException, self).__str__() class FormError(DNSException): """DNS message is malformed.""" class SyntaxError(DNSException): """Text input is malformed.""" class UnexpectedEnd(SyntaxError): """Text input ended unexpectedly.""" class TooBig(DNSException): """The DNS message is too big.""" class Timeout(DNSException): """The DNS operation timed out.""" supp_kwargs = set(['timeout']) fmt = "The DNS operation timed out after {timeout} seconds" PK g\A ' __pycache__/opcode.cpython-36.opt-1.pycnu [ 3 bW @ s d Z ddlZdZdZdZdZdZeeeeedZe dd ej D ZG d d dejj Zdd Zdd Zdd Zdd Zdd ZdS )zDNS Opcodes. N )QUERYIQUERYSTATUSNOTIFYUPDATEc c s | ]\}}||fV qd S )N ).0xyr r /usr/lib/python3.6/opcode.py
&