Python crawlers with multithreading and multiprocessing
Writing crawler code with multiple processes or multiple threads can significantly improve the efficiency of crawling a target website.
1. What are processes and threads
Quoting the explanation of processes and threads from Liao Xuefeng's website:
Process: to the operating system, a task is a process (Process). For example, opening a browser starts a browser process, opening Notepad starts a Notepad process, opening two Notepads starts two Notepad processes, and opening Word starts a Word process.
Thread: some processes do more than one thing at a time. Word, for example, can type, check spelling, and print at the same time. To do several things at once within a process, you need to run multiple "subtasks"; these "subtasks" within a process are called threads (Thread).
Every process does at least one thing, so a process has at least one thread.
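To make the two concepts concrete, here is a minimal sketch (my own addition, not from the original article) that prints the current process id and thread name; every running Python program is itself one process containing at least a main thread:

import os
import threading

print("process id:", os.getpid())                        # id of the current process
print("thread name:", threading.current_thread().name)   # "MainThread" in the main thread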
2. Multiprocessing
Four ways to implement multiprocessing:
os.fork()
Python's os module wraps common system calls; among them, the call for creating a new process is the fork() function. Example code:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1. Only works on Unix systems; it is not available on Windows.
2. fork() is called once but returns twice: in the parent process the return
   value is the child's pid, in the child process the return value is 0.
"""
import os

pid = os.fork()
if pid == 0:
    print("In the child process: pid={pid}, parent ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("In the parent process: child pid={pid}, my pid={ppid}".format(pid=pid, ppid=os.getpid()))
# In the parent process: child pid=611, my pid=610
# In the child process: pid=611, parent ppid=610
The Process class
Create a Process object via the Process class of the multiprocessing module.
Constructor of the Process class:
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
Parameter | Description |
---|---|
group | Process group; rarely used. |
target | The callable object to invoke, usually a function. |
args | Tuple of positional arguments for the callable. |
name | Process alias. |
kwargs | Dictionary of keyword arguments for the callable. |
Example code:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process

def run_process(name):
    print(name)

if __name__ == "__main__":
    p = Process(target=run_process, args=("test",))
    p.start()
    p.join()
    print("Child process finished")
# test
# Child process finished
Inheriting the Process class
Define a subclass of Process and override its run method. Calling .start() automatically invokes run. Example code:
from multiprocessing import Process

class NewProcess(Process):
    def __init__(self, n):
        super(NewProcess, self).__init__()
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    test = "test"
    p = NewProcess(test)
    p.start()
    p.join()
    print("Child process finished")
# test
# Child process finished
The process pool: the Pool class
Pool provides a fixed number of processes (usually the number of CPU cores) for the user. When a new request is submitted to the Pool and the pool is not full, a new process is created to execute it; if the pool is already full, the request waits until a process in the pool becomes free and is then executed by that process.
Note: processes in a process pool cannot share an ordinary Queue or other data directly, while child processes created with Process can inherit and share a queue. A Manager().Queue() can, however, be shared with pool workers, as the sketch below shows.
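The following minimal sketch (my own illustration, not from the original article) shows sharing a queue with pool workers through a Manager:

import multiprocessing

def worker(q, n):
    q.put(n * n)                 # pool workers can safely use a Manager queue

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    q = manager.Queue()          # a plain multiprocessing.Queue() would raise a RuntimeError here
    pool = multiprocessing.Pool(2)
    for i in range(4):
        pool.apply_async(worker, args=(q, i))
    pool.close()
    pool.join()
    while not q.empty():
        print(q.get())           # 0 1 4 9 (order may vary)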
Common methods of the Pool class:
Method | Prototype | Description |
---|---|---|
apply() | apply(func[, args=()[, kwds={}]]) | Passes arguments to func; the main process blocks until the function finishes (not recommended, since it is a blocking call). |
apply_async() | apply_async(func[, args=()[, kwds={}[, callback=None]]]) | Used like apply, but non-blocking and supports a callback that receives the returned result. |
map() | map(func, iterable[, chunksize=None]) | The Pool version of the built-in map; it blocks until all results have returned. The second argument is an iterable; in practice the subprocesses only run once the whole task queue is ready. |
close() | | Closes the pool (Pool) so that no new tasks can be added. |
terminate() | | Ends the worker processes immediately; outstanding tasks are not processed. |
join() | | The main process blocks waiting for the child processes to exit; join must be called after close or terminate. |
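Before the full map() example, here is a minimal sketch (my own illustration, not from the original article) of apply_async with a callback:

from multiprocessing import Pool

def square(x):
    return x * x

def on_result(value):
    # the callback runs in the main process as each result arrives
    print("got result:", value)

if __name__ == "__main__":
    pool = Pool(2)
    for i in range(5):
        pool.apply_async(square, args=(i,), callback=on_result)
    pool.close()
    pool.join()   # wait for all tasks; results are printed by the callback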
A complete example using map() follows:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool

def run(num):
    time.sleep(1)
    return num * num

if __name__ == "__main__":
    testList = [1, 2, 3, 4, 5, 6, 7]
    print('Single-process execution')      # sequential execution
    t1 = time.time()
    for i in testList:
        run(i)
    t2 = time.time()
    print('Sequential execution time:', int(t2 - t1))
    print('Multi-process map execution')   # parallel execution
    p = Pool(4)                             # create a pool with 4 processes
    result = p.map(run, testList)
    p.close()                               # close the pool; no new tasks accepted
    p.join()                                # the main process blocks until the children exit
    t3 = time.time()
    print('Parallel execution time:', int(t3 - t2))
    print(result)
# Single-process execution
# Sequential execution time: 7
# Multi-process map execution
# Parallel execution time: 2
# [1, 4, 9, 16, 25, 36, 49]
Inter-process communication
Queue()
Queue: first in, first out, ordered.
How it works: a queue data structure is built in memory. Multiple processes can put items into the queue, and items are taken out in the same order in which they were stored.
Method | Function | Parameters |
---|---|---|
q = Queue(maxsize=0) | Creates the message queue and returns the queue object. | maxsize: the maximum number of messages to store; by default storage is allocated according to available memory. |
q.put(data, [block, timeout]) | Stores a message in the queue. | data: the data to store. block: blocks by default when the queue is full; set to False not to block. timeout: timeout in seconds. |
data = q.get([block, timeout]) | Retrieves a message from the queue. | block: blocks by default when the queue is empty; set to False not to block. timeout: timeout in seconds. |
q.full() | Returns whether the queue is full. | |
q.empty() | Returns whether the queue is empty. | |
q.qsize() | Returns the number of messages in the queue. | |
q.close() | Closes the queue. | |
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue

def foo(data):
    s = data.get()                           # one end of the "pipe" is in the child process; the child receives the data
    if s:
        print('The child process received the data...')
        print(s)                             # the child prints the data

if __name__ == '__main__':                   # this line is required
    q = Queue()                              # create the Queue for inter-process communication; think of it as a pipe
    p = Process(target=foo, args=(q,))       # create the child process
    print('The main process is about to send data...')
    q.put("Data received successfully")      # the main process drops data into the queue
    p.start()                                # start the child process
    p.join()
# The main process is about to send data...
# The child process received the data...
# Data received successfully
Pipe()
How it works: a pipe is opened in memory and pipe connection objects are generated; multiple processes communicate by operating on the "same" pipe object.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])   # write into the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())        # read from the pipe
    p.join()
# prints "[42, None, 'hello']"
Manager()
The multiprocessing Manager can share data between processes, such as lists, tuples, dictionaries, locks, and strings.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing

def f(m_list):
    m_list.append("f")

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    m_list = manager.list([1, 2, 3])
    p = multiprocessing.Process(target=f, args=(m_list, ))
    p.start()
    p.join()
    print(m_list)
# [1, 2, 3, 'f']
3. Multithreading
A thread is an independent execution flow within a program. Threads are less isolated from each other than separate processes: they share memory, file handles, and other per-process state. Global variables are shared among all threads.
There are two ways to create threads.
The Thread class of the threading module
Example code:
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
Custom thread class
Define a custom thread class by inheriting threading.Thread; in essence, this means overriding the run method of the Thread class.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == '__main__':
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s
Daemon threads
Calling setDaemon(True) makes a child thread a daemon of the main thread, so when the main thread ends, the daemon threads end with it and the whole program exits.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       # the child thread pauses for 1s here
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   # set the child thread as a daemon; must be called before start()
    t.start()
    print("end")
# task t1
# end
If the daemon thread should be allowed to finish before the main thread exits, use the join method to make the main thread wait for the child thread to complete, for example:
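A minimal variation of the example above (my own sketch, not from the original) with join added:

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print("done", n)

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   # still a daemon thread
    t.start()
    t.join()            # the main thread now waits, so "done t1" is printed before "end"
    print("end")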
Lock
The biggest difference between multithreading and multiprocessing is that in multiprocessing each process holds its own copy of every variable, so processes do not affect each other, whereas in multithreading all variables are shared by all threads. Any variable can therefore be modified by any thread, and the biggest danger of sharing data between threads is that several threads may change the same variable at the same time and corrupt its contents.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading

value = 0
lock = threading.Lock()

def change_it(n):
    # add first, then subtract; the result should be 0:
    global value
    value = value + n
    value = value - n

# Without the lock (the final value is unpredictable)
def run_thread(n):
    for i in range(2000000):
        change_it(n)

# With the lock
# def run_thread(n):
#     for i in range(2000000):
#         lock.acquire()
#         try:
#             change_it(n)
#         finally:
#             lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)
# 29
Because there is only one lock, no matter how many threads there are, at most one thread holds the lock at any moment, so modifications cannot conflict. When multiple threads execute lock.acquire() at the same time, only one of them succeeds in acquiring the lock and continues executing; the others keep waiting until they get the lock.
The thread that holds the lock must release it when it is done, otherwise the threads waiting for the lock will wait forever and become dead threads. That is why try...finally is used to make sure the lock is always released.
The advantage of locking is that it guarantees a critical piece of code can only be executed by one thread from beginning to end. There are, of course, disadvantages. First, it prevents truly concurrent execution: code that holds a lock effectively runs in single-threaded mode, which greatly reduces efficiency. Second, because multiple locks can exist and different threads may hold different locks, threads that try to acquire each other's locks may deadlock, leaving several threads hung, unable to run or finish; only the operating system can forcibly terminate them.
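As a side note, threading.Lock also works as a context manager; the with statement below is equivalent to the acquire/try/finally/release pattern above (a minimal sketch of my own, not from the original):

import threading

value = 0
lock = threading.Lock()

def change_it(n):
    global value
    with lock:               # acquire() on entry, release() on exit, even if an exception is raised
        value = value + n
        value = value - n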
Semaphores (the BoundedSemaphore class)
A Lock allows only one thread to modify data at a time, while a Semaphore allows a fixed number of threads to modify data at the same time.
import threading
import time

def run(n, semaphore):
    semaphore.acquire()   # acquire
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()   # release

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)   # allow at most 5 threads to run at the same time
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass   # print(threading.active_count())
    else:
        print('-----all threads done-----')
The GIL
Outside of Python, on a single core only one task can execute at a time, while multiple cores allow multiple threads to truly run simultaneously. In Python, however, no matter how many cores there are, only one thread can execute at a time. The reason is the existence of the GIL.
GIL stands for Global Interpreter Lock. It originates from design considerations in Python, a decision made for data safety. Before a thread can run it must acquire the GIL; think of the GIL as a "pass", and within one Python process there is only one GIL. A thread that cannot get the pass is not allowed onto the CPU. The GIL exists only in CPython: CPython's threads are native C-language threads, so the interpreter cannot schedule the CPU directly and instead uses the GIL to guarantee that only one thread touches the interpreter's data at a time. Other implementations, such as Jython, have no GIL.
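The effect of the GIL can be demonstrated with a small CPU-bound timing sketch (my own illustration; the exact numbers depend on the machine). On CPython the two-thread version takes roughly as long as, or longer than, the sequential one:

import threading
import time

def count(n):
    while n > 0:
        n -= 1

N = 10000000

start = time.time()
count(N)
count(N)
print("sequential: %.2fs" % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print("two threads: %.2fs" % (time.time() - start))
# On CPython both timings are similar (the threaded version can even be slower),
# because the GIL lets only one thread execute bytecode at a time.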
Python's execution efficiency also differs for different kinds of code:
1. CPU-bound code (various loops, counting, and so on): here the CPU does a lot of work, so the tick count quickly reaches the threshold, which triggers the release and re-acquisition of the GIL (and the back-and-forth switching between threads consumes resources). Python's multithreading is therefore unfriendly to CPU-bound code.
2. I/O-bound code (file handling, web crawlers, and other operations that read and write files or the network): multithreading can effectively improve efficiency. With a single thread, I/O operations sit in I/O waits and time is wasted, while with multithreading the interpreter automatically switches to thread B while thread A is waiting, so CPU resources are not wasted and program efficiency improves. Python multithreading is therefore friendlier to I/O-bound code (see the sketch after this list).
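As an illustration of the I/O-bound case, here is a minimal sketch (my own, using concurrent.futures; the URLs are just examples) that downloads a few pages with a thread pool:

from concurrent.futures import ThreadPoolExecutor
import requests

URLS = ["https://movie.douban.com/top250?start=%d" % n for n in (0, 25, 50)]  # example URLs

def fetch(url):
    # the GIL is released while waiting on the network, so threads overlap their I/O waits
    return url, len(requests.get(url, timeout=10).content)

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as pool:
        for url, size in pool.map(fetch, URLS):
            print(url, size)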
Usage advice
To make full use of a multi-core CPU in Python, use multiple processes. Each process has its own independent GIL and they do not interfere with each other, so true parallel execution is possible. In Python, multiprocessing is therefore more efficient than multithreading for CPU-bound work (this only holds on multi-core CPUs). A small sketch follows.
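A minimal sketch (my own illustration) of the CPU-bound case, where a process pool works around the GIL:

from concurrent.futures import ProcessPoolExecutor
import time

def busy(n):
    # pure-CPU work: each worker process has its own interpreter and its own GIL
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(busy, [5000000] * 4))
    print(results, "took %.2fs" % (time.time() - start))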
4. Crawling the Douban Movie Top 250
The top 250 movies are crawled using the approaches above (multi-process and multi-threaded) so the two can be compared.
(1) URL to crawl: https://movie.douban.com/top250?start=0&filter=
(2) Analyzing the pages shows that the first page's URL has start=0, the second page's has start=25, and the third page's has start=50 (see the short sketch after this list).
(3) Only the movie title and rating are crawled. Since the purpose is a timing comparison, no elaborate data extraction or saving is done; the results are simply printed.
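For example, all page URLs can be generated from the start parameter (a small sketch mirroring what the AllUrlSpider classes below do):

base_url = 'https://movie.douban.com/top250?start='
url_list = [base_url + str(num) for num in range(0, 250, 25)]
# ['https://movie.douban.com/top250?start=0', ..., 'https://movie.douban.com/top250?start=225']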
Multi-process crawling
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue
import time
from lxml import etree
import requests

class DouBanSpider(Process):
    def __init__(self, q, url_list, lock):
        # call the parent class's __init__ method
        super(DouBanSpider, self).__init__()
        self.url_list = url_list
        self.q = q
        self.lock = lock
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        Send the request.
        :return: the page source
        '''
        # Retry up to 3 times when the request fails
        i = 0
        while i <= 3:
            try:
                print(u"[INFO] request url:" + url)
                return requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s%s' % (e, url))
                i += 1

    def parse_page(self):
        '''
        Parse the page source, extract the movie titles and ratings with XPath,
        and put them into the queue.
        :return:
        '''
        time.sleep(0.1)
        while 1:
            try:
                url = self.url_list.pop()
            except IndexError as e:
                break
            self.lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            # all movie entries on one page
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # movie title
                title = move.xpath('.//a/span/text()')[0]
                # rating
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
                # put each movie's rating and title into the queue
                self.q.put(score + "\t" + title)
            self.lock.release()

class AllUrlSpider(Process):
    def __init__(self, url_lis):
        super(AllUrlSpider, self).__init__()
        self.url_list = url_lis

    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # construct all URLs
        for num in range(225, -1, -25):
            self.url_list.append(base_url + str(num))
            print("get URL:{}".format(base_url + str(num)))

def main():
    # create a queue to hold the data collected by the processes
    q = Queue()
    lock = multiprocessing.Lock()
    manager = multiprocessing.Manager()
    url_list = manager.list()
    a = AllUrlSpider(url_list)
    p = DouBanSpider(q, url_list, lock)
    b = DouBanSpider(q, url_list, lock)
    c = DouBanSpider(q, url_list, lock)
    a.start()
    p.start()
    b.start()
    c.start()
    a.join()
    p.join()
    b.join()
    c.join()
    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    start = time.time()
    main()
    print('[info] elapsed time: %s' % (time.time() - start))
Multi-process crawling took 7.15 seconds (the original post showed a screenshot of part of the results here).
Multithreaded crawling
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time
from lxml import etree
import requests

url_list = []
lock = threading.Lock()

class DouBanSpider(Thread):
    def __init__(self, q):
        # call the parent class's __init__ method
        super(DouBanSpider, self).__init__()
        self.q = q
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        Send the request.
        :return: the page source
        '''
        # Retry up to 3 times when the request fails
        i = 0
        while i <= 3:
            try:
                print(u"[INFO] request url:" + url)
                html = requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s%s' % (e, url))
                i += 1
            else:
                return html

    def parse_page(self):
        '''
        Parse the page source, extract the movie titles and ratings with XPath,
        and put them into the queue.
        :return:
        '''
        while 1:
            try:
                url = url_list.pop()
            except IndexError as e:
                break
            lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            # all movie entries on one page
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # movie title
                title = move.xpath('.//a/span/text()')[0]
                # rating
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
                # put each movie's rating and title into the queue
                self.q.put(score + "\t" + title)
            lock.release()

class AllUrlSpider(Thread):
    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # construct all URLs
        for num in range(225, -1, -25):
            url_list.append(base_url + str(num))
            print("get URL:{}".format(base_url + str(num)))

def main():
    # create a queue to hold the data collected by the threads
    q = Queue()
    a = AllUrlSpider()
    a.start()
    # list to keep the threads
    Thread_list = []
    # create and start the crawler threads
    for i in range(5):
        p = DouBanSpider(q)
        p.start()
        Thread_list.append(p)
    a.join()
    # make the main thread wait for the child threads to finish
    for i in Thread_list:
        i.join()
    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    start = time.time()
    main()
    print('[info] elapsed time: %s' % (time.time() - start))
Multithreaded crawling took 5 seconds (the original post showed a screenshot of part of the results here).
The elapsed time depends to some extent on network quality, and each run gives a different result. In theory, though, threads handle I/O-bound work better than processes.