线程
线程是应用程序中工作的最小单元,它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。Threading用于线程相关操作
使用多线程和单线程的运行速度对比
import threadingimport timedef run(n): print("task", n) time.sleep(2)t1 = threading.Thread(target=run, args=('t1',))t2 = threading.Thread(target=run, args=('t2',))# t1.start()# t2.start()run(1)run(2)
- start 线程准备就绪,等待CPU调度
- setName 为线程设置名称
- getName 获取线程名称
- setDaemon 设置为后台线程或前台线程(默认)如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止,如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
- join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
- run 线程被cpu调度后自动执行线程对象的run方法
多线程例子
import threadingimport timedef run(n): print("running task", n) time.sleep(2) print("task done", n)start_time = time.time()t_objs = [] # 存线程实例for i in range(50): t = threading.Thread(target=run, args=("t-%s" % i,)) t_objs.append(t) t.start()for t in t_objs: # 循环线程实例列表,等待所有线程执行完毕 t.join()print("all thread has finished")print("cost:", time.time() - start_time)
线程锁(互斥锁Mutex)
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁即:
同一时刻允许一个线程执行操作。import timeimport threadingdef run(n): global num # 在每个线程中都获取这个全局变 time.sleep(0.5) num += 1 # 对此公共变量进行-1操作 print("num:", num) num = 0 # 设定一个共享变量thread_list = []for i in range(100): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() thread_list.append(t)for t in thread_list: # 等待所有线程执行完毕 t.join()print('final num:', num)
正常来讲,这个num结果应该是0,Ubuntu上结果有时不为0,在python2.7下。python3.0+上可能自动加上锁
import timeimport threadingdef addNum(): global num # 在每个线程中都获取这个全局变量 print('--get num:', num) time.sleep(1) lock.acquire() # 修改数据前加锁 num -= 1 # 对此公共变量进行-1操作 lock.release() # 修改后释放num = 100 # 设定一个共享变量thread_list = []lock = threading.Lock() # 生成全局锁for i in range(num): t = threading.Thread(target=addNum) t.start() thread_list.append(t)for t in thread_list: # 等待所有线程执行完毕 t.join()print('final num:', num)
RLOCK(递归锁)
大锁加小锁
import threading,time def run1(): print("grab the first part data") lock.acquire() global num num +=1 lock.release() return numdef run2(): print("grab the second part data") lock.acquire() global num2 num2+=1 lock.release() return num2def run3(): lock.acquire() res = run1() print('--------between run1 and run2-----') res2 = run2() lock.release() print(res,res2) if __name__ == '__main__': num,num2 = 0,0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count())else: print('----all threads done---') print(num,num2)
信号量(Semaphone)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
import threading,time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s\n" %n) semaphore.release() if __name__ == '__main__': num= 0 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() != 1: pass #print threading.active_count()else: print('----all threads done---') print(num)
Timer
定时器,指定n秒后执行某操作
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello)t.start() # after 1 seconds, "hello, world" will be printed
Events(事件)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
import threading,timeimport randomdef light(): if not event.isSet(): event.set() #wait就不阻塞 #绿灯状态 count = 0 while True: if count < 10: print('\033[42;1m--green light on---\033[0m') elif count <13: print('\033[43;1m--yellow light on---\033[0m') elif count <20: if event.isSet(): event.clear() print('\033[41;1m--red light on---\033[0m') else: count = 0 event.set() #打开绿灯 time.sleep(1) count +=1def car(n): while 1: time.sleep(random.randrange(10)) if event.isSet(): #绿灯 print("car [%s] is running.." % n) else: print("car [%s] is waiting for the red light.." %n)if __name__ == '__main__': event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in range(3): t = threading.Thread(target=car,args=(i,)) t.start()
queue(队列)
- queue.Queue 先入先出
- queue.LifoQueue 后入先出
- queue.PriorityQueue 存储数据时可设置优先级的队列
import queue'''作用: 1.解耦,使程序之间实现松耦合 2.提高处理效率'''"""queue.Queue(maxsize=0) #first in first outqueue.LifoQueue(maxsize=0) #last in fisrt out class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列Queue.qsize() #获取队列中有几个数据Queue.empty() #return True if empty Queue.full() # return True if full Queue.put(item, block=True, timeout=None) #向队列中添加数据Queue.put_nowait(item) #添加数据超出队列的最大限制,不等等待,抛出异常Queue.get(block=True, timeout=None) #从队列中获取数据Queue.get_nowait() #从队列中获取数据,如果队列已为空,则不等待,抛出异常Queue.task_done()Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.Queue.join() #block直到queue被消费完毕"""
队列和列表的区别:
队列取出一块硬盘,队列里自动减少一块。列表相当于复制一块。子进程相对于父进程,两个进程相互独立,分别属于两块内存空间
生产者消费者模型
什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。import time,randomimport queue,threadingq = queue.Queue()def Producer(name): count = 0 while count < 20: time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1def Consumer(name): count = 0 while count < 20: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1p1 = threading.Thread(target=Producer, args=('A',))c1 = threading.Thread(target=Consumer, args=('B',))p1.start()c1.start()
进程
线程与进程的区别
- 线程是执行的指令集, 进程是资源的集合
- 线程共享内存空间,进程的内存是独立的
- 同一个进程的线程之间可以直接通信,两个进程想要通信,必须通过一个中间代理来实现
- 创建新线程很简单(快), 创建新进程需要对其父进程进行一次克隆(慢)。
- 一个线程可以控制和操作同一个进程里的其他线程,但是进程只能操作子进程
- 对主线程的更改(取消、优先级变更等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。
既生进程何生线程
进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:
- 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
- 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
多进程的应用场景
- io 操作不占用cpu
- 计算占用cpu
- python多线程,不适合cpu密集操作型的任务,适合io操作密集型的任务
from multiprocessing import Processimport osdef info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) # print("\n\n")def f(name): info('\033[31;1m function f\033[0m') print('hello', name)if __name__ == '__main__': info('\033[32;1m main process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()
进程之间的通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
- Queues
from multiprocessing import Processfrom multiprocessing import Queueimport queuedef f(q): q.put([42, 'hello'])if __name__ == '__main__': # q = queue.Queue() # 线程queue q = Queue() p = Process(target=f, args=(q,)) # 复制了一份 p.start() print(q.get()) p.join()
- Pipes
from multiprocessing import Processfrom multiprocessing import Pipedef f(conn): conn.send([42, None, 'hello']) print("from parents:", conn.recv()) conn.close()if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("hi son") p.join()
- Managers
from multiprocessing import Process, Managerimport osdef f(d, list_1): d[os.getpid()] = os.getpid() list_1.append(os.getpid()) print(list_1)if __name__ == '__main__': with Manager() as manager: d = manager.dict() # 生成一个字典,可在多个进程间共享和传递 list_1 = manager.list(range(3)) # 生成一个列表,可在多个进程间共享和传递 p_list = [] for i in range(10): p = Process(target=f, args=(d, list_1)) p.start() p_list.append(p) for res in p_list: # 等待结果 res.join() print(d) print(list_1)
进程池
进程池的作用:限制同一时间使用的进程数量。
from multiprocessing import Processfrom multiprocessing import Poolimport timeimport osdef Foo(i): time.sleep(2) print("in process", os.getpid()) return i + 100def Bar(arg): print('-->exec done:', arg, os.getpid())# print(__name__)if __name__ == '__main__': pool = Pool(processes=5) # 允许进程池同时放入5个进程 print("主进程", os.getpid()) for i in range(10): # 启动十个线程 pool.apply_async(func=Foo, args=(i,), callback=Bar) # pool.apply(func=Foo, args=(i,)) # 串行 # pool.apply_async(func=Foo, args=(i,)) # 并行 pool.close() # 必须先close()后join() pool.join() # 进程池中进程执行完P毕后再关闭,如果注释,那么程序直接关闭。 print('end')
进程同步&进程锁
进程锁存在的意义是,因为多进程共享同一个屏幕,控制屏幕打印的时候不乱。
from multiprocessing import Lockfrom multiprocessing import Processdef f(l, i): l.acquire() print('hello world', i) l.release()if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
协程(Conroutine)
协程是一种用户态的轻量级线程。可以在单线程下实现并发
举个例子:
周末我在家里休息,假如我先洗漱,再煮饭,再下载电影看会很慢,用了协程
的效果就好比,我在下载电影的时候去点火煮饭,此时我马上洗漱,等我洗漱好了,饭也好了,吃完饭了,电影下好了,我可以看了.
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程。协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
- 缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
符合什么条件就能称之为协程?
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到IO操作自动切换到其它协程
yeild协程例子
def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1)def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n)if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
Gevent
#Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的#主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系#统进程的内部,但它们被协作式地调度。import gevent#遇到IO操作就进行切换(自动切换)def test1(): print('a') gevent.sleep(2) #模拟IO处理 print('f')def test2(): print('b') gevent.sleep(1) print('e')def test3(): print('c') gevent.sleep(0) print('d')gevent.joinall([ gevent.spawn(test1), #生成 gevent.spawn(test2), gevent.spawn(test3)])
greenlet
#greenlet是一个用C实现的协程模块,相比与python自带的yield,# 它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generatorfrom greenlet import greenletdef test1(): print('a') gr2.switch() print('c') gr2.switch()def test2(): print('b') gr1.switch() print('d')gr1 = greenlet(test1)gr2 = greenlet(test2)#手动的IO切换gr1.switch()
Geven版的sockerserver
#使用协程实现socketserver的多并发import socketimport geventfrom gevent import monkeymonkey.patch_all()def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli)def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close()if __name__ == '__main__': server(8001)