本文共 6422 字,大约阅读时间需要 21 分钟。
一、要点:
知道key验证存放的目录
key的验证是手动还是自动
对不在线的主机的处理
minion_id的命名规范
二、使用的技术栈
saltstack 相关的库:salt.config,salt.client,salt.runner
使用redis的两个数据库:第一个以键值对(字典)的形式存放minion_id与物理IP的对应关系;另一个以集合的形式存放,集合以“项目_业务”的方式命名,包含相应的主机
使用watchdog对目录变更监控,事件触发机制
三、完整代码
# coding:utf8
# Created by IntelliJ IDEA.
# User: King.gp
# Date: 2016/10/9
# Time: 10:55
# To change this template use File | Settings | File Templates.
"""Keep Salt nodegroups in sync with the minion keys accepted by the master.

A watchdog observer monitors the master's accepted-key directory. On every
file-system event the script:

1. removes the keys of minions reported down by the ``manage.down`` runner,
2. pulls the ``ip4_interfaces`` and ``id`` grains from all minions and writes
   them to a local ``host`` file ("<ip> <minion_id>" per line),
3. mirrors that file into Redis (db 6: minion_id -> ip, db 7: one set of
   minion ids per "<project>_<profession>" group),
4. regenerates ``/etc/salt/master.d/nodegroups.conf`` from the Redis sets.
"""
import logging
import re
import sys
import time

import redis
import salt.client as SaltC
import salt.config as saltc
import salt.runner as saltr
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='myapp.log',
                    )
#################################################################################################
# Optional: define a StreamHandler that prints records of level INFO or higher to standard     #
# error and attach it to the root logger.                                                      #
# console = logging.StreamHandler()
# console.setLevel(logging.INFO)
# formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# console.setFormatter(formatter)
# logging.getLogger('').addHandler(console)
#################################################################################################

MASTER_CONFIG = '/etc/salt/master'
MONITOR_DIR = '/etc/salt/pki/master/minions'
NODEGROUPS_FILE = '/etc/salt/master.d/nodegroups.conf'
HOST_FILE = 'host'  # relative to the working directory, as in the original script
PRIMARY_INTERFACE = 'eth0'  # interface whose first IPv4 address identifies a minion

# Leading lowercase letters of the first label of a minion id, e.g. "web" in "web01".
_PROFESSION_RE = re.compile(r'[a-z]+')


def _timestamp():
    """Return the current local time formatted by ``time.asctime``."""
    return time.asctime()


def _group_name(minion_id):
    """Derive the "<project>_<profession>" group name from a minion id.

    The profession is the leading run of lowercase letters of the first
    dot-separated label; the project is the last two labels joined by ".".
    Example: "web01.crms.com" -> "crms.com_web".

    Returns None when the first label does not start with a lowercase letter.
    """
    labels = minion_id.split('.')
    matched = _PROFESSION_RE.match(labels[0])
    if matched is None:
        return None
    project = '.'.join(labels[-2:])
    return '_'.join([project, matched.group()])


class MonitorHandler(FileSystemEventHandler):
    """Log every event in the watched directory and rebuild the nodegroups."""

    # Evaluated once, when the class is created, so it is NOT the time of an
    # event. Kept only so external references keep working; the log messages
    # below use _timestamp() instead.
    localtime = time.asctime(time.localtime(time.time()))

    def _handle(self, message, event):
        """Log ``message`` (formatted with the event's source path) and resync."""
        logging.info(_timestamp() + message.format(event.src_path))
        call_func()

    def on_moved(self, event):
        """A file or directory was moved."""
        self._handle(" - Event :文件/目录: {0} 被移动!", event)

    def on_created(self, event):
        """A file or directory was created."""
        self._handle(" - Event: 文件/目录: {0} 被创建", event)

    def on_deleted(self, event):
        """A file or directory was deleted."""
        self._handle(" - Event :文件/目录: {0} 被删除!", event)

    def on_modified(self, event):
        """A file or directory was modified."""
        self._handle(" - Event :文件/目录: {0} 被修改!", event)


def client():
    """Return a new Salt ``LocalClient``."""
    return SaltC.LocalClient()


class NodeGroup(object):
    """Mirror the host file into Redis and render the Salt nodegroups file."""

    def __init__(self):
        # decode_responses=True makes Redis return text instead of bytes, so
        # keys and members can be concatenated with str on Python 3 as well.
        # db 6: plain keys, minion_id -> ip address
        self.member = redis.StrictRedis(host='127.0.0.1', port=6379, db=6,
                                        decode_responses=True)
        # db 7: sets, "<project>_<profession>" -> minion ids
        self.project = redis.StrictRedis(host='127.0.0.1', port=6379, db=7,
                                         decode_responses=True)

    def update_info(self):
        """Rebuild both Redis databases from the host file.

        Both databases are flushed first. Every "<ip> <minion_id>" line then
        sets minion_id -> ip in the member database and adds the minion id to
        the set of the single group derived from its own id, so no empty
        groups are created and a minion never lands in a group merely because
        its id contains another group's name as a substring.
        """
        self.member.flushdb()
        self.project.flushdb()
        with open(HOST_FILE, 'r') as fd:
            for raw_line in fd:
                fields = raw_line.split()  # e.g. ["1.2.3.4", "web01.crms.com"]
                if len(fields) < 2:
                    continue  # blank or malformed line
                ip_address, minion_id = fields[0], fields[1]
                self.member.set(minion_id, ip_address)
                group = _group_name(minion_id)
                if group is not None:
                    # One set member per minion id; redis-py 3+ rejects a
                    # list passed as a single value.
                    self.project.sadd(group, minion_id)

    def append_config(self):
        """Write the nodegroups file with one ``L@`` list-matcher entry per group.

        Output shape::

            nodegroups:
              crms.com_web: 'L@web01.crms.com,web02.crms.com'
        """
        rendered = ["nodegroups:\n"]
        # Sorted so the generated file is stable between runs.
        for group in sorted(self.project.keys()):
            members = sorted(self.project.smembers(group))
            rendered.append("  {0}: 'L@{1}'\n".format(group, ','.join(members)))
        with open(NODEGROUPS_FILE, "w") as fd:
            fd.writelines(rendered)


def control_hosts():
    """Drop the keys of down minions and refresh the host file from grains.

    Returns:
        True when at least one "<ip> <minion_id>" row was written to the host
        file, False when no grains could be collected.
    """
    priv_opts = saltc.master_config(MASTER_CONFIG)
    priv_runner = saltr.RunnerClient(priv_opts)
    # Remove the keys of minions that are reported as down.
    host_down = priv_runner.cmd("manage.down", ["removekeys=True"])
    if host_down:
        logging.info(_timestamp() + "minion_id: {0} 己经下线 ".format(' '.join(host_down)))

    local = client()
    host_items = local.cmd("*", "grains.item", ['ip4_interfaces', 'id'])
    # NOTE(review): pause kept from the original script; its purpose is not
    # evident from the code here - confirm whether it is still needed.
    time.sleep(3)
    if not host_items:
        logging.error(_timestamp() + "拉取grains信息失败")
        return False

    rows = []
    for minion_id, grains in host_items.items():
        try:
            address = grains['ip4_interfaces'][PRIMARY_INTERFACE][0]
            rows.append(' '.join([address, grains['id']]))
        except (KeyError, IndexError, TypeError):
            # A minion without the expected interface, or one whose return is
            # not a grains dict, must not abort the whole refresh.
            logging.warning("skipping minion %s: no usable %s address in grains",
                            minion_id, PRIMARY_INTERFACE)
    if not rows:
        logging.error(_timestamp() + "拉取grains信息失败")
        return False

    with open(HOST_FILE, "w") as f:
        f.write(''.join(row + '\n' for row in rows))
    return True


def call_func():
    """Run one full resync; exit with status 127 when grains cannot be pulled."""
    if control_hosts():
        group_agg = NodeGroup()
        group_agg.update_info()
        group_agg.append_config()
    else:
        logging.error(_timestamp() + "拉取grains信息失败")
        # sys.exit() accepts its argument positionally only.
        # NOTE(review): this is called from the watchdog handler, which
        # presumably runs outside the main thread; SystemExit raised there
        # would not stop the process - confirm the intended behaviour.
        sys.exit(127)


if __name__ == "__main__":
    event_handler = MonitorHandler()
    observer = Observer()
    observer.schedule(event_handler, path=MONITOR_DIR, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
四、关于缺陷
关于怎么判断minion不在线的界定,是因为网络抖动,还是真的下线;
如果出现这样的问题,是移除还是不处理,需要取舍:因为分组依赖于salt已接受的key所对应的主机,处理不当会导致无法利用分组准确地执行任务;
启动时会提示递归溢出,所有移除的主机都会输出到console上;
请自行添加到supervisor中。