mongodb
半自动创建索引和分片
-
为什么要写
- 项目上经常出现报表数据延迟,大多数是因为 mongodb 集合中对应字段未添加索引。
- 脚本基于 mongodb_3.2.12 版本编写;连接更高版本的 mongodb 时,可能需要更换连接驱动。
-
如何实现
- 通过读取本地文件,获取各集合对应的索引以及分片使用的片键(shard key),分别存入对应的列表。
- 再通过读取配置文件中的开关获取 dbname,然后根据 dbname 获取对应集合中已有的索引和 sharded 状态,再依次创建索引和 sharded 分片。
-
后续优化
- 可以把当前使用的key相关信息,在页面保存,然后通过接口自动获取对应最新的key和分片的片键,来达到自动更新创建索引。
-
注意点
- 如果 dbname 中的集合数据量特别大,则需要在业务低峰时运行。
脚本代码
- 详细脚本代码
#!/usr/bin/python3
import configparser,logging.config,sys
import pymongo,ast
# from pymongo import MongoClient
from datetime import datetime,timedelta
from urllib import parse
def init_mongodb(MongoDBAuth):
    """Build a pymongo client from the module-level connection settings.

    Reads the globals ``mongodb_ip``, ``mongodb_port``, ``mongodb_user`` and
    ``mongodb_passwd`` and reports through the global ``logger``.

    Args:
        MongoDBAuth: truthy to connect with the URL-quoted user/password
            embedded in a ``mongodb://`` URI; falsy to connect to
            ``mongodb_ip:mongodb_port`` without credentials.

    Returns:
        The connected ``pymongo.MongoClient``, or ``False`` when the client
        could not be created or the server did not answer a ``ping``.
    """
    if MongoDBAuth:
        username = parse.quote_plus(mongodb_user)
        password = parse.quote_plus(mongodb_passwd)
        host_part = mongodb_ip + ":" + mongodb_port + "/"
        ConnPasswd = "mongodb://" + username + ":" + password + "@" + host_part
        try:
            clients = pymongo.MongoClient(ConnPasswd)
            # MongoClient connects lazily; ping so that an unreachable server
            # or bad credentials are reported here instead of at first use.
            clients.admin.command("ping")
            # Never write the password to the log: log a redacted URI.
            logger.info("init mongodb conn: mongodb://" + username + ":****@" + host_part)
            return clients
        except Exception as e:
            logger.error("use mongodb user pass conn err: " + str(e))
            return False
    else:
        try:
            clients = pymongo.MongoClient(mongodb_ip, int(mongodb_port))
            # Same lazy-connection caveat as above.
            clients.admin.command("ping")
            logger.info("init mongodb conn: " + mongodb_ip + ":" + mongodb_port)
            return clients
        except Exception as e:
            logger.error("use mongodb no auth conn err: " + str(e))
            return False
# List all database names, minus the configured exclusions
def get_mongodb_dbname():
    """Return the server's database names with the filtered ones removed.

    Uses the module-level ``mongo_client`` and ``need_filter_dbname_list``.
    Every name that is dropped is reported through ``logger``.
    """
    remaining = list(mongo_client.list_database_names())
    for excluded in need_filter_dbname_list:
        if excluded not in remaining:
            continue
        remaining.remove(excluded)
        logger.info("delete need filter dbname: " + excluded)
    return remaining
# List all collections of one database
def get_mongodb_tables(entid):
    """Return the collection names of database ``entid`` as a list.

    The database is looked up on the module-level ``mongo_client``; the
    resulting list is also written to ``logger``.
    """
    names = list(mongo_client[entid].list_collection_names())
    logger.info("get " + entid + " all collections: " +str(names))
    return names
# Look up a collection's existing indexes and whether it is sharded
def get_index_key_tables(entid, collection_name):
    """Return the index keys and sharded flag of ``entid.collection_name``.

    Args:
        entid: database name.
        collection_name: collection name inside that database.

    Returns:
        A tuple ``(index_list, collection_sharded)``. ``index_list`` holds
        one plain ``dict`` per index (its key specification), excluding any
        index whose ``_id`` field is ``1``. ``collection_sharded`` is the
        ``sharded`` field of the ``collstats`` reply, ``False`` when absent.
    """
    db = mongo_client[entid]
    collection = db[collection_name]
    # Build the namespace ourselves. The previous code took it from the
    # index spec's "ns" field; when that field is absent the value is None
    # and the log concatenation below raises TypeError.
    ns_name = entid + "." + collection_name

    index_list = []
    for index_spec in collection.list_indexes():
        # Field names are upper-cased so the lookup is case-insensitive.
        upper_spec = {k.upper(): v for k, v in index_spec.items()}
        index_list.append(dict(upper_spec.get("KEY")))

    # Drop indexes with `_id: 1` so they are never compared against the
    # index file.
    index_list = [d for d in index_list if d.get('_id') != 1]

    collection_stats = db.command("collstats", collection_name)
    collection_sharded = collection_stats.get("sharded", False)
    logger.info("get now In the collection " + ns_name + " index: " + str(index_list))
    logger.info("get now In the collection " + ns_name + " sharded status: " + str(collection_sharded))
    return index_list, collection_sharded
# Create one index on a collection
def craete_index(entid, collection_name, index):
    """Create a single index on ``entid.collection_name``.

    ``index`` is a mapping of field name to direction/type. It is passed to
    ``create_index`` as a list of ``(field, value)`` pairs in mapping order.
    The value returned by ``create_index`` is only logged; nothing is
    returned to the caller.
    """
    target = mongo_client[entid][collection_name]
    logger.info("need craete index: " + entid +"."+collection_name + " : "+ str(index))
    key_pairs = list(index.items())
    created = target.create_index(key_pairs)
    logger.info("mongodb " +entid +"."+collection_name + " create index return msg: " + str(created) )
# Check whether a database exists on a cluster that has shards (deprecated)
def is_database_sharded(database_name):
    """Return True if the cluster has shards and ``database_name`` exists.

    Deprecated: kept only for reference.

    The previous version tested ``database_name`` (a str) for membership in
    the list of database documents returned by ``listdatabases``, which can
    never match, so it always returned False. The comparison is now made
    against each document's ``name`` field.

    NOTE(review): this only shows that the database exists on a cluster
    with at least one shard, not that sharding is enabled for it. Confirm
    the intended check before reviving this function.
    """
    admin_db = mongo_client["admin"]
    shards = admin_db.command("listshards")["shards"]
    if not shards:
        return False
    # Query the database list once instead of once per shard.
    databases = admin_db.command("listdatabases")["databases"]
    return any(entry.get("name") == database_name for entry in databases)
# Enable sharding on the database and shard one collection
def create_sharded_func(entid, collection_name, shard_key):
    """Shard ``entid.collection_name`` on ``shard_key``.

    Args:
        entid: database name; ``enableSharding`` is issued for it first.
        collection_name: collection to shard.
        shard_key: mapping of field name to key type, e.g.
            ``{"start_time": "hashed"}``. All fields are sent, in mapping
            order, so compound keys are supported.

    Failures of either admin command are logged and swallowed. The function
    returns nothing.
    """
    db = mongo_client["admin"]
    collection_path = '{}.{}'.format(entid, collection_name)
    logger.info("need craete sharded key : " + collection_path + " : " + str(shard_key))

    if not shard_key:
        # The previous code would have sent key={"": ""} to the server.
        logger.error("create sharded key error: return: empty shard key for " + collection_path)
        return

    try:
        db.command('enableSharding', entid)
    except Exception as e:
        # Best effort: log and still attempt shardCollection below.
        logger.error("create dbname sharded key error: return: " + str(e))

    try:
        # Pass the whole mapping. The previous loop kept only the last
        # field, silently truncating compound shard keys.
        result = db.command('shardCollection', collection_path, key=dict(shard_key))
        # collection_path already contains the database name.
        logger.info(collection_path + " create sharded key return: " + str(result))
    except Exception as e:
        logger.error("create sharded key error: return: " + str(e))
# Read the index definitions and shard keys from the local file
def read_file_index(index_file):
    """Parse the index/shard-key file into two lists.

    File format, one entry per line (all spaces are removed before parsing):

    * ``<collection> = [<index dict>, ...]`` is an index definition.
    * ``mongodbShard:{"<collection>": {<shard key>}}`` is a shard key.

    Blank lines are skipped. Values are parsed with ``ast.literal_eval``,
    so only Python literals are accepted.

    Args:
        index_file: path of the file to read.

    Returns:
        A tuple ``(index_list, Shard_list)``. ``index_list`` contains one
        ``{collection: parsed_value}`` dict per index line. ``Shard_list``
        contains the parsed literal of every ``mongodbShard:`` line.

    Raises:
        ValueError: a non-blank index line has no ``=``, or a value is not
            a valid literal.
        SyntaxError: a value cannot be parsed by ``ast.literal_eval``.
    """
    shard_marker = "mongodbShard:"
    index_list = []
    Shard_list = []
    with open(index_file, 'r') as f:
        for raw_line in f:
            line = raw_line.replace(" ", "").strip()
            if not line:
                # A blank or trailing empty line used to crash the unpack.
                continue
            if shard_marker in line:
                shard_literal = line.split(shard_marker)[1]
                Shard_list.append(ast.literal_eval(shard_literal))
            else:
                # Split on the first "=" only, so a value that contains
                # "=" inside a string still parses.
                table, key_str = line.split("=", 1)
                index_list.append({table: ast.literal_eval(key_str)})
    return index_list, Shard_list
if __name__ == "__main__":
    # Script entry point: load the config, read the index file, connect,
    # then create the missing indexes and shard keys for every collection
    # of every selected database.
    #
    # NOTE: the names assigned below are deliberately module-level. The
    # functions in this file read mongodb_ip, mongodb_port, mongodb_user,
    # mongodb_passwd, need_filter_dbname_list, logger and mongo_client as
    # globals.
    cfgpath = "./cfg/config.ini"
    conf = configparser.ConfigParser()
    conf.read(cfgpath)

    mongodb_ip = conf.get("main", "mongodb_ip")
    mongodb_port = conf.get("main", "mongodb_port")
    mongodb_auth = conf.getboolean("main", "mongodb_auth")
    mongodb_user = conf.get("main", "mongodb_user")
    mongodb_passwd = conf.get("main", "mongodb_passwd")
    mongodb_auth_db = conf.get("main", "mongodb_auth_db")
    need_filter_dbname = conf.get("main", "need_filter_dbname")
    need_create_index_table = conf.get("main", "need_create_index_table")
    index_file = conf.get("main", "index_file")
    auth_get_entid = conf.getboolean("main", "auth_get_entid")
    is_open_Shard = conf.getboolean("main", "is_open_Shard")
    assign_index_open = conf.getboolean("main", "assign_index_open")
    all_ent_id = conf.get("main", "ent_id")

    # Strip every comma-separated item and drop empty ones. Previously
    # "a ,b" yielded "a " (trailing space), which never matched a real
    # collection or database name.
    need_create_index_table_list = [
        item.strip() for item in need_create_index_table.split(",") if item.strip()
    ]
    need_filter_dbname_list = [
        item.strip() for item in need_filter_dbname.split(",") if item.strip()
    ]
    get_dbname_list = [item.strip() for item in all_ent_id.split(",") if item.strip()]

    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")

    # Read the index file; nothing can be done without index definitions.
    all_index_list, Shard_key_list = read_file_index(index_file)
    if all_index_list:
        logger.info("now from file read index info: " + str(all_index_list))
        logger.info("now from file read Shard key info: " + str(Shard_key_list))
    else:
        logger.error("index file switch index list fail")
        sys.exit(11)

    # Initialise MongoDB.
    mongo_client = init_mongodb(mongodb_auth)
    if not mongo_client:
        logger.error("Failed to initialize MongoDB")
        sys.exit(10)
    logger.info("MongoDB init successfully")

    if auth_get_entid:
        # Discover the databases on the server instead of using ent_id.
        get_dbname_list = get_mongodb_dbname()
        logger.info("auth get all dbname list: " + str(get_dbname_list))
    else:
        logger.info("file get dbname list: " + str(get_dbname_list))

    for dbname in get_dbname_list:
        get_tables_list = get_mongodb_tables(dbname)
        for table in get_tables_list:
            table_index, table_sharded_status = get_index_key_tables(dbname, table)
            file_table_index = [item for item in all_index_list if table in item]
            file_shard_key = [item for item in Shard_key_list if table in item]

            # --- indexes ---
            # With assign_index_open on, only collections listed in
            # need_create_index_table get indexes. Previously both branches
            # created the index, so the switch had no effect.
            if assign_index_open and table not in need_create_index_table_list:
                if file_table_index:
                    logger.info("assign_index_open is True, skip create index for unassigned table: "
                                + dbname + "." + table)
            else:
                for file_keys in file_table_index:
                    for each_key in file_keys.get(table):
                        if each_key in table_index:
                            continue  # index already exists
                        if assign_index_open:
                            logger.info("create assign table index: " + table)
                        craete_index(dbname, table, each_key)

            # --- sharding ---
            if not is_open_Shard:
                logger.info("Configuration items is_open_Shard Value setting False not create sharded Key")
            elif table_sharded_status:
                # Previously this case was reported as "is_open_Shard ...
                # False", which was misleading.
                logger.info(dbname + "." + table + " is already sharded, not create sharded Key")
            elif file_shard_key:
                create_sharded_func(dbname, table, file_shard_key[0].get(table))
            else:
                logger.error(dbname +"."+ table +" from index file not matching reach corresponding sharding key info")
- 配置文件说明,根据需要自行配置相关配置文件。
[DEFAULT]
mongodb_ip = 192.168.127.44
mongodb_port = 30000
mongodb_auth = False
mongodb_user = admin
mongodb_passwd = test@123
mongodb_auth_db = admin
#从全部dbname中进行过滤不需要处理的dbname
need_filter_dbname = local,config,admin
#指定需要创建索引的集合,使用逗号分割
need_create_index_table = record_connect_table,test_item_3
#指定创建索引开关:为 False 时,查询集合中已有的索引,并与本地文件对比,缺多少就创建多少。
assign_index_open = True
[main]
#是否自动获取对应mongodb中全部dbname
auth_get_entid = False
ent_id = test01,test02,20230711
#本地读取索引和分片key片键
index_file = ./cfg/IndexKey_ShardKey_file.txt
#是否开启集合分片
is_open_Shard = True
- 本地索引与分片片键信息文件说明:如有新增 key,另起一行添加。
record_connect_table = [{"start_time":"hashed"},{"start_time":-1,"end_time":-1},{"agent_id":1,"start_time":-1}]
mongodbShard:{"record_connect_table":{"start_time":"hashed"}}
脚本运行
- 脚本运行,自行编译成二进制文件。
[root@test ~]# tar xf mongodb_create_index_key.tar.gz ; cd mongodb_create_index_key
[root@test mongodb_create_index_key]# vim cfg/config.ini #对照上面配置文件说明进行修改。
[root@test mongodb_create_index_key]# ./create_index_sharded_key
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:25 INFO init mongodb conn: 127.0.0.1:27017
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:171 INFO MongoDB init successfully
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:180 INFO file get dbname list: ['test01', 'test02', 'test003']
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test01 all collections: []
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test02 all collections: []
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test003 all collections: ['record_connect_table', 'agent','test1']
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:72 INFO get test003.record_connect_table index: [{'tset_type': 'hashed'}, {'starttime': -1}, {'age': 1}, {'typem': 1}, {'end_time': -1}]
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:73 INFO get test003.record_connect_table sharded status: False
-
验证:登录 mongodb,查看对应集合是否已经创建索引和分片。
-
二进制文件下载地址
mongodb_create_index_key.tar.gz
评论区