侧边栏壁纸
博主头像
平凡的运维之路博主等级

行动起来,活在当下

  • 累计撰写 49 篇文章
  • 累计创建 25 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

mongodb半自动创建索引和分片

平凡的运维之路
2024-08-23 / 0 评论 / 0 点赞 / 44 阅读 / 17923 字

mongodb半自动创建索引和分片

  • 为什么要写

    • 项目上经常出现报表数据延迟,大多数都是因为mongodb集合中对应字段未添加索引。
    • 脚本是基于连接mongodb_3.2.12版本编写的,连接更高版本时,可能需要更换连接驱动。
  • 如何实现

    • 通过读取本地文件中各集合对应的索引和分片片键key,分别存储到对应列表。
    • 再通过读取配置文件开关,获取对应dbname,然后根据dbname,去获取对应集合中的索引和sharded状态,再依次创建索引和sharded分片。
  • 后续优化

    • 可以把当前使用的key相关信息,在页面保存,然后通过接口自动获取对应最新的key和分片的片键,来达到自动更新创建索引。
  • 注意点

    • 如果dbname中的集合数据量特别大,则需要在业务低峰时运行。

脚本代码

  • 详细脚本代码
#!/usr/bin/python3
import configparser,logging.config,sys
import pymongo,ast
# from pymongo import MongoClient
from datetime import datetime,timedelta
from urllib import parse

def init_mongodb(MongoDBAuth):
    """Create a pymongo client for the configured MongoDB endpoint.

    The endpoint and credentials (``mongodb_ip``, ``mongodb_port``,
    ``mongodb_user``, ``mongodb_passwd``) are read from module-level
    globals that the script entry point fills in from the config file.

    Args:
        MongoDBAuth: When truthy, connect with the configured user name and
            password; otherwise connect without credentials.

    Returns:
        A ``pymongo.MongoClient`` on success, or ``False`` when building the
        client raised an exception (the error is logged).
    """
    endpoint = mongodb_ip + ":" + mongodb_port
    if MongoDBAuth:
        # Credentials may contain reserved URI characters such as "@".
        username = parse.quote_plus(mongodb_user)
        password = parse.quote_plus(mongodb_passwd)
        conn_uri = "mongodb://" + username + ":" + password + "@" + endpoint + "/"
        try:
            clients = pymongo.MongoClient(conn_uri)
        except Exception as e:
            logger.error("use mongodb user pass conn err: " + str(e))
            return False
        # Never write the password to the log file; show a masked URI instead.
        logger.info("init mongodb conn: mongodb://" + username + ":***@" + endpoint + "/")
        return clients

    try:
        clients = pymongo.MongoClient(mongodb_ip, int(mongodb_port))
    except Exception as e:
        logger.error("use mongodb no auth conn err: " + str(e))
        return False
    logger.info("init mongodb conn: " + endpoint)
    return clients

def get_mongodb_dbname():
    """List the database names on the server, minus the configured exclusions.

    Reads the module-level ``mongo_client`` and ``need_filter_dbname_list``.

    Returns:
        list: Names reported by ``list_database_names()`` with every name
        found in ``need_filter_dbname_list`` removed. Each removal is logged.
    """
    remaining = list(mongo_client.list_database_names())
    for excluded in need_filter_dbname_list:
        if excluded not in remaining:
            continue
        remaining.remove(excluded)
        logger.info("delete need filter dbname: " + excluded)
    return remaining

def get_mongodb_tables(entid):
    """Return the names of all collections in one database.

    Args:
        entid: Name of the database to inspect (looked up on the
            module-level ``mongo_client``).

    Returns:
        list: Collection names as reported by ``list_collection_names()``.
    """
    database = mongo_client[entid]
    names = [name for name in database.list_collection_names()]
    logger.info("get " + entid + " all collections: " + str(names))
    return names

def get_index_key_tables(entid,collection_name):
    """Return the existing index keys and the sharded flag of one collection.

    Args:
        entid: Database name.
        collection_name: Collection name inside that database.

    Returns:
        tuple: ``(index_list, collection_sharded)``. ``index_list`` holds one
        plain dict per index key specification reported by ``list_indexes()``,
        excluding any specification that contains ``"_id": 1``.
        ``collection_sharded`` is the ``sharded`` field of the ``collstats``
        reply, or ``False`` when that field is absent.
    """
    db = mongo_client[entid]
    collection = db[collection_name]
    # Fallback namespace for logging; replaced below only when the server
    # actually reports one.
    ns_name = entid + "." + collection_name

    index_list = []
    for index_doc in collection.list_indexes():
        formatted = {k.upper(): v for k, v in index_doc.items()}
        # Index documents are not guaranteed to carry an "ns" field; keeping
        # the computed name avoids concatenating None into the log messages.
        ns_name = formatted.get("NS") or ns_name
        each_key = formatted.get("KEY")
        if not each_key:
            continue
        key_spec = dict(each_key.items())
        # Skip specifications that include the _id index.
        if key_spec.get("_id") == 1:
            continue
        index_list.append(key_spec)

    collection_stats = db.command("collstats", collection_name)
    collection_sharded = collection_stats.get("sharded", False)
    logger.info("get now In the collection " + ns_name + " index: " + str(index_list))
    logger.info("get now In the collection " + ns_name + " sharded status: " + str(collection_sharded))
    return index_list, collection_sharded


def craete_index(entid,collection_name,index):
    """Create one index on a collection and log the server's reply.

    Args:
        entid: Database name.
        collection_name: Collection to index.
        index: Mapping of field name to index direction/type; it is passed to
            ``create_index`` as a list of ``(field, value)`` pairs.
    """
    target_collection = mongo_client[entid][collection_name]
    target_name = entid + "." + collection_name
    logger.info("need craete index: " + target_name + " : " + str(index))

    key_pairs = list(index.items())
    created = target_collection.create_index(key_pairs)
    logger.info("mongodb " + target_name + " create index return msg: " + str(created))

def is_database_sharded(database_name):
    """Deprecated: check that a database exists on a cluster that has shards.

    The previous implementation compared ``database_name`` (a string) with
    the list of database documents, which can never match, so it always
    returned ``False``. The names inside those documents are compared now.

    NOTE(review): this only confirms that the database is listed and that the
    cluster reports at least one shard; it does not verify that sharding is
    enabled for the database itself. Confirm before reviving this helper.

    Args:
        database_name: Name of the database to look for.

    Returns:
        bool: ``True`` when ``listshards`` reports at least one shard and
        ``database_name`` appears in the ``listdatabases`` reply.
    """
    db = mongo_client["admin"]
    shards = db.command("listshards")["shards"]
    if not shards:
        return False
    # One listdatabases round trip is enough; the answer does not vary per shard.
    databases = db.command("listdatabases")["databases"]
    return any(entry.get("name") == database_name for entry in databases)

def create_sharded_func(entid, collection_name, shard_key):
    """Enable sharding on a database and shard one of its collections.

    Both admin commands are best-effort: a failure is logged and not raised,
    so the caller's loop over collections keeps going.

    Args:
        entid: Database name.
        collection_name: Collection to shard.
        shard_key: Mapping of field name to shard-key type (for example
            ``{"start_time": "hashed"}``). All fields are sent to the server
            in mapping order, so compound keys are preserved.
    """
    db = mongo_client["admin"]
    collection_path = '{}.{}'.format(entid, collection_name)
    logger.info("need craete sharded key : " + collection_path + " : " + str(shard_key))

    if not shard_key:
        logger.error("create sharded key error: return: empty shard key for " + collection_path)
        return

    try:
        db.command('enableSharding', entid)
    except Exception as e:
        # Logged only; shardCollection below is still attempted.
        logger.error("create dbname sharded key error: return: " + str(e))

    try:
        # Send the whole mapping; keeping only the last field would silently
        # truncate a compound shard key.
        result = db.command('shardCollection', collection_path, key=dict(shard_key))
        logger.info(collection_path + " create sharded key return: " + str(result))
    except Exception as e:
        logger.error("create sharded key error: return: " + str(e))

def read_file_index(index_file):
    """Parse the local index / shard-key definition file.

    Two line formats are recognised (all spaces are stripped first):

    * ``<collection>=<python literal>`` - index definitions for a collection.
    * ``mongodbShard:<python literal>`` - a shard-key definition.

    Blank lines are ignored.

    Args:
        index_file: Path of the definition file.

    Returns:
        tuple: ``(index_list, Shard_list)``. ``index_list`` is a list of
        ``{collection: parsed_literal}`` dicts, one per index line;
        ``Shard_list`` is the list of parsed shard-key literals.

    Raises:
        ValueError: If a non-blank index line has no ``=`` separator.
        SyntaxError: Raised by ``ast.literal_eval`` for a malformed literal
            (it may also raise ``ValueError``).
    """
    index_list = []
    Shard_list = []
    with open(index_file, 'r') as f:
        for line_no, raw_line in enumerate(f, 1):
            line = raw_line.replace(" ", "").strip()
            if not line:
                # A blank or trailing empty line must not abort the whole run.
                continue
            # The "mongodbShard:" marker tells shard-key lines from index lines.
            if "mongodbShard:" in line:
                shard_text = line.split("mongodbShard:")[1]
                Shard_list.append(ast.literal_eval(shard_text))
                continue
            # Split on the first "=" only so the literal may itself contain "=".
            table, separator, key_str = line.partition("=")
            if not separator:
                raise ValueError(
                    "line {} of {} is not '<collection>=<literal>': {!r}".format(
                        line_no, index_file, line))
            index_list.append({table: ast.literal_eval(key_str)})
    return index_list, Shard_list

if __name__=="__main__":
    # Script entry point: load settings, read the index definition file,
    # connect to MongoDB, then create the missing indexes and shard keys for
    # every selected database. The names assigned here are module-level
    # globals that the helper functions above read directly.
    cfgpath = "./cfg/config.ini"
    conf = configparser.ConfigParser()
    conf.read(cfgpath)
    mongodb_ip = conf.get("main", "mongodb_ip")
    mongodb_port = conf.get("main", "mongodb_port")
    mongodb_auth = conf.getboolean("main", "mongodb_auth")
    mongodb_user = conf.get("main", "mongodb_user")
    mongodb_passwd = conf.get("main", "mongodb_passwd")
    mongodb_auth_db = conf.get("main", "mongodb_auth_db")
    need_filter_dbname = conf.get("main", "need_filter_dbname")
    need_create_index_table = conf.get("main", "need_create_index_table")
    # Strip each comma-separated item: a value such as "a ,b" must yield
    # "a" and "b", otherwise the membership tests below never match.
    need_create_index_table_list = [
        item.strip() for item in need_create_index_table.split(",") if item.strip()
    ]
    index_file = conf.get("main", "index_file")
    auth_get_entid = conf.getboolean("main", "auth_get_entid")
    is_open_Shard = conf.getboolean("main", "is_open_Shard")
    assign_index_open = conf.getboolean("main", "assign_index_open")
    need_filter_dbname_list = [
        item.strip() for item in need_filter_dbname.split(",") if item.strip()
    ]

    # Database names listed explicitly in the config file.
    all_ent_id = conf.get("main", "ent_id")
    get_dbname_list = [item.strip() for item in all_ent_id.split(",") if item.strip()]
    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")

    # Read the index / shard-key definition file.
    all_index_list,Shard_key_list = read_file_index(index_file)
    if all_index_list:
        logger.info("now from file read index info: " + str(all_index_list))
        logger.info("now from file read Shard key info: " + str(Shard_key_list))
    else:
        logger.error("index file switch index list fail")
        sys.exit(11)

    # Initialise the MongoDB client.
    mongo_client = init_mongodb(mongodb_auth)
    if mongo_client:
        logger.info("MongoDB init successfully")
    else:
        logger.error("Failed to initialize MongoDB")
        sys.exit(10)

    if auth_get_entid:
        get_dbname_list = get_mongodb_dbname()
        logger.info("auth get all dbname list: " + str(get_dbname_list))
    else:
        logger.info("file get dbname list: " + str(get_dbname_list))

    for dbname in get_dbname_list:
        for table in get_mongodb_tables(dbname):
            table_index, table_sharded_status = get_index_key_tables(dbname, table)
            file_table_index = [item for item in all_index_list if table in item]
            file_Shard_key = [item for item in Shard_key_list if table in item]

            # With assign_index_open on, only the collections named in
            # need_create_index_table get indexes; with it off, every
            # collection that has definitions in the file does.
            index_allowed = (not assign_index_open) or table in need_create_index_table_list
            for file_keys in file_table_index:
                for each_key in file_keys.get(table):
                    if each_key in table_index:
                        continue
                    if not index_allowed:
                        logger.info("skip create index, table not in need_create_index_table: " + table)
                        continue
                    if assign_index_open:
                        logger.info("create assign table index: " + table)
                    craete_index(dbname, table, each_key)

            if not is_open_Shard:
                logger.info("Configuration items is_open_Shard Value setting False not create  sharded Key")
            elif table_sharded_status:
                logger.info(dbname + "." + table + " is already sharded, skip create sharded Key")
            elif file_Shard_key:
                create_sharded_func(dbname, table, file_Shard_key[0].get(table))
            else:
                logger.error(dbname +"."+ table +" from index file not matching reach corresponding sharding key info")
  • 配置文件说明,根据需要自行配置相关配置文件。
[DEFAULT]
mongodb_ip = 192.168.127.44
mongodb_port = 30000
mongodb_auth = False
mongodb_user = admin
mongodb_passwd =  test@123
mongodb_auth_db = admin
#从全部dbname中进行过滤不需要处理的dbname
need_filter_dbname = local,config,admin
#指定需要创建索引的集合,使用逗号分割
need_create_index_table  = record_connect_table ,test_item_3
#指定创建索引开关:为True时只对need_create_index_table中指定的集合创建索引;为False时,则是集合中查询出多少,对应本地文件对比有多少就创建多少。
assign_index_open  = True

[main]
#是否自动获取对应mongodb中全部dbname
auth_get_entid = False
ent_id  = test01,test02,20230711
#本地读取索引和分片key片键
index_file = ./cfg/IndexKey_ShardKey_file.txt
#是否开启集合分片
is_open_Shard =  True
  • 本地key分片信息文件说明,有新增key对应新行进行添加。
record_connect_table = [{"start_time":"hashed"},{"start_time":-1,"end_time":-1},{"agent_id":1,"start_time":-1}]
mongodbShard:{"record_connect_table":{"start_time":"hashed"}}

脚本运行

  • 脚本运行,自行编译成二进制文件。
[root@test ~]# tar xf mongodb_create_index_key.tar.gz ; cd mongodb_create_index_key
[root@test mongodb_create_index_key]#  vim cfg/config.ini #对照上面配置文件说明进行修改。
[root@test mongodb_create_index_key]# ./create_index_sharded_key 
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:25 INFO init mongodb conn: 127.0.0.1:27017
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:171 INFO MongoDB init successfully
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:180 INFO file get dbname list: ['test01', 'test02', 'test003']
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test01 all collections: []
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test02 all collections: []
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:51 INFO get test003 all collections: ['record_connect_table', 'agent','test1']
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:72 INFO get test003.record_connect_table index: [{'tset_type': 'hashed'}, {'starttime': -1}, {'age': 1}, {'typem': 1}, {'end_time': -1}]
2024-08-22 18:12:53 140258642511680 create_index_sharded_key.py:73 INFO get test003.record_connect_table sharded status: False
0

评论区