Class | MCollective::RPC::Client |
In: | lib/mcollective/rpc/client.rb |
Parent: | Object |
The main component of the Simple RPC client system. It wraps MCollective::Client and brings in a lot of conventions and standard approaches.
agent | [R] | |
batch_mode | [R] | |
batch_size | [R] | |
batch_sleep_time | [R] | |
client | [R] | |
config | [RW] | |
ddl | [R] | |
discovery_method | [R] | |
discovery_options | [R] | |
filter | [RW] | |
limit_method | [R] | |
limit_seed | [R] | |
limit_targets | [R] | |
output_format | [R] | |
progress | [RW] | |
reply_to | [RW] | |
stats | [R] | |
timeout | [RW] | |
ttl | [RW] | |
verbose | [RW] |
Creates a stub for a remote agent. You can pass in an options hash in the flags, which will then be used; otherwise a default options hash is created with filtering enabled, based on the standard command line use.
rpc = RPC::Client.new("rpctest", :configfile => "client.cfg", :options => options)
You typically would not call this directly; you'd use MCollective::RPC#rpcclient instead, which is a wrapper around this that can be used as a mixin.
# File lib/mcollective/rpc/client.rb, line 20
def initialize(agent, flags = {})
  if flags.include?(:options)
    initial_options = flags[:options]

  elsif @@initial_options
    initial_options = Marshal.load(@@initial_options)

  else
    oparser = MCollective::Optionparser.new({:verbose => false, :progress_bar => true, :mcollective_limit_targets => false, :batch_size => nil, :batch_sleep_time => 1}, "filter")

    initial_options = oparser.parse do |parser, opts|
      if block_given?
        yield(parser, opts)
      end

      Helpers.add_simplerpc_options(parser, opts)
    end

    @@initial_options = Marshal.dump(initial_options)
  end

  @initial_options = initial_options
  @stats = Stats.new
  @agent = agent
  @timeout = initial_options[:timeout] || 5
  @verbose = initial_options[:verbose]
  @filter = initial_options[:filter]
  @config = initial_options[:config]
  @discovered_agents = nil
  @progress = initial_options[:progress_bar]
  @limit_targets = initial_options[:mcollective_limit_targets]
  @limit_method = Config.instance.rpclimitmethod
  @limit_seed = initial_options[:limit_seed] || nil
  @output_format = initial_options[:output_format] || :console
  @force_direct_request = false
  @reply_to = initial_options[:reply_to]
  @discovery_method = initial_options[:discovery_method]
  @discovery_options = initial_options[:discovery_options] || []
  @force_display_mode = initial_options[:force_display_mode] || false

  @batch_size = Integer(initial_options[:batch_size] || 0)
  @batch_sleep_time = Float(initial_options[:batch_sleep_time] || 1)
  @batch_mode = @batch_size > 0

  agent_filter agent

  @client = MCollective::Client.new(@config)
  @client.options = initial_options

  @discovery_timeout = discovery_timeout

  @collective = @client.collective
  @ttl = initial_options[:ttl] || Config.instance.ttl

  # if we can find a DDL for the service override
  # the timeout of the client so we always magically
  # wait appropriate amounts of time.
  #
  # We add the discovery timeout to the ddl supplied
  # timeout as the discovery timeout tends to be tuned
  # for local network conditions and fact source speed
  # which would other wise not be accounted for and
  # some results might get missed.
  #
  # We do this only if the timeout is the default 5
  # seconds, so that users cli overrides will still
  # get applied
  begin
    @ddl = DDL.new(agent)
    @stats.ddl = @ddl
    @timeout = @ddl.meta[:timeout] + @discovery_timeout if @timeout == 5
  rescue Exception => e
    Log.debug("Could not find DDL: #{e}")
    @ddl = nil
  end

  # allows stderr and stdout to be overridden for testing
  # but also for web apps that might not want a bunch of stuff
  # generated to actual file handles
  if initial_options[:stderr]
    @stderr = initial_options[:stderr]
  else
    @stderr = STDERR
    @stderr.sync = true
  end

  if initial_options[:stdout]
    @stdout = initial_options[:stdout]
  else
    @stdout = STDOUT
    @stdout.sync = true
  end
end
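The timeout rule described in the constructor's comments can be illustrated in isolation. The following is a minimal sketch, not part of the library: `effective_timeout` is a hypothetical helper, and passing `nil` for the DDL timeout is an assumption used here to represent the case where no DDL could be loaded.

```ruby
# Hypothetical helper: only a timeout that is still at the default of
# 5 seconds is replaced by the DDL timeout plus the discovery timeout.
def effective_timeout(timeout, ddl_timeout, discovery_timeout)
  return timeout unless timeout == 5   # an explicit override wins
  return timeout if ddl_timeout.nil?   # assumption: nil means no DDL was found

  ddl_timeout + discovery_timeout
end

puts effective_timeout(5, 60, 2)
puts effective_timeout(30, 60, 2)
```

One consequence of comparing against the literal default is that a caller who explicitly asks for 5 seconds cannot be told apart from one who did not set a timeout at all.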
Sets the agent filter
# File lib/mcollective/rpc/client.rb, line 379
def agent_filter(agent)
  @filter["agent"] << agent
  @filter["agent"].compact!
  reset
end

# File lib/mcollective/rpc/client.rb, line 655
def aggregate_reply(reply, aggregate)
  return nil unless aggregate

  aggregate.call_functions(reply)
  return aggregate
rescue Exception => e
  Log.error("Failed to calculate aggregate summaries for reply from %s, calculating summaries disabled: %s: %s (%s)" % [reply[:senderid], e.backtrace.first, e.to_s, e.class])
  return nil
end
Sets the batch size. Setting the size to 0 disables batch mode.
# File lib/mcollective/rpc/client.rb, line 576
def batch_size=(limit)
  raise "Can only set batch size if direct addressing is supported" unless Config.instance.direct_addressing

  @batch_size = Integer(limit)
  @batch_mode = @batch_size > 0
end

# File lib/mcollective/rpc/client.rb, line 583
def batch_sleep_time=(time)
  raise "Can only set batch sleep time if direct addressing is supported" unless Config.instance.direct_addressing

  @batch_sleep_time = Float(time)
end
Handles traditional calls to the remote agents with full stats, both with and without blocks, and everything else supported.
Other methods of calling the nodes can reuse this code by, for example, specifying custom options and discovery data.
# File lib/mcollective/rpc/client.rb, line 782
def call_agent(action, args, opts, disc=:auto, &block)
  # Handle fire and forget requests and make sure
  # the :process_results value is set appropriately
  #
  # specific reply-to requests should be treated like
  # fire and forget since the client will never get
  # the responses
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args)
  else
    args[:process_results] = true
  end

  # Do discovery when no specific discovery array is given
  #
  # If an array is given set the force_direct_request hint that
  # will tell the message object to be a direct request one
  if disc == :auto
    discovered = discover
  else
    @force_direct_request = true if Config.instance.direct_addressing
    discovered = disc
  end

  req = new_request(action.to_s, args)

  message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => opts[:filter], :options => opts})
  message.discovered_hosts = discovered.clone

  results = []
  respcount = 0

  if discovered.size > 0
    message.type = :direct_request if @force_direct_request

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    aggregate = load_aggregate_functions(action, @ddl)

    @client.req(message) do |resp|
      respcount += 1

      if block_given?
        aggregate = process_results_with_block(action, resp, block, aggregate)
      else
        @stdout.print twirl.twirl(respcount, discovered.size) if @progress

        result, aggregate = process_results_without_block(resp, action, aggregate)

        results << result
      end
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
    @stats.client_stats = @client.stats
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
Calls an agent in a way very similar to call_agent, but with support for batching the queries to the network.
The result sets, stats, block handling etc. all behave exactly as you would expect from a normal call_agent.
This is used by method_missing and works only in direct addressing mode.
# File lib/mcollective/rpc/client.rb, line 698
def call_agent_batched(action, args, opts, batch_size, sleep_time, &block)
  raise "Batched requests requires direct addressing" unless Config.instance.direct_addressing
  raise "Cannot bypass result processing for batched requests" if args[:process_results] == false

  batch_size = Integer(batch_size)
  sleep_time = Float(sleep_time)

  Log.debug("Calling #{agent}##{action} in batches of #{batch_size} with sleep time of #{sleep_time}")

  @force_direct_request = true

  discovered = discover
  results = []
  respcount = 0

  if discovered.size > 0
    req = new_request(action.to_s, args)

    aggregate = load_aggregate_functions(action, @ddl)

    if @progress && !block_given?
      twirl = Progress.new
      @stdout.puts
      @stdout.print twirl.twirl(respcount, discovered.size)
    end

    @stats.requestid = nil

    discovered.in_groups_of(batch_size) do |hosts, last_batch|
      message = Message.new(req, nil, {:agent => @agent, :type => :direct_request, :collective => @collective, :filter => opts[:filter], :options => opts})

      # first time round we let the Message object create a request id
      # we then re-use it for future requests to keep auditing sane etc
      @stats.requestid = message.create_reqid unless @stats.requestid
      message.requestid = @stats.requestid

      message.discovered_hosts = hosts.clone.compact

      @client.req(message) do |resp|
        respcount += 1

        if block_given?
          aggregate = process_results_with_block(action, resp, block, aggregate)
        else
          @stdout.print twirl.twirl(respcount, discovered.size) if @progress

          result, aggregate = process_results_without_block(resp, action, aggregate)

          results << result
        end
      end

      @stats.noresponsefrom.concat @client.stats[:noresponsefrom]
      @stats.responses += @client.stats[:responses]
      @stats.blocktime += @client.stats[:blocktime] + sleep_time
      @stats.totaltime += @client.stats[:totaltime]
      @stats.discoverytime += @client.stats[:discoverytime]

      sleep sleep_time unless last_batch
    end

    @stats.aggregate_summary = aggregate.summarize if aggregate
  else
    @stderr.print("\nNo request sent, we did not discover any nodes.")
  end

  @stats.finish_request

  RPC.stats(@stats)

  @stdout.print("\n") if @progress

  if block_given?
    return stats
  else
    return [results].flatten
  end
end
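The batching loop can be sketched without any middleware. This is an illustration only, not the library's implementation: `each_batch` is a hypothetical helper, Ruby's standard `each_slice` stands in for the `in_groups_of` call used in the listing, and the `sleeper` parameter is an assumption added so the pause can be observed or stubbed.

```ruby
# Hypothetical helper: yields each batch of hosts and pauses between
# batches, but not after the last one.
def each_batch(hosts, batch_size, sleep_time, sleeper = method(:sleep))
  batch_size = Integer(batch_size)
  sleep_time = Float(sleep_time)
  raise ArgumentError, "batch size must be greater than 0" unless batch_size > 0

  batches = hosts.each_slice(batch_size).to_a

  batches.each_with_index do |batch, index|
    last_batch = index == batches.size - 1

    yield(batch, last_batch)

    sleeper.call(sleep_time) unless last_batch
  end
end

hosts = %w[node1 node2 node3 node4 node5]
pauses = []

# A lambda records the pauses instead of really sleeping.
each_batch(hosts, 2, 0.5, lambda { |t| pauses << t }) do |batch, last_batch|
  puts "requesting #{batch.join(', ')}#{' (last batch)' if last_batch}"
end

puts "paused #{pauses.size} time(s)"
```

Skipping the pause after the final batch means the caller does not wait for nothing once all requests have been sent.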
Sets the class filter
# File lib/mcollective/rpc/client.rb, line 355
def class_filter(klass)
  @filter["cf_class"] << klass
  @filter["cf_class"].compact!
  reset
end
Sets the collective we are communicating with
# File lib/mcollective/rpc/client.rb, line 541
def collective=(c)
  raise "Unknown collective #{c}" unless Config.instance.collectives.include?(c)

  @collective = c
  @client.options = options
  reset
end
Set a compound filter
# File lib/mcollective/rpc/client.rb, line 393
def compound_filter(filter)
  @filter["compound"] << Matcher.create_compound_callstack(filter)
  reset
end
Constructs custom requests with custom filters and discovery data. The idea is that this would be used in web applications, where you might be using a cached copy of data provided by a registration agent to figure out on your own which nodes will be responding and what your filter would be.
This will help you essentially short circuit the traditional cycle of:
mc discover / call / wait for discovered nodes
by doing discovery however you like, constructing a filter and a list of nodes you expect responses from.
Other than that it will work exactly like a normal call: blocks will behave the same way, stats will be handled the same way, etc.
If you just wanted to contact one machine, for example with a client that already has other filter options set up, you can do:
puppet.custom_request("runonce", {}, ["your.box.com"], {"identity" => "your.box.com"})
This will run the runonce action on just 'your.box.com'. No discovery will be done, and after receiving just one response it will stop waiting for responses.
If direct_addressing is enabled in the config file you can provide an empty hash as a filter. This forces the request to be a directly addressed request, which technically does not need filters. If you try to use this mode with direct addressing disabled, an exception will be raised.
# File lib/mcollective/rpc/client.rb, line 282
def custom_request(action, args, expected_agents, filter = {}, &block)
  @ddl.validate_rpc_request(action, args) if @ddl

  if filter == {} && !Config.instance.direct_addressing
    raise "Attempted to do a filterless custom_request without direct_addressing enabled, preventing unexpected call to all nodes"
  end

  @stats.reset

  custom_filter = Util.empty_filter
  custom_options = options.clone

  # merge the supplied filter with the standard empty one
  # we could just use the merge method but I want to be sure
  # we dont merge in stuff that isnt actually valid
  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    if filter.include?(ftype)
      custom_filter[ftype] = [filter[ftype], custom_filter[ftype]].flatten
    end
  end

  # ensure that all filters at least restrict the call to the agent we're a proxy for
  custom_filter["agent"] << @agent unless custom_filter["agent"].include?(@agent)
  custom_options[:filter] = custom_filter

  # Fake out the stats discovery would have put there
  @stats.discovered_agents([expected_agents].flatten)

  # Handle fire and forget requests
  #
  # If a specific reply-to was set then from the client perspective this should
  # be a fire and forget request too since no response will ever reach us - it
  # will go to the reply-to destination
  if args[:process_results] == false || @reply_to
    return fire_and_forget_request(action, args, custom_filter)
  end

  # Now do a call pretty much exactly like in method_missing except with our own
  # options and discovery magic
  if block_given?
    call_agent(action, args, custom_options, [expected_agents].flatten) do |r|
      block.call(r)
    end
  else
    call_agent(action, args, custom_options, [expected_agents].flatten)
  end
end
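The filter merge performed by custom_request can be tried out on plain hashes. The following is a rough sketch under stated assumptions: `empty_filter` is a hypothetical stand-in for Util.empty_filter whose shape is assumed here, and `build_custom_filter` is an illustrative name rather than a library method.

```ruby
# Assumed shape of an empty filter (stand-in for Util.empty_filter).
def empty_filter
  {"fact" => [], "cf_class" => [], "agent" => [], "identity" => [], "compound" => []}
end

# Merge only the known filter types from the supplied filter, then make
# sure the agent we are a proxy for is always part of the result.
def build_custom_filter(agent, supplied)
  custom = empty_filter

  ["identity", "fact", "agent", "cf_class", "compound"].each do |ftype|
    next unless supplied.include?(ftype)

    custom[ftype] = [supplied[ftype], custom[ftype]].flatten
  end

  custom["agent"] << agent unless custom["agent"].include?(agent)
  custom
end

filter = build_custom_filter("puppet", {"identity" => "your.box.com", "bogus" => "ignored"})
p filter["identity"]
p filter["agent"]
p filter.key?("bogus")
```

Keys are compared as strings in this sketch, as in the listing, so a symbol key such as `:identity` would be silently skipped.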
Disconnects cleanly from the middleware
# File lib/mcollective/rpc/client.rb, line 115
def disconnect
  @client.disconnect
end
Does discovery based on the filters set. If a discovery was previously done its result is returned, otherwise a new discovery is performed.
Alternatively, if identity filters are given and none of them are regular expressions, the provided data is used as the discovered data, avoiding discovery.
Discovery can be forced if direct_addressing is enabled by passing in an array of nodes with :nodes, or JSON data like that produced by the mcollective RPC JSON output using :json.
Will show a message indicating that it is doing discovery if running verbose or if the :verbose flag is passed in.
Use reset to force a new discovery.
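The identity filter shortcut can be illustrated on its own. This is a minimal sketch, assuming that regular expression identities are written as strings starting with a slash; `identities_as_discovery` is a hypothetical helper and not part of the client.

```ruby
# Returns the identity filter as ready-made discovery data when none of
# its entries looks like a regular expression ("/.../"), otherwise nil.
def identities_as_discovery(identity_filter)
  return nil if identity_filter.empty?

  regex_filters = identity_filter.select { |i| i.match("^/") }.size

  regex_filters == 0 ? identity_filter.clone : nil
end

p identities_as_discovery(["web1.example.com", "web2.example.com"])
p identities_as_discovery(["/^web/", "db1.example.com"])
```

A `nil` result here corresponds to falling through to a real discovery run.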
# File lib/mcollective/rpc/client.rb, line 425
def discover(flags={})
  flags.keys.each do |key|
    raise "Unknown option #{key} passed to discover" unless [:verbose, :hosts, :nodes, :json].include?(key)
  end

  flags.include?(:verbose) ? verbose = flags[:verbose] : verbose = @verbose

  verbose = false unless @output_format == :console

  # flags[:nodes] and flags[:hosts] are the same thing, we should never have
  # allowed :hosts as that was inconsistent with the established terminology
  flags[:nodes] = flags.delete(:hosts) if flags.include?(:hosts)

  reset if flags[:nodes] || flags[:json]

  unless @discovered_agents
    # if either hosts or JSON is supplied try to figure out discovery data from there
    # if direct_addressing is not enabled this is a critical error as the user might
    # not have supplied filters so raise an exception
    if flags[:nodes] || flags[:json]
      raise "Can only supply discovery data if direct_addressing is enabled" unless Config.instance.direct_addressing

      hosts = []

      if flags[:nodes]
        hosts = Helpers.extract_hosts_from_array(flags[:nodes])
      elsif flags[:json]
        hosts = Helpers.extract_hosts_from_json(flags[:json])
      end

      raise "Could not find any hosts in discovery data provided" if hosts.empty?

      @discovered_agents = hosts
      @force_direct_request = true

    # if an identity filter is supplied and it is all strings no regex we can use that
    # as discovery data, technically the identity filter is then redundant if we are
    # in direct addressing mode and we could empty it out but this use case should
    # only really be for a few -I's on the CLI
    #
    # For safety we leave the filter in place for now, that way we can support this
    # enhancement also in broadcast mode.
    #
    # This is only needed for the 'mc' discovery method, other methods might change
    # the concept of identity to mean something else so we should pass the full
    # identity filter to them
    elsif options[:filter]["identity"].size > 0 && @discovery_method == "mc"
      regex_filters = options[:filter]["identity"].select{|i| i.match("^\/")}.size

      if regex_filters == 0
        @discovered_agents = options[:filter]["identity"].clone
        @force_direct_request = true if Config.instance.direct_addressing
      end
    end
  end

  # All else fails we do it the hard way using a traditional broadcast
  unless @discovered_agents
    @stats.time_discovery :start

    # if compound filters are used the only real option is to use the mc
    # discovery plugin since its the only capable of using data queries etc
    # and we do not want to degrade that experience just to allow compounds
    # on other discovery plugins the UX would be too bad raising complex sets
    # of errors etc.
    @client.discoverer.force_discovery_method_by_filter(options[:filter])

    actual_timeout = options[:disctimeout] + @client.timeout_for_compound_filter(options[:filter]["compound"])
    if actual_timeout > 0
      @stderr.print("Discovering hosts using the %s method for %d second(s) .... " % [@client.discoverer.discovery_method, actual_timeout]) if verbose
    else
      @stderr.print("Discovering hosts using the %s method .... " % [@client.discoverer.discovery_method]) if verbose
    end

    @client.options = options

    # if the requested limit is a pure number and not a percent
    # and if we're configured to use the first found hosts as the
    # limit method then pass in the limit thus minimizing the amount
    # of work we do in the discover phase and speeding it up significantly
    if @limit_method == :first and @limit_targets.is_a?(Fixnum)
      @discovered_agents = @client.discover(@filter, options[:disctimeout], @limit_targets)
    else
      @discovered_agents = @client.discover(@filter, options[:disctimeout])
    end

    @stderr.puts(@discovered_agents.size) if verbose

    @force_direct_request = @client.discoverer.force_direct_mode?

    @stats.time_discovery :end
  end

  @stats.discovered_agents(@discovered_agents)
  RPC.discovered(@discovered_agents)

  @discovered_agents
end
# File lib/mcollective/rpc/client.rb, line 335
def discovery_method=(method)
  @discovery_method = method

  if @initial_options[:discovery_options]
    @discovery_options = @initial_options[:discovery_options]
  else
    @discovery_options.clear
  end

  @client.options = options
  @discovery_timeout = discovery_timeout
  reset
end

# File lib/mcollective/rpc/client.rb, line 349
def discovery_options=(options)
  @discovery_options = [options].flatten
  reset
end

# File lib/mcollective/rpc/client.rb, line 330
def discovery_timeout
  return @initial_options[:disctimeout] if @initial_options[:disctimeout]
  return @client.discoverer.ddl.meta[:timeout]
end
Sets the fact filter
# File lib/mcollective/rpc/client.rb, line 362
def fact_filter(fact, value=nil, operator="=")
  return if fact.nil?
  return if fact == false

  if value.nil?
    parsed = Util.parse_fact_string(fact)
    @filter["fact"] << parsed unless parsed == false
  else
    parsed = Util.parse_fact_string("#{fact}#{operator}#{value}")
    @filter["fact"] << parsed unless parsed == false
  end

  @filter["fact"].compact!
  reset
end
For requests that do not care about results: just returns the request id and does not do any of the response processing.
We send the :process_results flag to the nodes so they can make decisions based on it.
Should only be called via method_missing.
# File lib/mcollective/rpc/client.rb, line 678
def fire_and_forget_request(action, args, filter=nil)
  @ddl.validate_rpc_request(action, args) if @ddl

  req = new_request(action.to_s, args)

  filter = options[:filter] unless filter

  message = Message.new(req, nil, {:agent => @agent, :type => :request, :collective => @collective, :filter => filter, :options => options})
  message.reply_to = @reply_to if @reply_to

  return @client.sendreq(message, nil)
end
Sets the identity filter
# File lib/mcollective/rpc/client.rb, line 386
def identity_filter(identity)
  @filter["identity"] << identity
  @filter["identity"].compact!
  reset
end
Sets and sanity checks the limit_method variable used to determine how to limit targets if limit_targets is set.
# File lib/mcollective/rpc/client.rb, line 567
def limit_method=(method)
  method = method.to_sym unless method.is_a?(Symbol)

  raise "Unknown limit method #{method} must be :random or :first" unless [:random, :first].include?(method)

  @limit_method = method
end
Sets and sanity checks the limit_targets variable used to restrict how many nodes we'll target.
# File lib/mcollective/rpc/client.rb, line 551
def limit_targets=(limit)
  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit} valid limits are /^\d+%*$/" unless limit =~ /^\d+%*$/

    begin
      @limit_targets = Integer(limit)
    rescue
      @limit_targets = limit
    end
  else
    @limit_targets = Integer(limit)
  end
end
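The validation in limit_targets= can be exercised as a plain function. This is a sketch only: `parse_limit` is a hypothetical name, and it returns the value instead of storing it in an instance variable.

```ruby
# Stand-alone version of the check: plain numbers become Integers,
# percentages such as "10%" are kept as strings, anything else is rejected.
def parse_limit(limit)
  if limit.is_a?(String)
    raise "Invalid limit specified: #{limit} valid limits are /^\\d+%*$/" unless limit =~ /^\d+%*$/

    begin
      Integer(limit)
    rescue ArgumentError
      limit   # not a plain number, so it must be a percentage
    end
  else
    Integer(limit)
  end
end

p parse_limit("10")
p parse_limit("10%")
p parse_limit(3)
```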
# File lib/mcollective/rpc/client.rb, line 644
def load_aggregate_functions(action, ddl)
  return nil unless ddl
  return nil unless ddl.action_interface(action).keys.include?(:aggregate)

  return Aggregate.new(ddl.action_interface(action))

rescue => e
  Log.error("Failed to load aggregate functions, calculating summaries disabled: %s: %s (%s)" % [e.backtrace.first, e.to_s, e.class])
  return nil
end
Magic handler to invoke remote methods
Once the stub is created using the constructor or the RPC#rpcclient helper you can call remote actions easily:
ret = rpc.echo(:msg => "hello world")
This will call the 'echo' action of the 'rpctest' agent and return the result as an array. The array will be a simplified result set from the usual full MCollective::Client#req, with additional error codes and error text:
{
:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"
}
If :statuscode is 0 then everything went fine. If it's 1 then you supplied the correct arguments etc. but the request could not be completed; you'll find a human-readable reason in :statusmsg.
Codes 2 to 5 map directly to UnknownRPCAction, MissingRPCData, InvalidRPCData and UnknownRPCError (see below for a description of those). In each case :statusmsg holds the reason for the failure.
To get access to the full result of the MCollective::Client#req calls you can pass in a block:
rpc.echo(:msg => "hello world") do |resp|
  pp resp
end
In this case resp will be the result from MCollective::Client#req. Instead of receiving simple text and codes as above, you'll also need to handle the following exceptions:
UnknownRPCAction - There is no matching action on the agent
MissingRPCData - You did not supply all the needed parameters for the action
InvalidRPCData - The data you did supply did not pass validation
UnknownRPCError - Some other error prevented the agent from running
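The status codes described above can be summarized in a small lookup. This is an illustrative sketch working on the simplified result hash shown earlier; `status_meaning` and `describe_result` are hypothetical helpers, not part of the client.

```ruby
# Maps a :statuscode to the meaning given in the description above.
def status_meaning(code)
  case code
  when 0 then "OK"
  when 1 then "request could not be completed"
  when 2 then "UnknownRPCAction"
  when 3 then "MissingRPCData"
  when 4 then "InvalidRPCData"
  when 5 then "UnknownRPCError"
  else "unrecognized status code"
  end
end

# Summarizes one simplified result hash in a single line.
def describe_result(result)
  "#{result[:sender]}: #{status_meaning(result[:statuscode])} (#{result[:statusmsg]})"
end

puts describe_result(:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world")
```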
During calls a progress indicator will be shown of how many results we've received against how many nodes were discovered. You can disable this by setting progress to false:
rpc.progress = false
This supports a second mode where it will send the SimpleRPC request and never handle the responses. It's a bit like UDP: it sends the request with the filter attached and you only get back the request id, with no indication about results.
You can invoke this using:
puts rpc.echo(:process_results => false)
This will output just the request id.
Batched processing is supported:
printrpc rpc.ping(:batch_size => 5)
This will do everything exactly as normal but communicate with only 5 agents at a time.
# File lib/mcollective/rpc/client.rb, line 217
def method_missing(method_name, *args, &block)
  # set args to an empty hash if nothings given
  args = args[0]
  args = {} if args.nil?

  action = method_name.to_s

  @stats.reset

  @ddl.validate_rpc_request(action, args) if @ddl

  # if a global batch size is set just use that else set it
  # in the case that it was passed as an argument
  batch_mode = args.include?(:batch_size) || @batch_mode
  batch_size = args.delete(:batch_size) || @batch_size
  batch_sleep_time = args.delete(:batch_sleep_time) || @batch_sleep_time

  # if we were given a batch_size argument thats 0 and batch_mode was
  # determined to be on via global options etc this will allow a batch_size
  # of 0 to disable or batch_mode for this call only
  batch_mode = (batch_mode && Integer(batch_size) > 0)

  # Handle single target requests by doing discovery and picking
  # a random node. Then do a custom request specifying a filter
  # that will only match the one node.
  if @limit_targets
    target_nodes = pick_nodes_from_discovered(@limit_targets)
    Log.debug("Picked #{target_nodes.join(',')} as limited target(s)")

    custom_request(action, args, target_nodes, {"identity" => /^(#{target_nodes.join('|')})$/}, &block)
  elsif batch_mode
    call_agent_batched(action, args, options, batch_size, batch_sleep_time, &block)
  else
    call_agent(action, args, options, :auto, &block)
  end
end
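How the per-call batch settings interact with the client-wide ones can be shown on plain hashes. The following is a sketch under assumptions: `resolve_batching` is a hypothetical helper, and the `defaults` hash plays the role of the client's own batch mode and batch size.

```ruby
# Decides whether a call is batched. Note that, like the listing, this
# removes :batch_size from args so it is not sent on as an action argument.
def resolve_batching(args, defaults)
  batch_mode = args.include?(:batch_size) || defaults[:batch_mode]
  batch_size = args.delete(:batch_size) || defaults[:batch_size]

  # a per-call batch size of 0 switches batching off for this call only
  batch_mode = (batch_mode && Integer(batch_size) > 0)

  [batch_mode, batch_size]
end

p resolve_batching({:batch_size => 5}, {:batch_mode => false, :batch_size => 0})
p resolve_batching({:batch_size => 0}, {:batch_mode => true, :batch_size => 10})
p resolve_batching({}, {:batch_mode => true, :batch_size => 10})
```

Because 0 is truthy in Ruby, an explicit `:batch_size => 0` is not replaced by the default and can therefore disable batching for a single call.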
Creates a suitable request hash for the SimpleRPC agent.
You'd use this if you ever wanted to take care of sending requests on your own, perhaps via Client#sendreq if you didn't care for responses.
In that case you can just do:
msg = your_rpc.new_request("some_action", :foo => :bar)
filter = your_rpc.filter
your_rpc.client.sendreq(msg, msg[:agent], filter)
This will send a SimpleRPC request to the action some_action with the arguments :foo => :bar. It will return immediately and you will have no indication at all of whether the request was received or not.
Clearly the use of this technique should be limited and done only if your code requires such a thing.
# File lib/mcollective/rpc/client.rb, line 147
def new_request(action, data)
  callerid = PluginManager["security_plugin"].callerid

  raise 'callerid received from security plugin is not valid' unless PluginManager["security_plugin"].valid_callerid?(callerid)

  {:agent => @agent,
   :action => action,
   :caller => callerid,
   :data => data}
end
Provides a normal options hash like you would get from Optionparser
# File lib/mcollective/rpc/client.rb, line 526
def options
  {:disctimeout => @discovery_timeout,
   :timeout => @timeout,
   :verbose => @verbose,
   :filter => @filter,
   :collective => @collective,
   :output_format => @output_format,
   :ttl => @ttl,
   :discovery_method => @discovery_method,
   :discovery_options => @discovery_options,
   :force_display_mode => @force_display_mode,
   :config => @config}
end
Picks a number of nodes from the discovered nodes.
The count should be a string that can be either just a number or a percentage like 10%.
It will select nodes from the discovered list based on the rpclimitmethod configuration option, which can be either :first or anything else:
- :first would be a simple way to do a distance based selection
- anything else will just pick one at random
- if random chosen, and batch-seed set, then set srand for the generator, and reset afterwards
# File lib/mcollective/rpc/client.rb, line 603
def pick_nodes_from_discovered(count)
  if count =~ /%$/
    pct = Integer((discover.size * (count.to_f / 100)))
    pct == 0 ? count = 1 : count = pct
  else
    count = Integer(count)
  end

  return discover if discover.size <= count

  result = []

  if @limit_method == :first
    return discover[0, count]
  else
    # we delete from the discovered list because we want
    # to be sure there is no chance that the same node will
    # be randomly picked twice. So we have to clone the
    # discovered list else this method will only ever work
    # once per discovery cycle and not actually return the
    # right nodes.
    haystack = discover.clone

    if @limit_seed
      haystack.sort!
      srand(@limit_seed)
    end

    count.times do
      rnd = rand(haystack.size)
      result << haystack.delete_at(rnd)
    end

    # Reset random number generator to fresh seed
    # As our seed from options is most likely short
    srand if @limit_seed
  end

  [result].flatten
end
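The selection rules can be tried out on a plain array. This is a simplified sketch, not the library method: `pick_nodes` is a hypothetical helper that takes the discovered list, limit method and seed as parameters instead of reading them from the client, and it converts the count with `to_s` before matching, which is an assumption made so that integers can be passed safely.

```ruby
# Picks `count` nodes (a number or a percentage string such as "10%")
# from `discovered`, either the first ones or a random sample.
def pick_nodes(discovered, count, limit_method = :random, seed = nil)
  if count.to_s =~ /%$/
    pct = Integer(discovered.size * (count.to_f / 100))
    count = pct == 0 ? 1 : pct   # a percentage always selects at least one node
  else
    count = Integer(count)
  end

  return discovered if discovered.size <= count
  return discovered[0, count] if limit_method == :first

  # work on a copy so picked nodes can be removed without
  # touching the caller's list
  haystack = discovered.clone
  result = []

  if seed
    haystack.sort!
    srand(seed)
  end

  count.times { result << haystack.delete_at(rand(haystack.size)) }

  srand if seed   # return the generator to a fresh seed
  result
end

nodes = %w[n1 n2 n3 n4 n5 n6 n7 n8 n9 n10]
p pick_nodes(nodes, "20%", :first)
p pick_nodes(nodes, 3, :random, 42) == pick_nodes(nodes, 3, :random, 42)
```

Sorting the list before seeding is what makes a seeded selection repeatable regardless of the order in which nodes were discovered.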
Processes client requests by calling a block on each result. In this mode we do not do anything fancy with the result objects, and we raise exceptions if there are problems with the data.
# File lib/mcollective/rpc/client.rb, line 881
def process_results_with_block(action, resp, block, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
    @stats.ok if resp[:body][:statuscode] == 0
    @stats.fail if resp[:body][:statuscode] == 1
    @stats.time_block_execution :start

    case block.arity
      when 1
        block.call(resp)
      when 2
        block.call(resp, result)
    end

    @stats.time_block_execution :end
  else
    @stats.fail

    case resp[:body][:statuscode]
      when 2
        raise UnknownRPCAction, resp[:body][:statusmsg]
      when 3
        raise MissingRPCData, resp[:body][:statusmsg]
      when 4
        raise InvalidRPCData, resp[:body][:statusmsg]
      when 5
        raise UnknownRPCError, resp[:body][:statusmsg]
    end
  end

  return aggregate
end
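The arity check means a block may take either one or two parameters. A minimal sketch of that dispatch follows; `dispatch_reply` is a hypothetical helper, and plain hashes stand in for the raw reply and the simplified result, which is an assumption made to keep the example self-contained.

```ruby
# One-parameter blocks receive the raw reply; two-parameter blocks
# receive the raw reply and the simplified result.
def dispatch_reply(resp, result, &block)
  case block.arity
  when 1
    block.call(resp)
  when 2
    block.call(resp, result)
  end
end

resp = {:senderid => "remote.box.com", :body => {:statuscode => 0, :statusmsg => "OK", :data => "hello world"}}
result = {:sender => "remote.box.com", :statuscode => 0, :statusmsg => "OK", :data => "hello world"}

dispatch_reply(resp, result) { |r| puts r[:senderid] }
dispatch_reply(resp, result) { |r, simplified| puts simplified[:statusmsg] }
```

A block with any other arity, for example one taking a splat, is not called at all in this sketch, which matches the two branches of the case statement in the listing.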
Handles result sets that have no block associated. Records failures and successes in the stats object and returns a hash of the response to send to the caller.
# File lib/mcollective/rpc/client.rb, line 861
def process_results_without_block(resp, action, aggregate)
  @stats.node_responded(resp[:senderid])

  result = rpc_result_from_reply(@agent, action, resp)
  aggregate = aggregate_reply(result, aggregate) if aggregate

  if resp[:body][:statuscode] == 0 || resp[:body][:statuscode] == 1
    @stats.ok if resp[:body][:statuscode] == 0
    @stats.fail if resp[:body][:statuscode] == 1
  else
    @stats.fail
  end

  [result, aggregate]
end
Resets various internal parts of the class; most importantly, it clears out the cached discovery.
# File lib/mcollective/rpc/client.rb, line 400
def reset
  @discovered_agents = nil
end
Resets the filter to an empty one.
# File lib/mcollective/rpc/client.rb, line 405
def reset_filter
  @filter = Util.empty_filter
  agent_filter @agent
end