Class Mongo::ReplSetConnection
In: lib/mongo/repl_set_connection.rb
Parent: Connection

Instantiates and manages connections to a MongoDB replica set.

Methods

Constants

REPL_SET_OPTS = [:read, :refresh_mode, :refresh_interval, :require_primary, :read_secondary, :rs_name, :name]

Attributes

manager  [R] 
refresh_interval  [R] 
refresh_mode  [R] 
refresh_version  [R] 
replica_set_name  [R] 
seeds  [R] 

Public Class methods

Create a connection to a MongoDB replica set.

If no args are provided, it will check ENV["MONGODB_URI"].

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and Connection#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

@param [Array] seeds "host:port" strings

@option opts [String] :name (nil) The name of the replica set to connect to. You

  can use this option to verify that you're connecting to the right replica set.

@option opts [Boolean, Hash] :safe (false) Set the default safe-mode options

  propogated to DB objects instantiated off of this Connection. This
  default can be overridden upon instantiation of any DB by explicity setting a :safe value
  on initialization.

@option opts [:primary, :secondary] :read (:primary) The default read preference for Mongo::DB

  objects created from this connection object. If +:secondary+ is chosen, reads will be sent
  to one of the closest available secondary nodes. If a secondary node cannot be located, the
  read will be sent to the primary.

@option opts [Logger] :logger (nil) Logger instance to receive driver operation log. @option opts [Integer] :pool_size (1) The maximum number of socket connections allowed per

  connection pool. Note: this setting is relevant only for multi-threaded applications.

@option opts [Float] :pool_timeout (5.0) When all of the connections a pool are checked out,

  this is the number of seconds to wait for a new connection to be released before throwing an exception.
  Note: this setting is relevant only for multi-threaded applications.

@option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out. @option opts [Float] :connect_timeout (30) The number of seconds to wait before timing out a

  connection attempt.

@option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL. @option opts [Boolean] :refresh_mode (false) Set this to :sync to periodically update the

  state of the connection every :refresh_interval seconds. Replica set connection failures
  will always trigger a complete refresh. This option is useful when you want to add new nodes
  or remove replica set nodes not currently in use by the driver.

@option opts [Integer] :refresh_interval (90) If :refresh_mode is enabled, this is the number of seconds

  between calls to check the replica set's state.

@option opts [Boolean] :require_primary (true) If true, require a primary node for the connection

  to succeed. Otherwise, connection will succeed as long as there's at least one secondary node.

Note: that the number of seed nodes does not have to be equal to the number of replica set members. The purpose of seed nodes is to permit the driver to find at least one replica set member even if a member is down.

@example Connect to a replica set and provide two seed nodes.

  Mongo::ReplSetConnection.new(['localhost:30000', 'localhost:30001'])

@example Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

  Mongo::ReplSetConnection.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

@example Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

  Mongo::ReplSetConnection.new(['localhost:30000', 'localhost:30001'], :read => :secondary)

@see api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby

@raise [MongoArgumentError] If called with no arguments and ENV["MONGODB_URI"] implies a direct connection.

@raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the

  driver fails to connect to a replica set with that name.

[Source]

# File lib/mongo/repl_set_connection.rb, line 87
    def initialize(*args)
      if args.last.is_a?(Hash)
        opts = args.pop
      else
        opts = {}
      end

      nodes = args

      if nodes.empty? and ENV.has_key?('MONGODB_URI')
        parser = URIParser.new ENV['MONGODB_URI'], opts
        if parser.direct?
          raise MongoArgumentError, "Mongo::ReplSetConnection.new called with no arguments, but ENV['MONGODB_URI'] implies a direct connection."
        end
        opts = parser.connection_options
        nodes = parser.nodes
      end

      unless nodes.length > 0
        raise MongoArgumentError, "A ReplSetConnection requires at least one seed node."
      end

      # This is temporary until support for the old format is dropped
      if nodes.first.last.is_a?(Integer)
        warn "Initiating a ReplSetConnection with seeds passed as individual [host, port] array arguments is deprecated."
        warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
        @seeds = nodes
      else
        @seeds = nodes.first.map do |host_port|
          host, port = host_port.split(":")
          [ host, port.to_i ]
        end
      end

      # TODO: add a method for replacing this list of node.
      @seeds.freeze

      # Refresh
      @last_refresh = Time.now
      @refresh_version = 0

      # No connection manager by default.
      @manager = nil
      @old_managers = []

      # Lock for request ids.
      @id_lock = Mutex.new

      @pool_mutex = Mutex.new
      @connected = false

      @safe_mutex_lock = Mutex.new
      @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

      @connect_mutex = Mutex.new
      @refresh_mutex = Mutex.new

      check_opts(opts)
      setup(opts)
    end

Public Instance methods

[Source]

# File lib/mongo/repl_set_connection.rb, line 451
    def arbiters
      local_manager.arbiters.nil? ? [] : local_manager.arbiters
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 309
    def authenticate_pools
      if primary_pool
        primary_pool.authenticate_existing
      end
      secondary_pools.each do |pool|
        pool.authenticate_existing
      end
    end

Checkin a socket used for reading.

[Source]

# File lib/mongo/repl_set_connection.rb, line 394
    def checkin_reader(socket)
      if socket
        socket.pool.checkin(socket)
      end
      sync_refresh
    end

Checkin a socket used for writing.

[Source]

# File lib/mongo/repl_set_connection.rb, line 402
    def checkin_writer(socket)
      if socket
        socket.pool.checkin(socket)
      end
      sync_refresh
    end

Generic socket checkout Takes a block that returns a socket from pool

[Source]

# File lib/mongo/repl_set_connection.rb, line 329
    def checkout(&block)
      if connected?
        sync_refresh
      else
        connect
      end
      
      begin
        socket = block.call
      rescue => ex
        checkin(socket) if socket
        raise ex
      end
      
      if socket
        socket
      else
        @connected = false
        raise ConnectionFailure.new("Could not checkout a socket.")
      end
    end

Checkout best available socket by trying primary pool first and then falling back to secondary.

[Source]

# File lib/mongo/repl_set_connection.rb, line 353
    def checkout_best
      checkout do
        socket = get_socket_from_pool(:primary)
        if !socket
          connect
          socket = get_socket_from_pool(:secondary)
        end
        socket
      end
    end

Checkout a socket for reading (i.e., a secondary node). Note that @read_pool might point to the primary pool if no read pool has been defined.

[Source]

# File lib/mongo/repl_set_connection.rb, line 367
    def checkout_reader
      checkout do
        socket = get_socket_from_pool(:read)
        if !socket
          connect
          socket = get_socket_from_pool(:primary)
        end
        socket
      end
    end

Checkout a socket from a secondary For :read_preference => :secondary_only

[Source]

# File lib/mongo/repl_set_connection.rb, line 380
    def checkout_secondary
      checkout do
        get_socket_from_pool(:secondary)
      end
    end

Checkout a socket for writing (i.e., a primary node).

[Source]

# File lib/mongo/repl_set_connection.rb, line 387
    def checkout_writer
      checkout do
        get_socket_from_pool(:primary)
      end
    end

Close the connection to the database.

[Source]

# File lib/mongo/repl_set_connection.rb, line 274
    def close(opts={})
      if opts[:soft]
        @manager.close(:soft => true) if @manager
      else
        @manager.close if @manager
      end

      # Clear the reference to this object.
      if Thread.current[:managers]
        Thread.current[:managers].delete(self)
      end

      @connected = false
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 409
    def close_socket(socket)
      begin
        socket.close if socket
      rescue IOError
        log(:info, "Tried to close socket #{socket} but already closed.")
      end
    end

Initiate a connection to the replica set.

[Source]

# File lib/mongo/repl_set_connection.rb, line 158
    def connect
      log(:info, "Connecting...")
      @connect_mutex.synchronize do
        return if @connected

        discovered_seeds = @manager ? @manager.seeds : []
        @manager = PoolManager.new(self, discovered_seeds)

        Thread.current[:managers] ||= Hash.new
        Thread.current[:managers][self] = @manager

        @manager.connect
        @refresh_version += 1

        if @require_primary && @manager.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
          close
          raise ConnectionFailure, "Failed to connect to primary node."
        elsif @manager.read_pool.nil?
          close
          raise ConnectionFailure, "Failed to connect to any node."
        else
          @connected = true
        end
      end
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 230
    def connected?
      @connected && (@manager.primary_pool || @manager.read_pool)
    end

@deprecated

[Source]

# File lib/mongo/repl_set_connection.rb, line 235
    def connecting?
      warn "ReplSetConnection#connecting? is deprecated and will be removed in v2.0."
      false
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 417
    def ensure_manager
      Thread.current[:managers] ||= Hash.new

      if Thread.current[:managers][self] != @manager
        Thread.current[:managers][self] = @manager
      end
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 425
    def get_socket_from_pool(pool_type)
      ensure_manager

      pool = case pool_type
        when :primary
          primary_pool
        when :secondary
          secondary_pool
        when :read
          read_pool
      end

      begin
        if pool
          pool.checkout
        end
      rescue ConnectionFailure => ex
        log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
        return nil
      end
    end

Force a hard refresh of this connection‘s view of the replica set.

@return [Boolean] true if hard refresh

  occurred. +false+ is returned when unable
  to get the refresh lock.

[Source]

# File lib/mongo/repl_set_connection.rb, line 214
    def hard_refresh!
      log(:info, "Initiating hard refresh...")
      discovered_seeds = @manager ? @manager.seeds : []
      new_manager = PoolManager.new(self, discovered_seeds | @seeds)
      new_manager.connect

      Thread.current[:managers][self] = new_manager

      # TODO: make sure that connect has succeeded
      @old_managers << @manager
      @manager = new_manager

      @refresh_version += 1
      return true
    end

The replica set primary‘s host name.

@return [String]

[Source]

# File lib/mongo/repl_set_connection.rb, line 243
    def host
      @manager.primary_pool.host
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 464
    def hosts
      local_manager ? local_manager.hosts : []
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 152
    def inspect
      "<Mongo::ReplSetConnection:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
        "@connected=#{@connected}>"
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 447
    def local_manager
      Thread.current[:managers][self] if Thread.current[:managers]
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 318
    def logout_pools(db)
      if primary_pool
        primary_pool.logout_existing(db)
      end
      secondary_pools.each do |pool|
        pool.logout_existing(db)
      end
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 488
    def max_bson_size
      if local_manager && local_manager.max_bson_size
        local_manager.max_bson_size
      else
        Mongo::DEFAULT_MAX_BSON_SIZE
      end
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 254
    def nodes
      warn "ReplSetConnection#nodes is DEPRECATED and will be removed in v2.0. " +
        "Please use ReplSetConnection#seeds instead."
      @seeds
    end

The replica set primary‘s port.

@return [Integer]

[Source]

# File lib/mongo/repl_set_connection.rb, line 250
    def port
      @manager.primary_pool.port
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 455
    def primary
      local_manager ? local_manager.primary : nil
    end
primary?()

Alias for read_primary?

[Source]

# File lib/mongo/repl_set_connection.rb, line 468
    def primary_pool
      local_manager ? local_manager.primary_pool : nil
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 472
    def read_pool
      local_manager ? local_manager.read_pool : nil
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 269
    def read_preference
      @read
    end

Determine whether we‘re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

@return [Boolean]

[Source]

# File lib/mongo/repl_set_connection.rb, line 264
    def read_primary?
      @manager.read_pool == @manager.primary_pool
    end

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running ReplSetConnection#hard_refresh!

@return [Boolean] true unless a hard refresh

  is run and the refresh lock can't be acquired.

[Source]

# File lib/mongo/repl_set_connection.rb, line 191
    def refresh(opts={})
      if !connected?
        log(:info, "Trying to check replica set health but not " +
          "connected...")
        return hard_refresh!
      end

      log(:debug, "Checking replica set connection health...")
      @manager.check_connection_health

      if @manager.refresh_required?
        return hard_refresh!
      end

      return true
    end

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values. @deprecated

[Source]

# File lib/mongo/repl_set_connection.rb, line 292
    def reset_connection
      close
      warn "ReplSetConnection#reset_connection is now deprecated and will be removed in v2.0. " +
        "Use ReplSetConnection#close instead."
    end

Note: might want to freeze these after connecting.

[Source]

# File lib/mongo/repl_set_connection.rb, line 460
    def secondaries
      local_manager ? local_manager.secondaries : []
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 476
    def secondary_pool
      local_manager ? local_manager.secondary_pool : nil
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 480
    def secondary_pools
      local_manager ? local_manager.secondary_pools : []
    end

Returns true if it‘s okay to read from a secondary node. Since this is a replica set, this must always be true.

This method exist primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

@return [Boolean] true

[Source]

# File lib/mongo/repl_set_connection.rb, line 305
    def slave_ok?
      true
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 484
    def tag_map
      local_manager ? local_manager.tag_map : {}
    end

[Source]

# File lib/mongo/repl_set_connection.rb, line 148
    def valid_opts
      GENERIC_OPTS + REPL_SET_OPTS
    end

[Validate]