python3 多线程 与 mongo亿级消费日志数据 新鲜demo 【优化第一版】

acclea 2021-02-22 13:29:38
Python


与之前的最初版对比:最初版传送

1、新增了mongo 集合的索引

2、把一些封装的方法归入一个 class 便于后期 其他位置/项目 引用

3、新增了一些 todo ,主要是针对当前测试 demo 

 

# -*- coding: utf-8 -*-
"""
模拟单日 1.2亿 的交易流水
平均每秒的交易数据约为 1389 条
开启 10 个线程,每个线程每秒 139 条
时间改变一秒的概率为 1:200
根据业务量单线程用时较长,所以采用多线程生产数据,
同时也可以测试出多线程出现的常见问题
author -- acclea
date -- 2021-02-07
"""
try:
from pymongo import MongoClient, ASCENDING, DESCENDING
except: "can not found pymongo lib, make it install"
try:
import redis
except: "can not found redis lib, make it install"
import random, time, hashlib, json
import threading
class comm_func:
"""
对一些通用的方法/函数 进行封装
为了方便调用,该类的所有方法会定义为全局函数
"""
# sha256=>64位
global sha256hex
def sha256hex(data):
sha256 = hashlib.sha256()
sha256.update(data.encode())
res = sha256.hexdigest()
# print("sha256加密结果:", res)
return res
# json_decode
global json_decode
def json_decode(input_data):
try:
if str(type(input_data)).find('object') == -1:
return json.loads(input_data, encoding="utf-8")
else:
f = open(input_data, encoding="utf-8")
return json.load(f, encoding="utf-8")
except:
return False
# return "this input data is error, can not decode json"
# json_encode
global json_encode
def json_encode(input_data):
try:
if str(type(input_data)).find('Object') == -1:
return json.dumps(input_data)
else:
f = open(input_data, encoding="utf-8")
return json.dump(f)
except:
return False
# return "input data is error, can not encode to json"
# isset 判断一个 list/dict 的元素是否存在
def isset(input_data):
pass
global time_unix
def time_unix(input_time=None, is_full=False):
"""
时间戳
:return:
"""
if input_time == None:
if is_full:
return time.time()
return int(time.time())
else:
try:
time_list = time.strptime(input_time, "%Y-%m-%d %H:%M:%S")
if is_full:
return int(time.mktime(time_list))
return int(time.mktime(time_list))
except:
return "date info is error"
global date_str
def date_str(format, t=0):
"""
时间戳转时间
%Y
%m
%d
%H
%M
%S
%z
%a
%A
%b
%B
%c
%I
%p
:param format:
:param t:
:return:
"""
if int(t) < 1: t = time.time()
timeArray = time.localtime(t)
try:
return time.strftime(format, timeArray)
except Exception as e:
print("转化错误:case%s" % e)
return False
"""
connect mongoDB
"""
def conn_mongo():
conn = MongoClient('127.0.0.1', 27017)
return conn.mon_test #连接mydb数据库,没有则自动创建
"""
create mongo set by set_name
"""
def mongo_create_set(set_name):
mongo_db = conn_mongo()
# if len(index_data) > 1:
# return mongo_db[set_name].create_index(index_data)
return mongo_db[set_name] # 使用test_set集合,没有则自动创建
"""
insert data to mongo
"""
def mongo_insert(data = {}):
if len(data) < 1:return False
# mongo_set = mongo_create_set(set_name)
return cost_set.insert_one(data)
"""
connect redis
"""
def conn_redis():
return redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
"""
redis get
进行 json 逆向处理
"""
def rds_get(key):
try:
rds = conn_redis()
return json_decode(rds.get(key))
except:
return False
# return "this key is error, check it, please"
"""
redis set
所有输入的数据全部转化为 json 处理
"""
def rds_set(key,val,exp=3600):
try:
rds = conn_redis()
rds.set(key, json_encode(val))
rds.expire(key, exp)
except: return False
"""
generate user name --- 生成随机用户名
"""
def generation_user_name():
random_upper_letter_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', ]
random_lower_letter_list = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
name = ''.join([random.choice(random_upper_letter_list), ''.join(random.sample(random_lower_letter_list, random.randint(2, 10)))])
print(name)
return name
"""
generate user id
用于mongo 系统的唯一 id 不支持 int/bigint/tinyint/smallint 类的 id 自增,所以需要自己实现
1、通过借助第三方来生成唯一自增 id ,如使用 redis 自增,
注:redis 自增 有一个缺点,只要 redis 没有重启,那个自增值就会一直自增,不利于测试,重启后自增值 从 0 开始
2、通过 自己每次 取最大值 +1 实现
"""
def generation_user_id(set_name):
redis_user_id_key = "user_id"
try:
user_id = int(rds_get(redis_user_id_key))
# user_id = 0
except:
user_id = 0
# print(user_id)
if user_id == None or user_id < 1:
print('user_id 缓存 过期')
try:
# query the max user_id in mongo set
# cost_set = mongo_create_set(set_name)
# 只输出 user_id 字段,第一个参数为查询条件,空代表查询所有
# 如果需要输出的字段比较多,不想要某个字段,可以用排除字段的方法 mongo_set.find( {}, {"cost": 0 } )
# 在MongoDB中使用sort()方法对数据进行排序,sort()方法可以通过参数指定排序的字段,并使用 1 和 -1 来指定排序的方式,其中 1 为升序,-1为降序
results = cost_set.find({}, {"user_id": 1}).sort('user_id', DESCENDING).limit(1)
user_id = [result['user_id'] for result in results][0]
# user_id = 0
print('=======')
print(user_id)
except:
user_id = 0
user_id = int(user_id)
# user_id = rds.incr(user_id)
if user_id > 80000000:
user_id = random.randint(1,80000000)
else:
user_id += 1
rds_set(redis_user_id_key, user_id, 60*5)
print(user_id)
return user_id
def total_cost_by_day():
# todo
# this day total cost
pass
def day_cost_for_hour():
# todo
# this day per hour total cost
pass
def top50_cost_user():
# todo
# this day user cost top50
pass
def earliest_cost_user():
# todo
# this day earliest cost user, limit 50
pass
def latest_cost_user():
# todo
# this day latest cost user, limit 50
pass
"""
create user cost info
模拟生成消费数据
"""
def create_user_info(set_name=None):
# 以当前的日期作为集合的名称
default_date = date_str("%Y-%m-%d")
default_time = " ".join([default_date, '00:00:00'])
# 获取---锁
user_id_lock.acquire()
# 时间, 当天的 0:00:00 开始记录,截至时间为 23:59:59, === 由于为模拟数据所以设定时间,实际业务的时间以系统时间为准 ===
try:
# user_id = 0
max_time_results = cost_set.find({}, {"create_time": 1}).sort('create_time', DESCENDING).limit(1)
base_time_str = [result['create_time'] for result in max_time_results][0]
print('mongo---time')
print(base_time_str)
if base_time_str < 1:
base_time_str = time_unix(default_time)
except:
base_time_str = time_unix(default_time)
# print(default_time)
print(base_time_str)
# 根据当前数据量,时间改变一秒的概率为 1:200
incre_second_flag = False
temp_random_num = random.randint(1, 200)
if temp_random_num == 200:
incre_second_flag = True
if incre_second_flag == True:
if str(type(base_time_str)).find('int') == -1: base_time_str = int(base_time_str)
base_time_str += 1
cur_date = date_str("%Y-%m-%d %H:%M:%S", base_time_str)
user_name = generation_user_name()
# 根据缓存用户信息判断当前用户是否存在,缓存有效期为当天 23:59:59,实际业务根据数据量决定
cache_user_info = sha256hex(user_name)
user_id = rds_get(cache_user_info)
if user_id == False:
user_id = generation_user_id(set_name)
rds_set(cache_user_info, user_id, 24*60*60)
# 释放---锁
user_id_lock.release()
# 消费 2 位小数
cost = random.uniform(1, 80000)
cost = round(cost, 2)
# 写入mongo
user_cost_info = {"user_name": user_name, "user_id": user_id, "cost": cost, "create_time": base_time_str, "create_at": cur_date, }
mongo_insert(user_cost_info)
# cost_set.create_index('{"user_name":1,"user_id":1,"create_time":-1,}')
def monk_data_create():
for i in range(1, total_data_num, srd_num):
if srd_num < 1 or srd_num > total_data_num:
break
print(range(i, i + srd_num))
thread_list = []
try:
for cost_user in range(i, i + srd_num):
thr_node = threading.Timer(0, create_user_info)
thr_node.start()
thread_list.append(thr_node)
for thr in thread_list:
thr.join()
except:
print("=====线程无法启动====")
if __name__ == '__main__':
# 开启线程数
srd_num = 10
# 最大测试数据量
total_data_num = 120000000
# 创建--线程锁,
# 注:确保线程锁的有效/唯一,因此只能把线程锁放在线程开启前
user_id_lock = threading.Lock()
# pymongo.errors.AutoReconnect: 127.0.0.1:27017: [WinError 10048] 通常每个套接字地址(协议/网络地址/端口)只允许使用一次
# 出现该错误,是由于把连接数据库的操作也加入了多线程,需要把 连接数据库 独立出来
set_name = "_".join(["user_cost_log", date_str(format="%Y%m%d")])
cost_set = mongo_create_set(set_name)
cost_set.create_index('{"user_name":1,"user_id":1,"create_time":-1,}')
# exit()
# 生成模拟数据
monk_data_create()

 

版权声明
本文为[acclea]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/3268486/blog/4958884

  1. Python中的解决中文字符编码的问题
  2. Solving the problem of Chinese character coding in Python
  3. Translation: practical Python Programming 02_ 01_ Datatypes
  4. Installation and use of Python and tensorflow in win10 environment (Python version 3.6, tensorflow version 1.6)
  5. Python series 46
  6. Linux安装Python3
  7. 【python接口自动化】- 正则用例参数化
  8. Python RestFul Api 设计
  9. filecmp --- 文件及目录的比较│Python标准库
  10. Installing python3 on Linux
  11. [Python] Matplotlib 圖表的繪製和美化技巧
  12. (資料科學學習手札108)Python+Dash快速web應用開發——靜態部件篇(上)
  13. 翻譯:《實用的Python程式設計》02_01_Datatypes
  14. 【python接口自动化】- 正则用例参数化
  15. 翻译:《实用的Python编程》02_02_Containers
  16. 两年Java,去字节跳动写Python和Go
  17. [Python interface automation] - regular use case parameterization
  18. Python restful API design
  19. 翻译:《实用的Python编程》02_02_Containers
  20. 两年Java,去字节跳动写Python和Go
  21. 翻译:《实用的Python编程》02_02_Containers
  22. Python基于粒子群优化的投资组合优化研究
  23. ubuntu部署django项目
  24. 兩年Java,去位元組跳動寫Python和Go
  25. 翻譯:《實用的Python程式設計》02_02_Containers
  26. 这样学习Python,爷爷都学会了!超简单Python入门
  27. [Python] 基于 jieba 的中文分词总结
  28. 【python】递归听了N次也没印象,读完这篇你就懂了
  29. [Python] 基于 jieba 的中文分词总结
  30. 人理解迭代,神则体会递归,从电影艺术到Python代码实现神的逆向思维模式
  31. [Python] 基於 jieba 的中文分詞總結
  32. Python属于后端开发还是前端开发?Python入门!
  33. 【python】递归听了N次也没印象,读完这篇你就懂了
  34. 一天快速入门python
  35. 学习Python对年龄有没有要求?30岁可以吗?
  36. 清华教授!12小时整理的最全Python教程(文末无偿分享)
  37. 使用Python开发DeFi项目
  38. python 函数详解
  39. Python工程师是做什么的?前景如何?
  40. Filecmp -- comparison of files and directories
  41. Python - zip() 函数
  42. 30 周年生日,Python 先驱是怎么评价这门语言的?
  43. Drawing and beautifying skills of [Python] Matplotlib chart
  44. Python + dash rapid web application development static components
  45. Translation: practical Python Programming 02_ 01_ Datatypes
  46. python将excel自适应导入数据库
  47. 从小白到大师,这里有一份Pandas入门指南
  48. [Python] 茎叶图和复合饼图的画法
  49. [Python interface automation] - regular use case parameterization
  50. Translation: practical Python Programming 02_ 02_ Containers
  51. Two years of Java, to write Python and go
  52. Translation: practical Python Programming 02_ 02_ Containers
  53. Two years of Java, to write Python and go
  54. Python-geoplot 空间核密度估计图绘制
  55. Python-seaborn 经济学人经典图表仿制
  56. python空间绘图- regionmask掩膜操作示例
  57. Python 空间绘图 - Cartopy 经纬度添加
  58. Python-pykrige包-克里金(Kriging)插值计算及可视化绘制
  59. Python 批量重采样、掩膜、坡度提取
  60. python - 多种交通方式可达圈分析