Python 3 multithreading and MongoDB: a fresh demo with consumption log data at the 100-million scale

acclea 2021-02-22 23:17:25
python multithreading mongo million consumption

Compared with the earlier original version, this version:

1. Adds indexes to the Mongo collection.

2. Moves some helper functions into one class, so that other places or projects can reuse them later.

3. Adds some new TODOs, mainly for the current test demo.
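
The index from item 1 appears in the listing only as a commented-out `create_index` call, so here is a minimal sketch of how it might be created with pymongo. The helper name `ensure_cost_indexes` and the choice of fields are assumptions, based on the two queries in the demo that sort by `user_id` and by `create_time`; `-1` is the value of pymongo's `DESCENDING` constant.

```python
DESCENDING = -1  # same value as pymongo.DESCENDING


def ensure_cost_indexes(collection):
    """Create indexes for the demo's two "latest value" queries.

    `collection` is expected to be a pymongo Collection. Creating an index
    that already exists with the same definition is a no-op, so this can
    be called on every start-up.
    """
    names = []
    # Supports find().sort('user_id', DESCENDING).limit(1)
    names.append(collection.create_index([("user_id", DESCENDING)]))
    # Supports find().sort('create_time', DESCENDING).limit(1)
    names.append(collection.create_index([("create_time", DESCENDING)]))
    return names
```

In the demo this would be called once in the main block, right after the collection is obtained and before any thread starts.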


# -*- coding: utf-8 -*-
"""
Simulates a single day's business traffic of 120 million records.
That averages about 1389 records per second.
With 10 threads, each thread writes about 139 records per second.
The simulated clock advances by one second with probability 1:200 per record.
At this volume a single thread takes too long, so multiple threads produce the data,
which also lets us test the common problems of multithreading.
author -- acclea
date -- 2021-02-07
"""
try:
    from pymongo import MongoClient, ASCENDING, DESCENDING
except ImportError:
    raise SystemExit("pymongo library not found; please install it")
try:
    import redis
except ImportError:
    raise SystemExit("redis library not found; please install it")
import random, time, hashlib, json
import threading
class comm_func:
    """
    Wraps some general-purpose helper functions.
    For ease of calling, the methods of this class are declared as global functions.
    """

    # sha256 => 64 hex characters
    global sha256hex
    def sha256hex(data):
        sha256 = hashlib.sha256()
        # hashlib works on bytes, so encode the string first
        sha256.update(data.encode("utf-8"))
        res = sha256.hexdigest()
        # print("sha256 result:", res)
        return res

    # json_decode: accepts a JSON string or a readable file object
    global json_decode
    def json_decode(input_data):
        try:
            if hasattr(input_data, "read"):
                return json.load(input_data)
            # json.loads no longer takes an encoding argument (removed in Python 3.9)
            return json.loads(input_data)
        except (TypeError, ValueError):
            return False
            # return "this input data is error, can not decode json"

    # json_encode
    global json_encode
    def json_encode(input_data):
        try:
            return json.dumps(input_data)
        except (TypeError, ValueError):
            return False
            # return "input data is error, can not encode to json"

    # isset: check whether an element of a list/dict exists
    def isset(input_data):
        pass  # todo

    global time_unix
    def time_unix(input_time=None, is_full=False):
        """
        Timestamp. With no input_time, return the current time
        (float if is_full, else int); otherwise parse "YYYY-mm-dd HH:MM:SS".
        """
        try:
            if input_time is None:
                if is_full:
                    return time.time()
                return int(time.time())
            time_list = time.strptime(input_time, "%Y-%m-%d %H:%M:%S")
            if is_full:
                return time.mktime(time_list)
            return int(time.mktime(time_list))
        except (TypeError, ValueError):
            return "date info is error"

    global date_str
    def date_str(format, t=0):
        """
        Convert a timestamp to a formatted time string.
        :param format: strftime format string
        :param t: Unix timestamp; a value below 1 means "now"
        :return: the formatted string, or False on error
        """
        try:
            if int(t) < 1: t = time.time()
            timeArray = time.localtime(t)
            return time.strftime(format, timeArray)
        except Exception as e:
            print("Conversion error: %s" % e)
            return False
# connect mongoDB
def conn_mongo():
    conn = MongoClient('', 27017)
    return conn.mon_test  # use the mon_test database; it is created automatically if missing

# get the mongo collection named set_name
def mongo_create_set(set_name):
    mongo_db = conn_mongo()
    # if len(index_data) > 1:
    #     return mongo_db[set_name].create_index(index_data)
    return mongo_db[set_name]  # use this collection; it is created automatically if missing

# insert data into mongo (cost_set is the global collection created in the main block)
def mongo_insert(data=None):
    if not data: return False
    # mongo_set = mongo_create_set(set_name)
    return cost_set.insert_one(data)
# connect redis
def conn_redis():
    return redis.Redis(host='', port=6379, decode_responses=True)

# redis get: the stored value is JSON-decoded before it is returned
def rds_get(key):
    try:
        rds = conn_redis()
        return json_decode(rds.get(key))
    except Exception:
        return False
        # return "this key is error, check it, please"

# redis set: every value is JSON-encoded before it is stored
def rds_set(key, val, exp=3600):
    try:
        rds = conn_redis()
        rds.set(key, json_encode(val))
        rds.expire(key, exp)
    except Exception: return False
# generate a random user name: one upper-case letter followed by 2 to 10 distinct lower-case letters
def generation_user_name():
    random_upper_letter_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', ]
    random_lower_letter_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
    name = ''.join([random.choice(random_upper_letter_list), ''.join(random.sample(random_lower_letter_list, random.randint(2, 10)))])
    return name
# generate user id
# MongoDB's unique _id does not auto-increment the way int/bigint/tinyint/smallint ids do, so we implement it ourselves:
# 1. Use a third party to generate a unique auto-incrementing id, for example Redis increment.
#    Note: Redis increment has a drawback: as long as Redis is not restarted the counter keeps growing,
#    which is inconvenient for testing; after a restart it starts again from 0.
# 2. Take the current maximum + 1 each time.
def generation_user_id(set_name):
    redis_user_id_key = "user_id"
    try:
        user_id = int(rds_get(redis_user_id_key))
        # user_id = 0
    except (TypeError, ValueError):
        user_id = 0
    # print(user_id)
    if user_id < 1:
        print('user_id cache expired')
        # query the max user_id in mongo set
        # cost_set = mongo_create_set(set_name)
        # Output only the user_id field; the first argument is the query filter, and an empty filter matches everything
        # To output every field except one, exclude it instead: mongo_set.find( {}, {"cost": 0 } )
        # In MongoDB, sort() orders the results; its arguments name the sort field and the direction, where 1 is ascending and -1 is descending
        try:
            results = cost_set.find({}, {"user_id": 1}).sort('user_id', DESCENDING).limit(1)
            user_id = [result['user_id'] for result in results][0]
            # user_id = 0
        except Exception:
            # empty collection or query failure: start from 0
            user_id = 0
    user_id = int(user_id)
    # user_id = rds.incr(user_id)
    if user_id > 80000000:
        user_id = random.randint(1,80000000)
    else:
        user_id += 1
    rds_set(redis_user_id_key, user_id, 60*5)
    return user_id
def total_cost_by_day():
    # todo
    # this day total cost
    pass

def day_cost_for_hour():
    # todo
    # this day per hour total cost
    pass

def top50_cost_user():
    # todo
    # this day user cost top50
    pass

def earliest_cost_user():
    # todo
    # this day earliest cost user, limit 50
    pass

def latest_cost_user():
    # todo
    # this day latest cost user, limit 50
    pass
# create user cost info: generates one simulated consumption record
def create_user_info(set_name=None):
    # Use the current date as the name of the collection
    default_date = date_str("%Y-%m-%d")
    default_time = " ".join([default_date, '00:00:00'])
    # acquire the lock (user_id_lock is created in the main block)
    user_id_lock.acquire()
    try:
        # Time: records start at 00:00:00 of the day and end at 23:59:59.
        # === This sets the time for the simulated data; a real business would use the system time ===
        # user_id = 0
        try:
            max_time_results = cost_set.find({}, {"create_time": 1}).sort('create_time', DESCENDING).limit(1)
            base_time_str = [result['create_time'] for result in max_time_results][0]
            if base_time_str < 1:
                base_time_str = time_unix(default_time)
        except Exception:
            # empty collection or query failure: start from midnight
            base_time_str = time_unix(default_time)
        # print(default_time)
        # With the current data volume, the clock advances by one second with probability 1:200
        incre_second_flag = False
        temp_random_num = random.randint(1, 200)
        if temp_random_num == 200:
            incre_second_flag = True
        if incre_second_flag == True:
            if not isinstance(base_time_str, int): base_time_str = int(base_time_str)
            base_time_str += 1
        cur_date = date_str("%Y-%m-%d %H:%M:%S", base_time_str)
        user_name = generation_user_name()
        # Check whether the user already exists via the cached user info. The cache is valid
        # until 23:59:59 of that day; in a real business this depends on the data volume
        cache_user_info = sha256hex(user_name)
        user_id = rds_get(cache_user_info)
        if user_id is False:
            user_id = generation_user_id(set_name)
            rds_set(cache_user_info, user_id, 24*60*60)
    finally:
        # release the lock, even if something above raised
        user_id_lock.release()
    # cost, rounded to 2 decimal places
    cost = random.uniform(1, 80000)
    cost = round(cost, 2)
    # write to mongo
    user_cost_info = {"user_name": user_name, "user_id": user_id, "cost": cost, "create_time": base_time_str, "create_at": cur_date, }
    mongo_insert(user_cost_info)
    # cost_set.create_index('{"user_name":1,"user_id":1,"create_time":-1,}')
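def monk_data_create():
    # a step below 1 would make range() raise, so validate it before looping
    if srd_num < 1 or srd_num > total_data_num:
        return
    for i in range(1, total_data_num, srd_num):
        print(range(i, i + srd_num))
        thread_list = []
        for cost_user in range(i, i + srd_num):
            thr_node = threading.Timer(0, create_user_info)
            thread_list.append(thr_node)
        try:
            for thr in thread_list:
                thr.start()
            # wait for the whole batch before starting the next one
            for thr in thread_list:
                thr.join()
        except RuntimeError:
            print("===== Thread failed to start ====")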
if __name__ == '__main__':
    # Number of threads to start
    srd_num = 10
    # The maximum amount of test data
    total_data_num = 120000000
    # Create the thread lock.
    # Note: for the lock to be valid and unique, it must be created before any thread is started
    user_id_lock = threading.Lock()
    # pymongo.errors.AutoReconnect: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted
    # This error occurred because the database connection was also being opened inside the threads;
    # connecting to the database has to be done separately, outside them
    set_name = "_".join(["user_cost_log", date_str(format="%Y%m%d")])
    cost_set = mongo_create_set(set_name)
    # exit()
    # Generate the simulated data
    monk_data_create()
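
The traffic figures quoted at the top of the script can be re-derived with a few lines of arithmetic. This is only a check of the numbers, not part of the demo; `SECONDS_PER_DAY` is a name introduced here, and rounding up is an assumption that happens to match the quoted values.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60   # 86400
total_data_num = 120_000_000     # records per simulated day, as in the main block
srd_num = 10                     # number of threads, as in the main block

# 120,000,000 / 86,400 = 1388.9, rounded up
per_second = math.ceil(total_data_num / SECONDS_PER_DAY)
# 1389 / 10 = 138.9, rounded up
per_thread = math.ceil(per_second / srd_num)
print(per_second, per_thread)
```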

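The comment above `generation_user_id` lists Redis auto-increment as the first option, while the listing itself reads the value, adds one, and writes it back under a thread lock. As a hedged sketch of that first option: Redis executes `INCR` atomically on the server, so no client-side lock is needed for the counter itself. The function name `next_user_id` and the seeding step are assumptions, not the author's code.

```python
def next_user_id(rds, key="user_id", seed=0):
    """Return the next id using Redis INCR, which is atomic on the server.

    `rds` is expected to behave like redis.Redis. `seed` is the highest id
    already in use (for example the current maximum in MongoDB); it is only
    applied when the key does not exist yet.
    """
    # SET with nx=True writes the seed only if the key is absent, so
    # concurrent callers cannot reset a counter that is already running.
    rds.set(key, seed, nx=True)
    return int(rds.incr(key))
```

With a real client this would be called as `next_user_id(conn_redis(), seed=max_id_from_mongo)`. The drawback mentioned in the comment still applies: the counter keeps growing until the key is deleted or expires.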

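`monk_data_create` builds a new batch of `threading.Timer` objects for every 10 records. One possible alternative, shown here only as a sketch, is a fixed pool from the standard library's `concurrent.futures`, which reuses the same threads. `run_in_pool` and `worker` are hypothetical names; `worker` stands in for `create_user_info`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_in_pool(worker, total, max_workers=10):
    """Call worker() `total` times on a fixed pool of threads.

    Returns the number of calls that raised, so failures are counted
    rather than silently dropped.
    """
    failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(worker) for _ in range(total)]
        for future in futures:
            # exception() waits for the call to finish
            if future.exception() is not None:
                failures += 1
    return failures


# Small demonstration with a shared counter protected by a lock.
counter = {"n": 0}
counter_lock = threading.Lock()


def worker():
    # Stand-in for create_user_info.
    with counter_lock:
        counter["n"] += 1


print(run_in_pool(worker, total=1000), counter["n"])
```

This sketch keeps one future per call in memory, so for the full 120 million records it would need to be called in batches rather than with `total=120000000` at once.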

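The TODO functions in the listing (`total_cost_by_day`, `day_cost_for_hour`, `top50_cost_user`) are left empty. As a sketch under stated assumptions, they could be built on MongoDB aggregation pipelines like the ones below. The function names are hypothetical, and the pipelines assume the document shape written by `create_user_info`: a numeric `cost`, an integer `user_id`, and an integer Unix timestamp in `create_time`.

```python
def total_cost_pipeline():
    """Sum of `cost` over the whole collection (one collection per day)."""
    return [{"$group": {"_id": None, "total": {"$sum": "$cost"}}}]


def hourly_cost_pipeline():
    """Total cost per hour, bucketing `create_time` into 3600-second slots.

    The slots fall on UTC hour boundaries, which match local hours only
    in time zones with a whole-hour offset.
    """
    hour_start = {"$subtract": ["$create_time", {"$mod": ["$create_time", 3600]}]}
    return [
        {"$group": {"_id": hour_start, "total": {"$sum": "$cost"}}},
        {"$sort": {"_id": 1}},
    ]


def top_users_pipeline(limit=50):
    """Users ranked by their total cost, highest first."""
    return [
        {"$group": {"_id": "$user_id", "total": {"$sum": "$cost"}}},
        {"$sort": {"total": -1}},
        {"$limit": limit},
    ]
```

Each pipeline would be passed to the collection's `aggregate` method, for example `list(cost_set.aggregate(top_users_pipeline()))`.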