Multithreading and multiprocessing for Python crawlers, with code examples

Chen Python 2020-11-16 21:21:36
Tags: multithreading, multiprocessing, code examples


Multithreading and multiprocessing in Python crawlers

Writing crawler code with multiple processes or multiple threads can significantly improve how efficiently the crawler scrapes its target website.


1. What are processes and threads

Quoting Liao Xuefeng's website for its explanation of processes and threads:

Process: To the operating system, a task is a process (Process). For example, opening a browser starts a browser process, opening Notepad starts a Notepad process, opening two Notepad windows starts two Notepad processes, and opening Word starts a Word process.

Thread: Some processes do more than one thing at a time. Word, for example, can handle typing, spell checking, and printing simultaneously. To do several things at once within one process, you need to run multiple "subtasks"; these subtasks inside a process are called threads (Thread).

Every process has to do at least one thing, so every process has at least one thread.

2. Multiprocessing

Four ways to implement multiprocessing

os.fork()

Python's os module wraps common system calls; for multiprocessing, the relevant call is the fork() function. A concrete example:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
"""
fork()
1. Only works on Unix-like systems; it is not available on Windows.
2. fork() is called once but returns twice: in the parent process the return value is the child's pid, in the child process the return value is 0.
"""
import os

pid = os.fork()
if pid == 0:
    print("In the child process: my pid={pid}, parent pid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("In the parent process: child pid={pid}, my pid={ppid}".format(pid=pid, ppid=os.getpid()))
# In the parent process: child pid=611, my pid=610
# In the child process: my pid=611, parent pid=610
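
In practice the parent usually waits for the child to finish. Here is a minimal sketch, assuming a Unix system, using os.wait(); the work done in the child is purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# fork + wait sketch: the parent blocks until the child exits and then inspects its status.
import os
import sys

pid = os.fork()
if pid == 0:
    print("child %d doing some work" % os.getpid())
    sys.exit(0)                    # the child exits with status 0
else:
    child_pid, status = os.wait()  # the parent blocks here until the child exits
    print("parent %d reaped child %d, exit status %d" % (os.getpid(), child_pid, status))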

Process class

Create a Process object through the Process class of the multiprocessing module.

Constructor of the Process class:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

Parameter   Explanation
group       Process group; rarely used.
target      The callable object to invoke, usually a function.
args        The tuple of positional arguments passed to the callable.
name        An alias (name) for the process.
kwargs      The dictionary of keyword arguments passed to the callable.

A concrete example:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process

def run_process(name):
    print(name)

if __name__ == "__main__":
    p = Process(target=run_process, args=("test",))
    p.start()
    p.join()
    print("Child process finished")
# test
# Child process finished

Inheriting the Process class

Subclass Process and override its run method. Calling .start() automatically invokes run. A concrete example:

from multiprocessing import Process

class NewProcess(Process):
    def __init__(self, n):
        super(NewProcess, self).__init__()
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    test = "test"
    p = NewProcess(test)
    p.start()
    p.join()
    print("Child process finished")
# test
# Child process finished

Process pool: the Pool class

The Pool class maintains a specified number of worker processes (usually the number of CPU cores) for the caller. When a new task is submitted to the Pool and the pool is not yet full, a new process is created to run it; if the pool is already full, the task waits until a worker in the pool becomes free.

Note: processes in a process pool cannot share an ordinary queue or other data directly, whereas child processes created with Process can share a Queue passed to them.

Common methods of the Pool class:

apply()        Prototype: apply(func[, args=()[, kwds={}]]). Passes arbitrary arguments to func and blocks the main process until func finishes (not recommended).
apply_async()  Prototype: apply_async(func[, args=()[, kwds={}[, callback=None]]]). Same usage as apply, but non-blocking; the result can be delivered to a callback.
map()          Prototype: map(func, iterable[, chunksize=None]). The Pool version of the built-in map; it blocks until all results are returned. The second argument is an iterable, and in practice the worker processes only start once the whole task queue is ready.
close()        Closes the pool (Pool) so that no new tasks can be added.
terminate()    Terminates the worker processes immediately; unprocessed tasks are discarded.
join()         The main process blocks waiting for the worker processes to exit; join must be called after close or terminate.

A concrete example using map:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import time
from multiprocessing import Pool

def run(num):
    time.sleep(1)
    return num * num

if __name__ == "__main__":
    testList = [1, 2, 3, 4, 5, 6, 7]
    print('Single-process execution')  # sequential execution
    t1 = time.time()
    for i in testList:
        run(i)
    t2 = time.time()
    print('Sequential execution time:', int(t2 - t1))
    print('Multi-process map execution')  # parallel execution
    p = Pool(4)  # create a pool with 4 worker processes
    result = p.map(run, testList)
    p.close()  # close the pool; no new tasks accepted
    p.join()   # the main process blocks until the workers exit
    t3 = time.time()
    print('Parallel execution time:', int(t3 - t2))
    print(result)
# Single-process execution
# Sequential execution time: 7
# Multi-process map execution
# Parallel execution time: 2
# [1, 4, 9, 16, 25, 36, 49]
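
As a complement to the map example above, here is a minimal sketch of apply_async with a callback; the run function is the same as above, and the callback name collect is purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# apply_async sketch: submit tasks without blocking and collect results via a callback.
import time
from multiprocessing import Pool

def run(num):
    time.sleep(1)
    return num * num

results = []

def collect(value):
    # called in the main process each time a task finishes
    results.append(value)

if __name__ == "__main__":
    p = Pool(4)
    for i in [1, 2, 3, 4, 5, 6, 7]:
        p.apply_async(run, args=(i,), callback=collect)
    p.close()
    p.join()  # wait for all submitted tasks to finish
    print(sorted(results))
# [1, 4, 9, 16, 25, 36, 49]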

Inter-process communication

Queue()

Queue: first in, first out (FIFO), ordered.

How it works: a queue data structure is built in memory. Multiple processes can put items into the queue, and items come out in the same order they were put in.

Method                          Explanation
q = Queue(maxsize=0)            Creates a queue and returns the queue object. maxsize is the maximum number of messages it can hold; the default of 0 lets the queue grow as memory allows.
q.put(data, [block, timeout])   Puts a message into the queue. data: the data to store. block: blocks by default when the queue is full; set it to False to avoid blocking. timeout: timeout in seconds.
data = q.get([block, timeout])  Takes a message from the queue. block: blocks by default when the queue is empty; set it to False to avoid blocking. timeout: timeout in seconds.
q.full()                        Returns whether the queue is full.
q.empty()                       Returns whether the queue is empty.
q.qsize()                       Returns the number of messages currently in the queue.
q.close()                       Closes the queue.
#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Queue

def foo(data):
    s = data.get()  # the receiving end lives in the child process; the child takes the data out
    if s:
        print('The child process has received data...')
        print(s)  # the child prints the data it received

if __name__ == '__main__':  # this guard is required
    q = Queue()  # create the Queue used for inter-process communication
    p = Process(target=foo, args=(q,))  # create the child process
    print('The main process is about to send data...')
    q.put("Data received successfully")  # the main process drops data into the queue
    p.start()  # start the child process
    p.join()
# The main process is about to send data...
# The child process has received data...
# Data received successfully
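
A more typical pattern is one child process producing and another consuming through the same Queue. A minimal sketch, in which the producer/consumer functions and the None sentinel are purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# Producer/consumer sketch: one child fills the queue, another drains it.
from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)          # hand items to the consumer
    q.put(None)           # sentinel: tells the consumer there is nothing more

def consumer(q):
    while True:
        item = q.get()    # blocks until an item is available
        if item is None:
            break
        print("consumed", item)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()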

Pipe()

How it works: a pipe is created in memory and Pipe() returns a pair of connection objects; multiple processes communicate by operating on the two ends of the same pipe.

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])  # write into the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # read from the pipe
    p.join()
# prints "[42, None, 'hello']"
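
Pipe() is duplex by default, so both ends can send and receive. A minimal sketch of a round trip; the worker function name is purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# Duplex Pipe sketch: the child receives a request and sends a reply on the same connection.
from multiprocessing import Process, Pipe

def worker(conn):
    request = conn.recv()        # wait for the parent's message
    conn.send(request.upper())   # reply on the same connection
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())    # prints "HELLO"
    p.join()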

Manager()

A process Manager can share data between processes, such as lists, tuples, dictionaries, locks, and strings.

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing

def f(m_list):
    m_list.append("f")

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    m_list = manager.list([1, 2, 3])
    p = multiprocessing.Process(target=f, args=(m_list,))
    p.start()
    p.join()
    print(m_list)
# [1, 2, 3, 'f']
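
The same idea works for a shared dictionary. A minimal sketch using Manager().dict(); the keys and the squaring are purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# Manager.dict sketch: several children write into one shared dictionary.
import multiprocessing

def f(shared, key):
    shared[key] = key * key  # each child records its own result

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared = manager.dict()
    workers = [multiprocessing.Process(target=f, args=(shared, i)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(dict(shared))
# e.g. {0: 0, 1: 1, 2: 4, 3: 9}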

3. Multithreading

A thread is an independent, concurrent flow of execution within a program. Threads are far less isolated from each other than separate processes: they share memory, file handles, and the rest of the state that belongs to their process. In particular, global variables are shared among all threads.

There are two ways to create threads

The Thread class of the threading module

A concrete example:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

Custom thread

Define a custom thread class by inheriting threading.Thread; in essence, you override the run method of the Thread class.

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from threading import Thread
import time

class MyThread(Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == '__main__':
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
# task t1
# task t2
# 1s
# 1s
# 0s
# 0s

Daemon threads

setDaemon(True) turns a child thread into a daemon of the main thread, so when the main thread ends, the daemon threads end with it and the whole program exits.

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)  # the child thread pauses here for 1s
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)  # mark the thread as a daemon; must be set before start()
    t.start()
    print("end")
# task t1
# end

If you want the main thread to end only after the daemon thread has finished its work, use the join method so the main thread waits for the child thread to finish, as in the sketch below.
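
A minimal sketch of that combination, assuming we still mark the thread as a daemon but also want its output before the program exits; the run function here is purely illustrative:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# Daemon thread plus join: the main thread now waits, so the daemon's output is printed.
import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print("done", n)

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)  # still a daemon thread
    t.start()
    t.join()           # main thread blocks here until the daemon finishes
    print("end")
# task t1
# done t1
# end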

Lock

The biggest difference between multithreading and multiprocessing is this: with multiple processes, every variable has its own copy in each process and the processes do not affect each other; with multiple threads, all variables are shared by all threads, so any variable can be modified by any thread. The biggest danger of sharing data between threads is that several threads may change the same variable at the same time and corrupt its contents.

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import threading

value = 0
lock = threading.Lock()

def change_it(n):
    # add then subtract; the result should stay 0
    global value
    value = value + n
    value = value - n

# Without the lock (result is unpredictable)
def run_thread(n):
    for i in range(2000000):
        change_it(n)

# With the lock
# def run_thread(n):
#     for i in range(2000000):
#         lock.acquire()
#         try:
#             change_it(n)
#         finally:
#             lock.release()

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)
# 29  (without the lock the value varies from run to run; with the lock it is always 0)

Because there is only one lock, no matter how many threads there are, at most one thread holds the lock at any moment, so modifications cannot conflict. When several threads call lock.acquire() at the same time, only one of them acquires the lock and keeps executing; the others wait until the lock is released and they manage to acquire it.

A thread that has acquired the lock must release it when it is done, otherwise the threads waiting for the lock will wait forever and become dead threads. That is why try...finally is used to guarantee the lock is released.

The advantage of locking is that a critical piece of code can only be executed by one thread from start to finish. The disadvantages are real as well. First, it prevents truly concurrent execution: code protected by a lock effectively runs in single-threaded mode, which greatly reduces efficiency. Second, because multiple locks can exist, different threads may hold different locks and then try to acquire each other's, which can cause a deadlock in which several threads hang, unable to run or to finish, and only the operating system can forcibly terminate them.
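
A minimal sketch of the locked version using the with statement, which acquires and releases the lock automatically and so cannot forget the release; it is equivalent to the try...finally form commented out above:

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
# Locked counter sketch: "with lock" guarantees release even if change_it raises.
import threading

value = 0
lock = threading.Lock()

def change_it(n):
    global value
    value = value + n
    value = value - n

def run_thread(n):
    for i in range(2000000):
        with lock:        # acquire on entry, release on exit
            change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(value)
# 0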

Semaphore (the BoundedSemaphore class)

A Lock allows only one thread to change data at a time, whereas a Semaphore allows a fixed number of threads to do so simultaneously.

import threading
import time

def run(n, semaphore):
    semaphore.acquire()  # acquire a slot
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()  # release the slot

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # at most 5 threads may run at the same time
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print(threading.active_count())
    else:
        print('-----all threads done-----')

The GIL

Outside of Python, a single core can run only one task at a time, while multiple cores can run multiple threads truly in parallel. In Python, however, no matter how many cores there are, only one thread executes at a time. The reason is the existence of the GIL.

GIL stands for Global Interpreter Lock. It originates in a Python design decision made for the sake of data safety. For a thread to execute, it must first acquire the GIL; you can think of the GIL as a "pass", and within one Python process there is only one. A thread that does not hold the pass is not allowed onto the CPU. The GIL exists only in CPython, because CPython uses the C runtime's native threads and cannot schedule CPU access itself, so it relies on the GIL to guarantee that only one thread touches the interpreter's data at a time; Jython, for example, has no GIL.

Python's execution efficiency also differs for different types of code:

1. CPU-intensive code (loops, heavy computation, and so on): because the interpreter does a lot of work, the check counter (ticks) quickly reaches its threshold, which triggers the release and re-acquisition of the GIL, and switching back and forth between threads costs resources. Python multithreading is therefore unfriendly to CPU-intensive code.

2. I/O-intensive code (file handling, web crawlers, and other read/write-heavy work): multithreading can effectively improve efficiency. Under a single thread, an I/O operation blocks in an I/O wait and wastes time; with multiple threads, the interpreter can automatically switch to thread B while thread A is waiting, so CPU time is not wasted and the program runs faster. Python multithreading is therefore friendly to I/O-intensive code, as the sketch below illustrates.
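
A minimal sketch of point 2, assuming Python 3 with the standard library's concurrent.futures and the requests package installed; the URL list is purely illustrative:

# I/O-bound sketch: fetch several pages concurrently with a thread pool.
from concurrent.futures import ThreadPoolExecutor
import requests

urls = ["https://movie.douban.com/top250?start=%d" % n for n in (0, 25, 50)]  # illustrative

def fetch(url):
    # each worker thread spends most of its time waiting on the network
    return url, len(requests.get(url).content)

with ThreadPoolExecutor(max_workers=3) as pool:
    for url, size in pool.map(fetch, urls):
        print(url, size)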

Usage advice

If you want Python to make full use of a multi-core CPU, use multiple processes. Each process has its own independent GIL and they do not interfere with each other, so parallel execution is achieved in the true sense. In Python, multiprocessing therefore outperforms multithreading for CPU-bound work (on multi-core CPUs only).
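
A minimal sketch of that advice, assuming Python 3 and using concurrent.futures.ProcessPoolExecutor; the count function and the job sizes are purely illustrative:

# CPU-bound sketch: a process pool spreads the work over multiple cores.
from concurrent.futures import ProcessPoolExecutor
import time

def count(n):
    # pure computation: threads would be serialized by the GIL, processes are not
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    jobs = [5000000] * 4
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(count, jobs))
    print("4 jobs finished in %.2fs" % (time.time() - start))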

4. Crawling the Douban movie Top 250

The top 250 movies are crawled in more than one way so the approaches can be compared.

(1) The page crawled: https://movie.douban.com/top250?start=0&filter=

(2) By inspecting the page URLs, the first page has start=0, the second page start=25, the third page start=50, and so on.

(3) Only the movie title and rating are crawled. Since the goal is to compare approaches, there is no elaborate extraction or storage; the data is simply printed.

Multi-process crawling

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
import multiprocessing
from multiprocessing import Process, Queue
import time
from lxml import etree
import requests

class DouBanSpider(Process):
    def __init__(self, q, url_list, lock):
        # override the parent class's __init__ method
        super(DouBanSpider, self).__init__()
        self.url_list = url_list
        self.q = q
        self.lock = lock
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        Send the request.
        :return: the page source
        '''
        # retry a few times when the request fails
        i = 0
        while i <= 3:
            try:
                print(u"[INFO] request url: " + url)
                return requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s %s' % (e, url))
                i += 1

    def parse_page(self):
        '''
        Parse the page source, extract the movie titles and ratings with xpath,
        and put them into the queue.
        :return:
        '''
        time.sleep(0.1)
        while 1:
            try:
                url = self.url_list.pop()
            except IndexError as e:
                break
            self.lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            # get one page of movie data
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # movie title
                title = move.xpath('.//a/span/text()')[0]
                # rating
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
                # put each movie's rating and title into the queue
                self.q.put(score + "\t" + title)
            self.lock.release()

class AllUrlSpider(Process):
    def __init__(self, url_lis):
        super(AllUrlSpider, self).__init__()
        self.url_list = url_lis

    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # construct all the page URLs
        for num in range(225, -1, -25):
            self.url_list.append(base_url + str(num))
            print("got URL: {}".format(base_url + str(num)))

def main():
    # create a queue to hold the data collected by the processes
    q = Queue()
    lock = multiprocessing.Lock()
    manager = multiprocessing.Manager()
    url_list = manager.list()
    a = AllUrlSpider(url_list)
    p = DouBanSpider(q, url_list, lock)
    b = DouBanSpider(q, url_list, lock)
    c = DouBanSpider(q, url_list, lock)
    a.start()
    p.start()
    b.start()
    c.start()
    a.join()
    p.join()
    b.join()
    c.join()
    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    start = time.time()
    main()
    print('[info] elapsed: %s' % (time.time() - start))

Multi-process crawling took about 7.15 seconds (partial results were shown as a screenshot in the original post).

Multithreaded crawling

#!/usr/bin/env python2
# -*- coding=utf-8 -*-
from queue import Queue
from threading import Thread
import threading
import time
from lxml import etree
import requests

url_list = []
lock = threading.Lock()

class DouBanSpider(Thread):
    def __init__(self, q):
        # override the parent class's __init__ method
        super(DouBanSpider, self).__init__()
        self.q = q
        self.headers = {
            'Host': 'movie.douban.com',
            'Referer': 'https://movie.douban.com/top250?start=225&filter=',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        Send the request.
        :return: the page source
        '''
        # retry a few times when the request fails
        i = 0
        while i <= 3:
            try:
                print(u"[INFO] request url: " + url)
                html = requests.get(url=url, headers=self.headers).content
            except Exception as e:
                print(u'[INFO] %s %s' % (e, url))
                i += 1
            else:
                return html

    def parse_page(self):
        '''
        Parse the page source, extract the movie titles and ratings with xpath,
        and put them into the queue.
        :return:
        '''
        while 1:
            try:
                url = url_list.pop()
            except IndexError as e:
                break
            lock.acquire()
            response = self.send_request(url)
            html = etree.HTML(response)
            # get one page of movie data
            node_list = html.xpath("//div[@class='info']")
            for move in node_list:
                # movie title
                title = move.xpath('.//a/span/text()')[0]
                # rating
                score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
                # put each movie's rating and title into the queue
                self.q.put(score + "\t" + title)
            lock.release()

class AllUrlSpider(Thread):
    def run(self):
        base_url = 'https://movie.douban.com/top250?start='
        # construct all the page URLs
        for num in range(225, -1, -25):
            url_list.append(base_url + str(num))
            print("got URL: {}".format(base_url + str(num)))

def main():
    # create a queue to hold the data collected by the threads
    q = Queue()
    a = AllUrlSpider()
    a.start()
    # keep references to the threads
    thread_list = []
    # create and start the threads
    for i in range(5):
        p = DouBanSpider(q)
        p.start()
        thread_list.append(p)
    a.join()
    # let the main thread wait for the child threads to finish
    for i in thread_list:
        i.join()
    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    start = time.time()
    main()
    print('[info] elapsed: %s' % (time.time() - start))

Multithreaded crawling took about 5 seconds (partial results were shown as a screenshot in the original post).

The elapsed time depends partly on network quality, so each measurement differs. In theory, though, threads handle I/O-intensive work better than processes.

Copyright notice
This article was written by [Chen Python]. Please include a link to the original when republishing. Thank you.
