utf-8 消费 结束时间 open 操作 earch 处理 ctime
运行多个线程同时运行几个不同的程序类似,但具有以下优点:
进程内共享多线程与主线程相同的数据空间,如果他们是独立的进程,可以共享信息或互相沟通更容易.
线程有时称为轻量级进程,他们并不需要多大的内存开销,他们关心的不是过程便宜.
一个线程都有一个开始,执行顺序,并得出结论。它有一个指令指针,保持它的上下文内正在运行的跟踪.
(1)、它可以是抢占(中断)
(2)、它可以暂时搁置(又称睡眠),而其他线程正在运行
看一下以下的小案例:
- import thread
- from time import sleep, ctime
- def loop0():
- print "loop 0开始时间:",ctime() #第一个函数loop0开始时间
- sleep(4) # 休眠4秒
- print "loop 0 结束时间:_',ctime()
- def loopl():
- print "loop 1 开始时间:",ctime()
- sleep(2)
- print "loop 1 结束时间:_',ctime()
- def main():
- print "程序开始时间:",ctime()
- thread.start_new_thread(loop0,()) # 第二个参数是必不可少的,即使loope没有传递参数,仍然要写一个空元组
- thread.stant_new_thnead(loopl,())
- sleep(6) #这里休眠6秒的原因是确保两个线程己经执行完毕,主线程才接着执行下面的语句
- print "程序结束时间:",ctime()
- if __name__ == '__main__':
- main()
在 web 测试中,不可避免的一个测试就是浏览器兼容性测试,在没有自动化测试前,我们总是苦逼的在一台或多台机器上安装 N 种浏览器,然后手工在不同的浏览器上验证主业务流程和关键功能模块功能,以检测不同浏览器或不同版本浏览器上,我们的 web 应用是否可以正常工作。如果我们使用 selenium webdriver,那我们就能够自动的在 IE、firefox、chrome、等不同浏览器上运行测试用例。为了能在同一台机器上不同浏览器上同时执行测试用例,我们需要多线程技术。下面我们基于 python 的多线程技术来尝试同时启动多个浏览器进行 selenium 自动化测试。
- #-*- coding:utf-8
- from selenium import webdriver
- import sys
- from time import sleep
- from threading import Thread
- reload(sys)
- sys.setdefaultencoding("utf-8")
- def test_baidu_seanch(browsen, url):
- driver = None
- #可添加更多浏览器支持进来
- if browser == "ie":
- driver = webdriver.Ie()
- elif browser == "finefox":
- driver = webdriver.Firefox()
- elif browser == "chrome":
- driver = webdriver.Chnome()
- if driver == None:
- exit()
- driver.get(url)
- sleep(3)
- driver.find_element_by_id("xxx").send_keys(u"xxxx")
- sleep(3)
- driver.find_element_by_id("xxx").click()
- sleep(3)
- driver.quit()
- if __name__ == "__main__":
- #浏览器和首页url
- data = {
- "ie":"http://www.xxx.com",
- "firefox": "http: //www.xxx.com",
- "chrome":"http://www.xxxx.com"
- }
- #构建线程
- threads =[]
- for b, url in data.items():
- t = Thread(target=test_baidu_search,angs=(b, url))
- threads.append(t)
- #启动所有线程
- for thr in threads:
- thr.start()
threading 高级线程接口
- import threading
- class MyThnead(threading.Thread):
- def __init__(self, name=None):
- threading.Thread.__init__(self)
- self.name = name
- def run(self):
- print self.name
- def test():
- for i in range(0, 100):
- t = MyThread("thread_" + str(i))
- t.start()
- if __name__ == '__main__':
- test()
Lock 线程锁
这里创建实现了一个计数器 count 这个全局变量会被多个线程同时操作,使其能够被顺序相加,需要靠线程锁的帮助。
- #-*- encoding: utf-8
- import threading
- import time
- class Test(threading.Thread):
- def __init__(self, num):
- threading.Thread.—init—(self)
- self._run_num = num
- def run(self):
- global count, mutex
- threadname = threading.currentThnead().getName()
- for x in nange(int(self._run_num)):
- mutex.acquire()
- count = count + 1
- mutex.release()
- print (thneadname, x, count)
- time.sleep(l)
- if __name__ == '__main__':
- global count^ mutex
- threads =[]
- num = 5
- count =0
- #创建锁
- mutex = threading.Lock()
- #创建线程对象
- for x in nange(num):
- threads.append(Test(10))
- #启动线程
- for t in threads:
- t. start()
- #等待子线程结束
- for t in threads:
- t.join()
Queue 队列
- #!/usr/bin/env python
- import Queue
- import threading
- import urllib2
- import time
- hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"]
- queue = Queue.Queue()
- class ThreadUrl(thneading.Thread):
- """Threaded Uni Grab
- def __init__(self, queue):
- threading.Thread.__init__(self)
- self.queue = queue
- def run(self):
- while True:
- #gnabs host from queue
- host = self.queue.get()
- url = urllib2.urlopen(host)
- #gnabs urls of hosts and prints first 1024 bytes of page
- uni = urllib2.urlopen(host)
- #signals to queue job is done
- self.queue.task_done()
- start = time.time()
- def main():
- #spawn a pool of threads, and pass them queue instance
- for i in nange(5):
- t = ThreadUrl(queue)
- t.setDaemon(True)
- t.start()
- #populate queue with data
- for host in hosts:
- queue.put(host)
- #wait on the queue until everything has been processed
- queue.join()
- main()
- print "Elapsed Time: %s" % (time.time() - start)
当线程需要共享数据或资源时,线程可能会变得复杂。线程模块提供许多同步原语,包括信号量,条件变量,事件和锁。虽然存在这些选项,但它被认为是最佳做法,而是专注于使用队列。队列更容易处理,并且使线程编程更安全,因为它们有效地将资源访问单个线程,并允许更清晰和更可读的设计模式。
首先创建一个程序,该程序将按顺序或一个接一个地获取网站的 URL,并打印页面的前 1024 个字节。这是一个经典的例子,可以使用线程更快地完成任务。首先,让我们一起使用这个 urllib2 模块来抓住这些页面,然后再使用代码:
- import urllib2
- import time
- hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"]
- start = time.time()
- for host in hosts:
- url = urllib2.urlopen(host)
- print url.read(1024)
- print "Elapsed Time: %s" % (time.time() - start)
导入两个模块首先, urllib2 模块是什么是繁重的抓住网页。其次,通过调用创建开始时间值 time.time(),然后再次调用它,并减去初始值以确定程序执行多长时间。最后,在查看程序的速度时," 两个半秒 " 的结果是不可怕的,但是如果您有数百个网页来检索,则考虑到目前的平均值,大概需要 50 秒。看看如何创建一个线程版本加快速度:
- import Queue
- import threading
- import urllib2
- import time
- hosts = ["http://xxxx.com", "http://xxxxx.com","http://xxxxxx.com","http://xxxxx.com", "http://xxxxx.com"]
- queue = Queue.Queue()
- class ThreadUrl(thneading.Thread):
- """Threaded Uni Grab
- def __init__(self, queue):
- threading.Thread.__init__(self)
- self.queue = queue
- def run(self):
- while True:
- #gnabs host from queue
- host = self.queue.get()
- url = urllib2.urlopen(host)
- #gnabs urls of hosts and prints first 1024 bytes of page
- print url.read(1024)
- #signals to queue job is done
- self.queue.task_done()
- def main():
- #spawn a pool of threads, and pass them queue instance
- for i in nange(5):
- t = ThreadUrl(queue)
- t.setDaemon(True)
- t.start()
- for host in hosts:
- queue.put(host)
- queue.join()
- main()
- print "Elapsed Time: %s" % (time.time() - start)
上面的案例并不比第一个线程示例复杂得多,这要归功于使用排队模块。这种模式是使用 Python 的线程的一种非常常见的推荐方式。步骤描述如下:
1. 创建一个实例,Queue.Queue() 然后用数据填充它。
2. 将填充数据的实例传递到您从继承中创建的线程类 threading.Thread。
3. 产生一个守护进程池线程。
4. 一次将一个项目拉出队列,并使用线程内的数据,运行方法来完成工作。
5. 完成工作后,向 queue.task_done() 任务完成发送一个信号到队列。
6. 加入队列,这意味着等到队列为空,然后退出主程序。
来源: http://www.bubuko.com/infodetail-2279792.html