Sensu Server是Sensu的核心组件,是Sensu业务逻辑实现的核心组件。
Sensu Server的入口在sensu/bin/sensu-server
options = Sensu::CLI.read
Sensu::Server::Process.run(options)
和这里Client几乎一样,解析命令行参数,调用Process.run
run方法也是,创建一个Server的服务实例,启动服务:server.start,然后处理信号
再看start方法
class="ruby">def start setup_redis setup_transport bootstrap end
?
setup_redis:连接redis;
setup_transport:连接RabbitMQ;
bootstrap:启动;
接下来看bootstrap方法
def bootstrap setup_keepalives setup_results setup_master_monitor @state = :running end
?
setup_keepalives:保持和客户端的联系:订阅客户端的注册的消息,当客户端注册过来(客户端每过20秒会注册自己),则将客户端添加到客户端队列里;
setup_results:订阅check的结果并处理结果;
setup_master_monitor:为保证server的HA定时选举master;
最后将该server的状态设置为running
这时候server就启动起来了
重点关注:process_check_result方法,该方法主要对check的结果存储,并对相同错误累计,并处理结果
def process_check_result(result) @logger.debug("processing result", :result => result) @redis.get("client:#{result[:client]}") do |client_json| # 找到result json里对应的client unless client_json.nil? client = MultiJson.load(client_json) check = case when @settings.check_exists?(result[:check][:name]) && !result[:check][:standalone] # 如果check不在配置里,且不为standalone check @settings[:checks][result[:check][:name]].merge(result[:check]) # 则添加一个check else result[:check] end aggregate_check_result(result) if check[:aggregate] # 如果需要聚合,聚合check结果,稍后再看 store_check_result(client, check) do # 存储check结果 check_history(client, check) do |history, total_state_change| # 计算状态改变百分比 check[:history] = history check[:total_state_change] = total_state_change # 创建一个事件,如果该次check失败,则增加该事件的失败次数,并记录,如果之前失败,这次成功,则删除该事件的失败记录 update_event_registry(client, check) do |event| process_event(event) # 调用handler处理事件 end end end else @logger.warn("client not in registry", :client => result[:client]) end end end
?
存储结果的方法:store_check_result
def store_check_result(client, check, &callback) @redis.sadd("history:#{client[:name]}", check[:name]) # 添加该check到该client的history里,如果已存在则忽略,history:#{client[:name]}包含了该client的所有check result_key = "#{client[:name]}:#{check[:name]}" history_key = "history:#{result_key}" @redis.rpush(history_key, check[:status]) do # 将check的结果push到历史队列里 @redis.set("execution:#{result_key}", check[:executed]) @redis.ltrim(history_key, -21, -1) # 只保存21条历史记录 callback.call end end
?
该方法将check的结果添加到历史队列里,即:redis的队列里。并且只保存21条历史记录。
处理事件方法:process_event,该方法获取事件里的handlers,并循环处理,在处理前会过滤和修改。
def process_event(event) log_level = event[:check][:type] == "metric" ? :debug : :info # 如果是metric,日志级别为debug @logger.send(log_level, "processing event", :event => event) event_bridges(event) handler_list = Array((event[:check][:handlers] || event[:check][:handler]) || "default") # 找到处理的handler(s) handlers = derive_handlers(handler_list) # 根据handler名字找到handler定义 handlers.each do |handler| # 循环处理 @handling_event_count += 1 filter_event(handler, event) do |event| # 过滤事件 mutate_event(handler, event) do |event_data| # 修改事件 handle_event(handler, event_data) # 处理事件 end end end end
?
handler的handle_event方法,记录日志,调用handler_type_router,该方法根据不同的处理类型调用不同的处理方法
def handler_type_router(handler, event_data) case handler[:type] when "pipe" pipe_handler(handler, event_data) when "tcp" tcp_handler(handler, event_data) when "udp" udp_handler(handler, event_data) when "transport" transport_handler(handler, event_data) when "extension" handler_extension(handler, event_data) end end
?
pipe_handler:执行配置的脚本,并日志记录结果;
?tcp_handler:发送数据到handler配置的TCP主机和端口;
udp_handler:和tcp_handler一样,只是发送的时UDP包;
transport_handler:发布一个事件到RabbitMQ;
handler_extension:处理扩展;
?