Changes compared with the previously published version:
1. Added indexes to the MongoDB collection.
2. Moved several helper functions into one class so they can later be reused from other modules/projects.
3. Added some TODO stubs, mainly for the current test demo.
# -*- coding: utf-8 -*-
"""
Simulates one day of business traffic: 120 million records.
That averages about 1,389 records per second (120,000,000 / 86,400).
With 10 threads, each thread produces about 139 records per second.
The simulated clock advances by one second with probability 1:200.
A single thread would take too long to produce this volume, so the data is generated by multiple threads;
this also exercises common multithreading problems.
author -- acclea
date -- 2021-02-07
"""
import functools
import hashlib
import json
import random
import threading
import time

try:
    from pymongo import MongoClient, ASCENDING, DESCENDING
except: "can not found pymongo lib, make it install"
try:
    import redis
except: "can not found redis lib, make it install"
class comm_func:
    """
    Collection of general-purpose helper functions.

    Every helper except ``isset`` is declared ``global`` inside the class body,
    so its ``def`` binds the function at module level: callers use
    ``sha256hex(...)``, ``json_decode(...)`` etc. directly, not via the class.
    """

    # sha256 -> 64-character hex digest
    global sha256hex

    def sha256hex(data):
        """Return the SHA-256 digest of the string ``data`` (UTF-8 encoded) as 64 hex chars."""
        return hashlib.sha256(data.encode()).hexdigest()

    # json_decode
    global json_decode

    def json_decode(input_data):
        """Decode JSON from a str/bytes value or from a readable file-like object.

        :param input_data: JSON text (str, bytes or bytearray) or an object with ``read()``.
        :return: the decoded value, or False when ``input_data`` is None, of an
            unsupported type, or not valid JSON.
        """
        # json.loads()/json.load() no longer accept ``encoding=`` (removed in
        # Python 3.9). Passing it raised TypeError, which the old blanket
        # ``except`` turned into False for every single input.
        try:
            if isinstance(input_data, (str, bytes, bytearray)):
                return json.loads(input_data)
            if hasattr(input_data, "read"):
                return json.load(input_data)
        except (TypeError, ValueError, OSError):
            return False
        return False

    # json_encode
    global json_encode

    def json_encode(input_data):
        """Serialize ``input_data`` to a JSON string; return False if it cannot be serialized."""
        try:
            return json.dumps(input_data)
        except (TypeError, ValueError):
            return False

    # isset: check whether an element of a list/dict exists.
    # NOTE(review): not implemented yet; it is not declared global, so it is
    # only reachable as ``comm_func.isset``.
    def isset(input_data):
        """Placeholder; currently always returns None."""
        pass

    global time_unix

    def time_unix(input_time=None, is_full=False):
        """Return a Unix timestamp.

        :param input_time: optional local-time string "YYYY-mm-dd HH:MM:SS";
            when omitted the current time is used.
        :param is_full: when True return a float, otherwise an int.
        :return: the timestamp, or the string "date info is error" when
            ``input_time`` cannot be parsed.
        """
        if input_time is None:
            now = time.time()
            return now if is_full else int(now)
        try:
            stamp = time.mktime(time.strptime(input_time, "%Y-%m-%d %H:%M:%S"))
        except (TypeError, ValueError, OverflowError):
            return "date info is error"
        return stamp if is_full else int(stamp)

    global date_str

    def date_str(format, t=0):
        """Format Unix timestamp ``t`` as local time using a ``time.strftime`` pattern.

        Common directives: %Y %m %d %H %M %S %z %a %A %b %B %c %I %p.

        :param format: strftime pattern, e.g. "%Y-%m-%d %H:%M:%S".
        :param t: Unix timestamp; values below 1 (including the default 0) mean "now".
        :return: the formatted string, or False (after printing the error)
            when strftime rejects the pattern.
        """
        if int(t) < 1:
            t = time.time()
        try:
            return time.strftime(format, time.localtime(t))
        except Exception as e:
            print(" Transformation error :case%s" % e)
            return False


# connect mongoDB
@functools.lru_cache(maxsize=None)
def conn_mongo(host='127.0.0.1', port=27017, db_name='mon_test'):
    """Return the ``db_name`` database handle on the MongoDB server at host:port.

    MongoDB creates the database automatically on first write if it does not
    exist yet. The result is cached per (host, port, db_name), so repeated
    calls share one MongoClient (and its connection pool) instead of opening
    a new client every time.

    :param host: MongoDB host (default 127.0.0.1).
    :param port: MongoDB port (default 27017).
    :param db_name: database name (default "mon_test").
    """
    return MongoClient(host, port)[db_name]


# create (or open) a mongo collection by set_name
def mongo_create_set(set_name):
    """Return the collection named ``set_name`` from the database given by conn_mongo().

    MongoDB creates the collection automatically on first write if it does
    not exist yet.
    """
    return conn_mongo()[set_name]


# insert data to mongo
def mongo_insert(data=None, mongo_set=None):
    """Insert one document into a MongoDB collection.

    :param data: the document to insert; empty or missing data is not written.
    :param mongo_set: target collection; defaults to the module-level
        ``cost_set`` created in the ``__main__`` section.
    :return: the collection's ``insert_one`` result, or False when there is
        nothing to insert.
    """
    # ``None`` default instead of a shared mutable ``{}``; ``not data`` also
    # covers None, which previously crashed in len().
    if not data:
        return False
    target = cost_set if mongo_set is None else mongo_set
    return target.insert_one(data)


# connect redis
@functools.lru_cache(maxsize=None)
def conn_redis(host='127.0.0.1', port=6379):
    """Return a Redis client for host:port with ``decode_responses=True`` (values come back as str).

    The client is cached per (host, port). redis-py builds a connection pool
    per ``redis.Redis`` instance, so the previous "new client on every
    rds_get/rds_set call" opened fresh sockets for each cache access.

    :param host: Redis host (default 127.0.0.1).
    :param port: Redis port (default 6379).
    """
    return redis.Redis(host=host, port=port, decode_responses=True)


# redis get: the stored value is JSON-decoded before being returned
def rds_get(key):
    """Fetch ``key`` from Redis and JSON-decode it.

    :param key: Redis key.
    :return: the decoded value, or False when the key is missing, the stored
        value is not valid JSON, or the Redis call fails.
    """
    try:
        raw = conn_redis().get(key)
    except Exception:
        # best-effort cache read: any connection/Redis error counts as a miss
        # (``Exception`` rather than a bare except, so Ctrl+C still works)
        return False
    if raw is None:
        # missing or expired key
        return False
    return json_decode(raw)


# redis set: the value is JSON-encoded before being stored
def rds_set(key, val, exp=3600):
    """Store ``val`` JSON-encoded under ``key`` with a time-to-live of ``exp`` seconds.

    Best effort: errors are not raised.

    :param key: Redis key.
    :param val: any JSON-serializable value.
    :param exp: expiry in seconds (positive).
    :return: None on success, False on failure.
    """
    payload = json_encode(val)
    if payload is False:
        # val is not JSON-serializable; never store the literal False
        return False
    try:
        # value and TTL in one SET command, so the key can never be left
        # without an expiry (SET followed by EXPIRE is not atomic)
        conn_redis().set(key, payload, ex=exp)
    except Exception:
        return False


# generate user name --- generate random user names
def generation_user_name():
    """Generate and print a random user name.

    The name is one uppercase ASCII letter followed by 2-10 distinct
    lowercase ASCII letters.
    """
    import string

    first = random.choice(string.ascii_uppercase)
    # random.sample draws without replacement, so the lowercase letters never repeat
    rest = ''.join(random.sample(string.ascii_lowercase, random.randint(2, 10)))
    name = first + rest
    print(name)
    return name


# generate user id
# MongoDB provides no auto-increment for int/bigint-style ids, so the unique
# user id has to be produced by the application, e.g.:
#   1. with a third-party counter such as Redis INCR. Drawback: the counter
#      keeps growing as long as Redis is not restarted and only starts again
#      from 0 after a restart, which is inconvenient for testing;
#   2. by taking the current maximum id and adding 1 each time.
def generation_user_id(set_name):
    """Return the next user id (largest known id + 1), cached in Redis.

    The last issued id is cached in Redis under "user_id" for 5 minutes. On a
    cache miss the largest ``user_id`` stored in the module-level ``cost_set``
    collection is used as the starting point (0 when the collection is empty
    or the lookup fails). Once the id exceeds 80,000,000 a random id in
    [1, 80000000] is returned instead, so ids can repeat from then on.

    Not synchronized itself: create_user_info calls it while holding
    ``user_id_lock``.

    :param set_name: unused; kept for backward compatibility.
    :return: the new user id (int).
    """
    redis_user_id_key = "user_id"
    try:
        # rds_get returns False on a cache miss, which int() turns into 0
        user_id = int(rds_get(redis_user_id_key))
    except (TypeError, ValueError):
        user_id = 0
    if user_id < 1:
        print('user_id cache Be overdue ')
        try:
            # project only user_id; highest value first
            latest = cost_set.find_one({}, {"user_id": 1}, sort=[("user_id", DESCENDING)])
            # latest is None for an empty collection -> TypeError -> fall back to 0
            user_id = latest['user_id']
            print('=======')
            print(user_id)
        except Exception:
            user_id = 0
    user_id = int(user_id)
    if user_id > 80000000:
        user_id = random.randint(1, 80000000)
    else:
        user_id += 1
    rds_set(redis_user_id_key, user_id, 60 * 5)
    print(user_id)
    return user_id
def total_cost_by_day():
    """Placeholder: total cost for the current day.

    TODO: not implemented yet; currently returns None.
    """
    return None
def day_cost_for_hour():
    """Placeholder: total cost per hour for the current day.

    TODO: not implemented yet; currently returns None.
    """
    return None
def top50_cost_user():
    """Placeholder: the top 50 users by cost for the current day.

    TODO: not implemented yet; currently returns None.
    """
    return None
def earliest_cost_user():
    """Placeholder: the earliest-spending users of the current day (limit 50).

    TODO: not implemented yet; currently returns None.
    """
    return None
def latest_cost_user():
    """Placeholder: the latest-spending users of the current day (limit 50).

    TODO: not implemented yet; currently returns None.
    """
    return None


# create user cost info -- generate simulated consumption data
def create_user_info(set_name=None):
    """Generate one simulated consumption record and insert it into the collection.

    While holding the module-level ``user_id_lock`` it:
      * uses the newest ``create_time`` stored in ``cost_set`` as the simulated
        clock, falling back to today's 00:00:00 (local time) when the
        collection is empty or the lookup fails (a real business would use
        the system time instead);
      * advances that clock by one second with probability 1:200;
      * generates a random user name and resolves its user id through the
        Redis cache keyed by sha256(user_name), creating a new id and caching
        it for 24 h on a miss.
    The random cost (1-80000, 2 decimals) and the insert happen after the
    lock is released.

    :param set_name: forwarded to generation_user_id.
    """
    # today's date is the base of the simulated clock
    default_time = " ".join([date_str("%Y-%m-%d"), '00:00:00'])
    # ``with`` releases the lock even if something below raises; the previous
    # bare acquire()/release() pair left the lock held forever after any
    # exception, blocking every other worker thread.
    with user_id_lock:
        try:
            latest = cost_set.find_one({}, {"create_time": 1}, sort=[("create_time", DESCENDING)])
            base_time_str = latest['create_time']
            print('mongo---time')
            print(base_time_str)
            if base_time_str < 1:
                base_time_str = time_unix(default_time)
        except Exception:
            # empty collection (latest is None), missing field or query error
            base_time_str = time_unix(default_time)
        print(base_time_str)
        # given the simulated volume, the clock moves one second in 1 of 200 records
        if random.randint(1, 200) == 200:
            base_time_str = int(base_time_str) + 1
        cur_date = date_str("%Y-%m-%d %H:%M:%S", base_time_str)
        user_name = generation_user_name()
        # look the user up in the Redis cache (24 h TTL; real business would
        # size this from the data volume)
        cache_user_info = sha256hex(user_name)
        user_id = rds_get(cache_user_info)
        # rds_get returns False on a miss; a cached 0/None is not a usable id either
        if not user_id:
            user_id = generation_user_id(set_name)
            rds_set(cache_user_info, user_id, 24 * 60 * 60)
    # cost with 2 decimal places
    cost = round(random.uniform(1, 80000), 2)
    user_cost_info = {"user_name": user_name, "user_id": user_id, "cost": cost, "create_time": base_time_str, "create_at": cur_date, }
    mongo_insert(user_cost_info)
def monk_data_create(total=None, batch_size=None, target=None):
    """Call ``target`` exactly ``total`` times using batches of concurrent threads.

    Each batch starts up to ``batch_size`` threads (the final batch may be
    smaller) and waits for all of them before starting the next batch.

    :param total: number of calls; defaults to the module-level ``total_data_num``.
    :param batch_size: threads per batch; defaults to the module-level ``srd_num``.
        Nothing runs when it is < 1 or larger than ``total`` (same guard as before).
    :param target: zero-argument callable run by each thread; defaults to
        ``create_user_info``.
    """
    if total is None:
        total = total_data_num
    if batch_size is None:
        batch_size = srd_num
    if target is None:
        target = create_user_info
    # checked before building the range: range() with a step of 0 raises ValueError
    if batch_size < 1 or batch_size > total:
        return
    for start in range(0, total, batch_size):
        # trim the final batch so exactly ``total`` calls are made
        count = min(batch_size, total - start)
        print(range(start + 1, start + count + 1))
        thread_list = []
        try:
            for _ in range(count):
                thr_node = threading.Thread(target=target)
                thr_node.start()
                thread_list.append(thr_node)
        except Exception:
            print("===== Thread failed to start ====")
        finally:
            # always join whatever did start, so batches never overlap
            for thr in thread_list:
                thr.join()
if __name__ == '__main__':
    # number of worker threads per batch
    srd_num = 10
    # total number of simulated records
    total_data_num = 120000000
    # Lock shared by all worker threads; it has to be created (once) before
    # any worker thread starts so every thread sees the same lock.
    user_id_lock = threading.Lock()
    # Connecting to the database inside every worker thread exhausted local
    # sockets (pymongo.errors.AutoReconnect: 127.0.0.1:27017: [WinError 10048]),
    # so the collection handle is created once here and shared by the threads.
    set_name = "_".join(["user_cost_log", date_str(format="%Y%m%d")])
    cost_set = mongo_create_set(set_name)
    # create_index expects (field, direction) pairs; the previous JSON-looking
    # string was taken as a single field *name*, indexing a non-existent field.
    cost_set.create_index([("user_name", ASCENDING), ("user_id", ASCENDING), ("create_time", DESCENDING)])
    # create_user_info reads the newest create_time on every insert and
    # generation_user_id reads the largest user_id on a cache miss; these
    # single-field indexes let those sorted lookups avoid full collection scans.
    cost_set.create_index([("create_time", DESCENDING)])
    cost_set.create_index([("user_id", DESCENDING)])
    # Generate simulation data
    monk_data_create()