- import os
- import urllib2
- import time
- import multiprocessing.dummy as multiprocessing
- import string
- from random import choice
- import socket
- from ctypes import c_int
- import tempfile
- import dummy
- from logger import log
- "Smart Downloading Module. Written by Itay Brandes."
- shared_bytes_var = multiprocessing.Value(c_int, 0) # a ctypes var that counts the bytes already downloaded
- def DownloadFile(url, path, startByte=0, endByte=None, ShowProgress=True):
- '''
- Function downloads file.
- @param url: File url address.
- @param path: Destination file path.
- @param startByte: Start byte.
- @param endByte: End byte. Will work only if server supports HTTPRange headers.
- @param ShowProgress: If true, shows textual progress bar.
- @return path: Destination file path.
- '''
- url = url.replace(' ', '%20')
- headers = {}
- if endByte is not None:
- headers['Range'] = 'bytes=%d-%d' % (startByte,endByte)
- req = urllib2.Request(url, headers=headers)
- try:
- urlObj = urllib2.urlopen(req, timeout=4)
- except urllib2.HTTPError, e:
- if "HTTP Error 416" in str(e):
- # HTTP 416 Error: Requested Range Not Satisfiable. Happens when we ask
- # for a range that is not available on the server. It will happen when
- # the server will try to send us a .html page that means something like
- # "you opened too many connections to our server". If this happens, we
- # will wait for the other threads to finish their connections and try again.
- log.warning("Thread didn't got the file it was expecting. Retrying...")
- time.sleep(5)
- return DownloadFile(url, path, startByte, endByte, ShowProgress)
- else:
- raise e
- f = open(path, 'wb')
- meta = urlObj.info()
- try:
- filesize = int(meta.getheaders("Content-Length")[0])
- except IndexError:
- log.warning("Server did not send Content-Length.")
- ShowProgress=False
- filesize_dl = 0
- block_sz = 8192
- while True:
- try:
- buff = urlObj.read(block_sz)
- except (socket.timeout, socket.error, urllib2.HTTPError), e:
- dummy.shared_bytes_var.value -= filesize_dl
- raise e
- if not buff:
- break
- filesize_dl += len(buff)
- try:
- dummy.shared_bytes_var.value += len(buff)
- except AttributeError:
- pass
- f.write(buff)
- if ShowProgress:
- status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (filesize_dl / 1024.0 / 1024.0,
- filesize / 1024.0 / 1024.0, progress_bar(1.0*filesize_dl/filesize),
- filesize_dl * 100.0 / filesize)
- status += chr(8)*(len(status)+1)
- print status,
- if ShowProgress:
- print "\\n"
- f.close()
- return path
- def DownloadFile_Parall(url, path=None, processes=6,
- minChunkFile=1024**2, nonBlocking=False):
- '''
- Function downloads file parally.
- @param url: File url address.
- @param path: Destination file path.
- @param processes: Number of processes to use in the pool.
- @param minChunkFile: Minimum chunk file in bytes.
- @param nonBlocking: If true, returns (mapObj, pool). A list of file parts will be returned
- from the mapObj results, and the developer must join them himself.
- Developer also must close and join the pool.
- @return mapObj: Only if nonBlocking is True. A multiprocessing.pool.AsyncResult object.
- @return pool: Only if nonBlocking is True. A multiprocessing.pool object.
- '''
- from HTTPQuery import Is_ServerSupportHTTPRange
- global shared_bytes_var
- shared_bytes_var.value = 0
- url = url.replace(' ', '%20')
- if not path:
- path = get_rand_filename(os.environ['temp'])
- if not os.path.exists(os.path.dirname(path)):
- os.makedirs(os.path.dirname(path))
- log.debug("Downloading to %s..." % path)
- urlObj = urllib2.urlopen(url)
- meta = urlObj.info()
- filesize = int(meta.getheaders("Content-Length")[0])
- if filesize/processes > minChunkFile and Is_ServerSupportHTTPRange(url):
- args = []
- pos = 0
- chunk = filesize/processes
- for i in range(processes):
- startByte = pos
- endByte = pos + chunk
- if endByte > filesize-1:
- endByte = filesize-1
- args.append([url, path+".%.3d" % i, startByte, endByte, False])
- pos += chunk+1
- else:
- args = [[url, path+".000", None, None, False]]
- log.debug("Running %d processes..." % processes)
- pool = multiprocessing.Pool(processes, initializer=_initProcess,initargs=(shared_bytes_var,))
- mapObj = pool.map_async(lambda x: DownloadFile(*x) , args)
- if nonBlocking:
- return mapObj, pool
- while not mapObj.ready():
- status = r"%.2f MB / %.2f MB %s [%3.2f%%]" % (shared_bytes_var.value / 1024.0 / 1024.0,
- filesize / 1024.0 / 1024.0, progress_bar(1.0*shared_bytes_var.value/filesize),
- shared_bytes_var.value * 100.0 / filesize)
- status = status + chr(8)*(len(status)+1)
- print status,
- time.sleep(0.1)
- file_parts = mapObj.get()
- pool.terminate()
- pool.join()
- combine_files(file_parts, path)
- def combine_files(parts, path):
- '''
- Function combines file parts.
- @param parts: List of file paths.
- @param path: Destination path.
- '''
- with open(path,'wb') as output:
- for part in parts:
- with open(part,'rb') as f:
- output.writelines(f.readlines())
- os.remove(part)
- def progress_bar(progress, length=20):
- '''
- Function creates a textual progress bar.
- @param progress: Float number between 0 and 1 describes the progress.
- @param length: The length of the progress bar in chars. Default is 20.
- '''
- length -= 2 # The bracket are 2 chars long.
- return "[" + "#"*int(progress*length) + "-"*(length-int(progress*length)) + "]"
- def get_rand_filename(dir_=os.getcwd()):
- "Function returns a non-existent random filename."
- return tempfile.mkstemp('.tmp', '', dir_)[1]
- def _initProcess(x):
- dummy.shared_bytes_var = x
- #该片段来自于http://www.codesnippet.cn/detail/191120137280.html
来源: http://www.codesnippet.cn/detail/191120137280.html