API Reference 0.4.0 > couchclient > CouchClientImpl

CouchClientImpl class

/// [CouchClient] implementation built on top of [MemcachedClientImpl].
///
/// Adds design-document management, view queries and observe/durability
/// polling to the inherited memcached key/value commands. HTTP (view)
/// operations are sent through a [ViewConnection]; everything else goes
/// through the inherited memcached connection.
class CouchClientImpl extends MemcachedClientImpl implements CouchClient {
  // Created lazily by the [restClient] getter.
  RestClient _restClient;
  // Left null by [connect] when the config type is not COUCHBASE.
  ViewConnection _viewConn;
  CouchbaseConnectionFactory _connFactory;
  Logger _logger;

  /// Creates a client from [factory] and subscribes it to configuration
  /// updates of the factory's bucket.
  ///
  /// The returned Future completes with the connected client, or with an
  /// error if the configuration lookup, connection or subscription fails.
  static Future<CouchClient> connect(CouchbaseConnectionFactory factory) {
    return factory.vbucketConfig.then((config) {
      ViewConnection viewConn = null;
      List<SocketAddress> saddrs =
          HttpUtil.parseSocketAddressesFromStrings(config.servers);
      // A view connection is only created for COUCHBASE-type configs.
      if (config.configType == ConfigType.COUCHBASE) {
        List<SocketAddress> uaddrs =
            _toSocketAddressesFromUri(config.couchServers);
        viewConn = factory.createViewConnection(uaddrs);
      }
      return factory.createConnection(saddrs)
      .then((conn) => new CouchClientImpl(viewConn, conn, factory))
      .then((client) {
        return factory.configProvider.subscribe(factory.bucketName, client)
        .then((ok) => client);
      });
    });
  }

  /// Converts [uris] to socket addresses.
  ///
  /// Throws an [ArgumentError] if the resulting list is empty.
  static List<SocketAddress> _toSocketAddressesFromUri(List<Uri> uris) {
    List<SocketAddress> saddrs = HttpUtil.parseSocketAddressesFromUris(uris);
    if (saddrs.isEmpty)
      throw new ArgumentError("servers cannot be empty");

    return saddrs;
  }

  /// Creates a client over an already established view connection
  /// ([viewConn], may be null) and memcached connection ([memcachedConn]).
  CouchClientImpl(ViewConnection viewConn, CouchbaseConnection memcachedConn,
                  CouchbaseConnectionFactory connFactory)
      : _viewConn = viewConn,
        _connFactory = connFactory,
        super(memcachedConn, connFactory) {

    _logger = initLogger('couchclient', this);
  }

  /// The connection factory this client was created with.
  CouchbaseConnectionFactory get connectionFactory => _connFactory;

  /// Creates a DesignDoc and adds it into Couchbase; the Future completes
  /// with the result of the underlying PUT operation.
  Future<bool> addDesignDoc(DesignDoc doc) {
    PutDesignDocOP op =
        new PutDesignDocOP(_connFactory.bucketName, doc.name, doc.toJson());
    _handleHttpOperation(op);
    return op.future;
  }

  /// Deletes the DesignDoc named [docName].
  Future<bool> deleteDesignDoc(String docName) {
    DeleteDesignDocOP op =
        new DeleteDesignDocOP(_connFactory.bucketName, docName);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Retrieves the DesignDoc named [docName].
  Future<DesignDoc> getDesignDoc(String docName) {
    GetDesignDocOP op = new GetDesignDocOP(_connFactory.bucketName, docName);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Retrieves the View [viewName] in the DesignDoc [docName].
  Future<View> getView(String docName, String viewName) {
    GetViewOP op = new GetViewOP(_connFactory.bucketName, docName, viewName);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Retrieves the SpatialView [viewName] in the DesignDoc [docName].
  Future<SpatialView> getSpatialView(String docName, String viewName) {
    GetSpatialViewOP op =
        new GetSpatialViewOP(_connFactory.bucketName, docName, viewName);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Queries [view] with the conditions in [query].
  ///
  /// If the view has a reduce function and the query does not mention
  /// 'reduce' in its args, `query.reduce` is set to true (the passed-in
  /// query object is modified). The query is then dispatched as a reduced,
  /// with-docs or no-docs query.
  Future<ViewResponse> query(ViewBase view, Query query) {
    if (view.hasReduce && !query.args.containsKey('reduce')) {
      query.reduce = true;
    }

    if (query.willReduce) {
      return _queryReduced(view, query);
    } else if (query.includeDocs) {
      return _queryWithDocs(view, query);
    } else {
      return _queryNoDocs(view, query);
    }
  }

  /// Observes the document with the specified [key] on the master node of
  /// its vbucket and on every replica node currently assigned to it.
  ///
  /// [cas] is forwarded to the observe operation; null to ignore it. The
  /// result maps each node address to its [ObserveResult]; the master's
  /// entry is flagged with `isPrimary = true`.
  Future<Map<SocketAddress, ObserveResult>> observe(String key, {int cas}) {
    return _connFactory.vbucketConfig
    .then((Config cfg) {
      VbucketNodeLocator vlocator = locator;
      int vb = vlocator.getVbucketIndex(key);
      List<MemcachedNode> nodes = new List();
      MemcachedNode primary = vlocator.getServerByIndex(cfg.getMaster(vb));
      nodes.add(primary);
      for (int j = 0, repCount = cfg.replicasCount; j < repCount; ++j) {
        int replica = cfg.getReplica(vb, j);
        // A negative index means no node is assigned to this replica slot.
        if (replica >= 0)
          nodes.add(vlocator.getServerByIndex(replica));
      }
      return handleBroadcastOperation(() =>
          opFactory.newObserveOP(key, cas), nodes.iterator)
      .then((results) {
        results[primary.socketAddress].isPrimary = true;
        return results;
      });
    });
  }

  /// Polls [observe] for [key] until the requested durability is reached.
  ///
  /// * [cas] - (optional) CAS version for the key; null to ignore cas check.
  /// * [persistTo] - (optional) persistence setting; default: PersistTo.ZERO.
  /// * [replicateTo] - (optional) replication setting; default:
  ///   ReplicateTo.ZERO.
  /// * [isDelete] - (optional) if the key is to be deleted; default: false.
  ///
  /// Completes with true once the requirement is met (immediately when both
  /// settings are zero). Completes with an error if the requirement cannot
  /// be satisfied by the current configuration, if the key was modified on
  /// the master, or if the maximum number of polls is exhausted.
  Future<bool> observePoll(String key, {
      int cas,
      PersistTo persistTo: PersistTo.ZERO,
      ReplicateTo replicateTo: ReplicateTo.ZERO,
      bool isDelete: false}) {

    final Completer<bool> cmpl = new Completer();
    _connFactory.checkConfigAgainstDurability(persistTo, replicateTo)
    .then((_) => _connFactory.vbucketConfig)
    .then((cfg) {
      // persistTo counts the master too; split it into "master" + replicas.
      final int numPersistReplica =
          persistTo.value > 0 ? persistTo.value - 1 : 0;
      final int numReplica = replicateTo.value;
      final bool isPersistMaster = persistTo.value > 0;
      VbucketNodeLocator vlocator = locator;

      _checkObserveReplica(key, numPersistReplica, numReplica, cfg, vlocator);

      if (numReplica <= 0 && numPersistReplica <= 0 && !isPersistMaster) {
        // Nothing to wait for.
        cmpl.complete(true);
      } else {
        final int obsPolls = 0;
        final int obsPollMax = _connFactory.observePollMax;
        final Duration obsPollInterval =
            new Duration(milliseconds: _connFactory.observePollInterval);

        _observePoll0(
            cmpl,
            key,
            cas,
            isDelete,
            cfg,
            vlocator,
            numReplica,
            numPersistReplica,
            isPersistMaster,
            obsPolls,
            obsPollMax,
            obsPollInterval);
      }
    })
    .catchError((err) => cmpl.completeError(err));

    return cmpl.future;
  }

  /// The RESTful client associated with this client; created on first use.
  RestClient get restClient {
    if (_restClient == null) {
      _restClient = new RestClientImpl(_connFactory);
    }
    return _restClient;
  }

  /// Closes this client and shuts down the configuration provider.
  ///
  /// Does nothing if the client is already closing.
  void close() {
    if (isClosing) return;
    super.close();
    //TODO(20130605,henrichen): Shutdown the monitor channel
    _connFactory.configProvider.shutdown();
  }

  /// One round of the observe poll loop started by [observePoll].
  ///
  /// Observes [key], counts how many nodes satisfy the replicate/persist
  /// requirement and either completes [cmpl], or schedules itself again
  /// after [obsPollInterval]. [obsPolls] is the number of rounds already
  /// done; the loop fails with an [ObservedTimeoutException] once it reaches
  /// [obsPollMax]. Any error is forwarded to [cmpl].
  void _observePoll0(
      Completer cmpl,
      String key,
      int cas,
      bool isDelete,
      Config cfg,
      VbucketNodeLocator vlocator,
      int numReplica,
      int numPersistReplica,
      bool isPersistMaster,
      int obsPolls,
      int obsPollMax,
      Duration obsPollInterval) {

    _checkObserveReplica(key, numPersistReplica, numReplica, cfg, vlocator);

    this.observe(key, cas:cas)
    .then((response) {
      int vb = vlocator.getVbucketIndex(key);
      MemcachedNode master = vlocator.getServerByIndex(cfg.getMaster(vb));

      int observePersistReplica = 0;
      int observeReplica = 0;
      bool observePersistMaster = false;
      for (SocketAddress node in response.keys) {
        final bool isMaster = node == master.socketAddress;
        final ObserveStatus status = response[node].status;
        if (isMaster && status == ObserveStatus.MODIFIED) {
          throw new ObservedModifiedException("Key was modified");
        }
        if (!isDelete) {
          if (!isMaster && status == ObserveStatus.NOT_PERSISTED) {
            observeReplica++;
          } else if (status == ObserveStatus.PERSISTED) {
            if (isMaster) {
              observePersistMaster = true;
            } else {
              // Persisted on a replica implies replicated as well.
              observeReplica++;
              observePersistReplica++;
            }
          }
        } else {
//TODO(20130520, henrichen): The following code is from Couchbase Java client
//  but the program logic is strange to me, so I rewrote it to be symmetric to
//  the case where isDelete is false.
//          if (status == ObserveStatus.LOGICALLY_DELETED) {
//            observeReplica++;
//          } else if (status == ObserveStatus.NOT_FOUND) {
//            observeReplica++;
//            observePersistReplica++;
//            if (isMaster) {
//              obervePersistMaster = true;
//            } else {
//              observePersistReplica++;
//            }
//          }
          if (!isMaster && status == ObserveStatus.LOGICALLY_DELETED) {
            observeReplica++;
          } else if (status == ObserveStatus.NOT_FOUND) {
            if (isMaster) {
              observePersistMaster = true;
            } else {
              observeReplica++;
              observePersistReplica++;
            }
          }
        }
      }

      if (numReplica <= observeReplica
          && numPersistReplica <= observePersistReplica
          && (observePersistMaster || !isPersistMaster)) {
        cmpl.complete(true);
      } else {
        if (++obsPolls >= obsPollMax) {
          int timeTried = obsPollMax * obsPollInterval.inMilliseconds;
          throw new ObservedTimeoutException("Observe Timeout - Polled"
              " Unsuccessfully for at least $timeTried milliseconds.");
        }

        return new Future.delayed(obsPollInterval)
        .then((_) =>
            _observePoll0( //recursive
                cmpl,
                key,
                cas,
                isDelete,
                cfg,
                vlocator,
                numReplica,
                numPersistReplica,
                isPersistMaster,
                obsPolls,
                obsPollMax,
                obsPollInterval)
        );
      }
    })
    .catchError((err) => cmpl.completeError(err));
  }

  /// Verifies that the current configuration can satisfy the requested
  /// durability for [key].
  ///
  /// [numPersistReplica] is the number of *replica* nodes that must persist
  /// the value (the master is not included); [numReplica] is the number of
  /// replica nodes that must hold it. Throws an [ObservedException] when the
  /// request cannot be satisfied.
  void _checkObserveReplica(String key, int numPersistReplica, int numReplica,
                            Config cfg, VbucketNodeLocator locator) {
    if (numReplica > 0) {
      int vbucketIndex = locator.getVbucketIndex(key);
      int currentReplicaNum = cfg.getReplica(vbucketIndex, numReplica - 1);
      if (currentReplicaNum < 0) {
        throw new ObservedException("Currently, there is no replica available "
            "for the given replica index. This can be the case because of a "
            "failed over node which has not yet been rebalanced.");
      }
    }

    int replicaCount = math.min(locator.allNodes.length - 1, cfg.replicasCount);

    if (numReplica > replicaCount) {
      throw new ObservedException("Requested replication to $numReplica"
          " node(s), but only $replicaCount are available");
    } else if (numPersistReplica > replicaCount) {
      // Callers pass the replica-only persist count, so it must be compared
      // with the replica count; comparing it with the total node count
      // (replicaCount + 1) let a request for one node too many slip through
      // and poll until timeout. The message reports totals including master.
      throw new ObservedException(
          "Requested persistence to ${numPersistReplica + 1}"
          " node(s), but only (${replicaCount + 1}) are available.");
    }
  }

  /// Runs a view query and then loads the document of every returned row.
  ///
  /// The returned Future completes with an error if the view operation (or
  /// the setup of the document retrieval) fails. Errors emitted while the
  /// documents are being streamed are logged and skipped; rows whose document
  /// could not be loaded carry a null document.
  Future<ViewResponseWithDocs> _queryWithDocs(ViewBase view, Query query) {
    WithDocsOP op = new WithDocsOP(view, query);
    _handleHttpOperation(op);
    Completer<ViewResponseWithDocs> cmpl = new Completer();
    op.future.then((vr) {
      List<String> ids = new List();
      for (ViewRowNoDocs row in vr.rows) {
        ids.add(row.id);
      }
      _logger.finest("--->ids:$ids");
      Map<String, GetResult> results = new HashMap();
      if (ids.isEmpty) {
        // Keep the errors reported by the view even when no row came back.
        cmpl.complete(new ViewResponseWithDocs([], vr.errors, results));
      } else {
        Stream<GetResult> st = getAll(ids);
        st.listen(
          (data) {
            results[data.key] = data;
            _logger.finest("data:${data.key}");
          },
          // Best effort: a failed document fetch must not fail the query.
          onError: (err) =>
              _logger.warning("_queryWithDocs. ids: $ids, Error: $err"),
          onDone: () {
            List<ViewRowWithDocs> docs = new List();
            for (ViewRowNoDocs r in vr.rows) {
              docs.add(
                  new ViewRowWithDocs(r.id, r.key, r.value, results[r.id]));
            }
            cmpl.complete(new ViewResponseWithDocs(docs, vr.errors, results));
          }
        );
      }
    })
    // Without this a failed view operation would leave the caller waiting
    // forever on a Future that never completes.
    .catchError((err) => cmpl.completeError(err));
    return cmpl.future;
  }

  /// Runs a view query that returns rows without documents.
  Future<ViewResponseNoDocs> _queryNoDocs(ViewBase view, Query query) {
    NoDocsOP op = new NoDocsOP(view, query);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Runs a reduced view query.
  Future<ViewResponseReduced> _queryReduced(ViewBase view, Query query) {
    ReducedOP op = new ReducedOP(view, query);
    _handleHttpOperation(op);
    return op.future;
  }

  /// Queues [op] on the view connection.
  ///
  /// Throws a [StateError] when this client has no view connection (see
  /// [connect]: one is only created for COUCHBASE-type configurations).
  void _handleHttpOperation(HttpOP op) {
    if (_viewConn == null) {
      throw new StateError("No view connection is available; "
          "HTTP (design document/view) operations cannot be sent.");
    }
    _viewConn.addOP(op);
  }

//--Reconfigurable--//
  // Guards [reconfigure] against re-entrant calls.
  bool _reconfiguring = false;

  /// Called on configuration updates; stores [bucket] in the configuration
  /// provider and forwards it to the view and memcached connections.
  void reconfigure(Bucket bucket) {
    if (_reconfiguring) return;
    try {
      _reconfiguring = true;
      if (bucket.isNotUpdating) {
        _logger.info("Bucket configuration is disconnected from cluster "
            "configuration updates, attempting to reconnect.");
        _connFactory.requestConfigReconnect(_connFactory.bucketName, this);
        _connFactory.checkConfigUpdate();
      }
      _connFactory.configProvider.buckets[_connFactory.bucketName] = bucket;

      if (_viewConn != null) {
        _viewConn.reconfigure(bucket);
      }
      if (memcachedConn is Reconfigurable) {
        (memcachedConn as Reconfigurable).reconfigure(bucket);
      }
    } finally {
      _reconfiguring = false;
    }
  }
}

Extends

MemcachedClientImpl > CouchClientImpl

Implements

CouchClient

Static Methods

Future<CouchClient> connect(CouchbaseConnectionFactory factory) #

Create a new client connecting to the specified Couchbase bucket per the given initial server list in the cluster; this method returns a Future that will complete with either a CouchClient once connected or an error if the server-lookup or connection failed.

  • baseList - the Uri list of one or more servers from the cluster
  • bucket - the bucket name in the cluster you want to connect.
  • password - the password of the bucket
docs inherited from CouchClient
/// Builds a client from [factory] and subscribes it to configuration updates
/// of the factory's bucket; completes with the client once subscribed.
static Future<CouchClient> connect(CouchbaseConnectionFactory factory) {
  return factory.vbucketConfig.then((cfg) {
    final List<SocketAddress> memcachedAddrs =
        HttpUtil.parseSocketAddressesFromStrings(cfg.servers);
    // A view connection is created only when the config type is COUCHBASE;
    // otherwise the client is built with a null view connection.
    final ViewConnection views = cfg.configType == ConfigType.COUCHBASE
        ? factory.createViewConnection(
            _toSocketAddressesFromUri(cfg.couchServers))
        : null;
    return factory.createConnection(memcachedAddrs).then((conn) {
      final CouchClientImpl created = new CouchClientImpl(views, conn, factory);
      return factory.configProvider
          .subscribe(factory.bucketName, created)
          .then((_) => created);
    });
  });
}

Constructors

new CouchClientImpl(ViewConnection viewConn, CouchbaseConnection memcachedConn, CouchbaseConnectionFactory connFactory) #

Creates a new Object instance.

Object instances have no meaningful state, and are only useful through their identity. An Object instance is equal to itself only.

docs inherited from Object
/// Creates a client over the given view connection, memcached connection and
/// connection factory; the latter two are also handed to the superclass.
CouchClientImpl(ViewConnection viewConn, CouchbaseConnection memcachedConn,
    CouchbaseConnectionFactory connFactory)
    : _connFactory = connFactory,
      _viewConn = viewConn,
      super(memcachedConn, connFactory) {
  // The logger takes `this`, so it is set up in the body rather than in the
  // initializer list.
  this._logger = initLogger('couchclient', this);
}

Properties

final List<SocketAddress> availableServers #

inherited from MemcachedClientImpl

Returns the addresses of available servers at this moment.

/// Collects the socket address of every node the locator knows whose
/// `isActive` flag is set.
List<SocketAddress> get availableServers {
  final List<SocketAddress> active = <SocketAddress>[];
  final Iterator<MemcachedNode> nodes = locator.allNodes.iterator;
  while (nodes.moveNext()) {
    final MemcachedNode candidate = nodes.current;
    if (candidate.isActive) {
      active.add(candidate.socketAddress);
    }
  }
  return active;
}

final CouchbaseConnectionFactory connectionFactory #

/// The connection factory held by this client.
CouchbaseConnectionFactory get connectionFactory {
  return _connFactory;
}

final bool isClosing #

inherited from MemcachedClientImpl
/// Exposes the private closing flag of this client.
bool get isClosing {
  return _closing;
}

final NodeLocator locator #

inherited from MemcachedClientImpl

Returns default Transcoder used with this MemcachedClient.

Returns the locator of the server nodes in the cluster.

docs inherited from MemcachedClient
/// The node locator of the underlying memcached connection.
NodeLocator get locator {
  return _memcachedConn.locator;
}

final MemcachedConnection memcachedConn #

inherited from MemcachedClientImpl
/// The underlying memcached connection.
MemcachedConnection get memcachedConn {
  return _memcachedConn;
}

final OPFactory opFactory #

inherited from MemcachedClientImpl
/// The factory used to create operations for this client.
OPFactory get opFactory {
  return _opFactory;
}

final RestClient restClient #

Get the RESTful client associated with this CouchClient.

docs inherited from CouchClient
/// The REST client for this client's connection factory; built on first
/// access and reused afterwards.
RestClient get restClient {
  if (_restClient != null) {
    return _restClient;
  }
  _restClient = new RestClientImpl(_connFactory);
  return _restClient;
}

final List<SocketAddress> unavailableServers #

inherited from MemcachedClientImpl

Returns the addresses of unavailable servers at this moment.

/// Collects the socket address of every node the locator knows whose
/// `isActive` flag is not set.
List<SocketAddress> get unavailableServers {
  final List<SocketAddress> inactive = <SocketAddress>[];
  final Iterator<MemcachedNode> nodes = locator.allNodes.iterator;
  while (nodes.moveNext()) {
    final MemcachedNode candidate = nodes.current;
    if (!candidate.isActive) {
      inactive.add(candidate.socketAddress);
    }
  }
  return inactive;
}

Methods

Future<bool> add(String key, List<int> doc, {int exptime}) #

inherited from MemcachedClientImpl

add command

/// Issues the `add` store command for [key] with payload [doc].
///
/// The literal 0 is passed through to `_store` unchanged (presumably the
/// flags argument; confirm against `_store`).
Future<bool> add(String key, List<int> doc, {int exptime}) {
  return _store(OPType.add, key, 0, exptime, doc);
}

Future<bool> addDesignDoc(DesignDoc doc) #

Create a DesignDoc and add it to Couchbase; asynchronously returns true on success.

docs inherited from CouchClient
/// Queues a PUT of [doc] (by name, serialized with `toJson`) for the
/// factory's bucket and returns the operation's Future.
Future<bool> addDesignDoc(DesignDoc doc) {
  final String bucket = _connFactory.bucketName;
  final PutDesignDocOP putOp =
      new PutDesignDocOP(bucket, doc.name, doc.toJson());
  _handleHttpOperation(putOp);
  return putOp.future;
}

Future<bool> append(String key, List<int> doc, {int cas, int exptime}) #

inherited from MemcachedClientImpl

append command

/// Issues the `append` store command for [key] with payload [doc],
/// forwarding the optional [cas] and [exptime].
Future<bool> append(String key, List<int> doc, {int cas, int exptime}) {
  return _store(OPType.append, key, 0, exptime, doc, cas);
}

void close() #

Close this memcached client.

docs inherited from MemcachedClient
/// Closes the client via the superclass and then shuts down the
/// configuration provider; a no-op when the client is already closing.
void close() {
  if (!isClosing) {
    super.close();
    //TODO(20130605,henrichen): Shutdown the monitor channel
    _connFactory.configProvider.shutdown();
  }
}

Future<int> decrement(String key, int by, {int def, int exptime}) #

inherited from MemcachedClientImpl

decrement command

/// Issues the `decr` mutate command on [key] with amount [by], forwarding
/// the optional [def] and [exptime]; returns the operation's Future.
Future<int> decrement(String key, int by, {int def, int exptime}) {
  final MutateOP decrOp =
      _opFactory.newMutateOP(OPType.decr, key, by, def, exptime);
  _handleOperation(key, decrOp);
  return decrOp.future;
}

Future<bool> delete(String key, {int cas}) #

inherited from MemcachedClientImpl

delete command

/// Issues the `delete` command for [key], forwarding the optional [cas];
/// returns the operation's Future.
Future<bool> delete(String key, {int cas}) {
  final DeleteOP deleteOp = _opFactory.newDeleteOP(key, cas);
  _handleOperation(key, deleteOp);
  return deleteOp.future;
}

Future<bool> deleteDesignDoc(String docName) #

Delete the named DesignDoc.

docs inherited from CouchClient
/// Queues a delete of the design document [docName] in the factory's bucket
/// and returns the operation's Future.
Future<bool> deleteDesignDoc(String docName) {
  final String bucket = _connFactory.bucketName;
  final DeleteDesignDocOP removeOp = new DeleteDesignDocOP(bucket, docName);
  _handleHttpOperation(removeOp);
  return removeOp.future;
}

Future<GetResult> get(String key) #

inherited from MemcachedClientImpl

get command

/// Issues the `get` command for [key] and returns the operation's Future.
Future<GetResult> get(String key) {
  final GetSingleOP fetchOp = _opFactory.newGetSingleOP(OPType.get, key);
  _handleOperation(key, fetchOp);
  return fetchOp.future;
}

Stream<GetResult> getAll(List<String> keys) #

inherited from MemcachedClientImpl

get command with multiple keys

/// Issues the `get` command for several [keys]; results arrive on the
/// returned stream.
Stream<GetResult> getAll(List<String> keys) {
  return _retrieveAll(OPType.get, keys);
}

Future<GetResult> getAndLock(String key, int locktime) #

inherited from MemcachedClientImpl

getAndLock command

/// Issues the getAndLock command for [key] with the given [locktime] and
/// returns the operation's Future.
Future<GetResult> getAndLock(String key, int locktime) {
  final GetAndLockOP lockOp = _opFactory.newGetAndLockOP(key, locktime);
  _handleOperation(key, lockOp);
  return lockOp.future;
}

Future<GetResult> getAndTouch(String key, int exp) #

inherited from MemcachedClientImpl

getAndTouch command

/// Issues the getAndTouch command for [key] with the expiration [exp] and
/// returns the operation's Future.
Future<GetResult> getAndTouch(String key, int exp) {
  final GetAndTouchOP touchOp = _opFactory.newGetAndTouchOP(key, exp);
  _handleOperation(key, touchOp);
  return touchOp.future;
}

Future<DesignDoc> getDesignDoc(String docName) #

Retrieve the named DesignDoc.

docs inherited from CouchClient
/// Queues a fetch of the design document [docName] from the factory's bucket
/// and returns the operation's Future.
Future<DesignDoc> getDesignDoc(String docName) {
  final String bucket = _connFactory.bucketName;
  final GetDesignDocOP fetchOp = new GetDesignDocOP(bucket, docName);
  _handleHttpOperation(fetchOp);
  return fetchOp.future;
}

Future<GetResult> gets(String key) #

inherited from MemcachedClientImpl

gets(with cas data version token) command

/// Issues the `gets` command (get with cas data version token) for [key]
/// and returns the operation's Future.
Future<GetResult> gets(String key) {
  final GetSingleOP fetchOp = _opFactory.newGetSingleOP(OPType.gets, key);
  _handleOperation(key, fetchOp);
  return fetchOp.future;
}

Stream<GetResult> getsAll(List<String> keys) #

inherited from MemcachedClientImpl

gets(with cas data version token) command with multiple keys

/// Issues the `gets` command (get with cas data version token) for several
/// [keys]; results arrive on the returned stream.
Stream<GetResult> getsAll(List<String> keys) {
  return _retrieveAll(OPType.gets, keys);
}

Future<SpatialView> getSpatialView(String docName, String viewName) #

Retrieve the named SpatialView in the named DesignDoc.

docs inherited from CouchClient
/// Queues a fetch of the spatial view [viewName] of design document
/// [docName] in the factory's bucket and returns the operation's Future.
Future<SpatialView> getSpatialView(String docName, String viewName) {
  final String bucket = _connFactory.bucketName;
  final GetSpatialViewOP fetchOp =
      new GetSpatialViewOP(bucket, docName, viewName);
  _handleHttpOperation(fetchOp);
  return fetchOp.future;
}

Future<View> getView(String docName, String viewName) #

Retrieve the named View in the named DesignDoc.

docs inherited from CouchClient
/// Queues a fetch of the view [viewName] of design document [docName] in the
/// factory's bucket and returns the operation's Future.
Future<View> getView(String docName, String viewName) {
  final String bucket = _connFactory.bucketName;
  final GetViewOP fetchOp = new GetViewOP(bucket, docName, viewName);
  _handleHttpOperation(fetchOp);
  return fetchOp.future;
}

Future<Map<SocketAddress, dynamic>> handleBroadcastOperation(OP newOP(), Iterator<MemcachedNode> nodeIterator) #

inherited from MemcachedClientImpl
/// Broadcasts an operation created by [newOP] to the nodes yielded by
/// [nodeIterator] and gathers the per-node results keyed by socket address.
///
/// A failing operation is logged as a warning; because the returned Future
/// waits on every operation's own Future, such a failure also fails the
/// returned Future.
Future<Map<SocketAddress, dynamic>> handleBroadcastOperation(OP newOP(),
    Iterator<MemcachedNode> nodeIterator) {
  return new Future.sync(() {
    final Map<SocketAddress, dynamic> gathered = new HashMap();
    final List<Future> pending = new List();

    // Registers result collection and error logging for one node's op, then
    // remembers its Future so the broadcast can wait for it.
    void track(saddr, op) {
      op.future.then((value) {
        return gathered[saddr] = value;
      }).catchError((err) {
        return _logger
            .warning("broadcastOP. saddr: $saddr, OP: $op, Error: $err");
      });
      pending.add(op.future);
    }

    _memcachedConn.broadcastOP(newOP, nodeIterator).forEach(track);
    return Future.wait(pending).then((_) => gathered);
  });
}

Future<int> increment(String key, int by, {int def, int exptime}) #

inherited from MemcachedClientImpl

increment command

/// Issues the `incr` mutate command on [key] with amount [by], forwarding
/// the optional [def] and [exptime]; returns the operation's Future.
Future<int> increment(String key, int by, {int def, int exptime}) {
  final MutateOP incrOp =
      _opFactory.newMutateOP(OPType.incr, key, by, def, exptime);
  _handleOperation(key, incrOp);
  return incrOp.future;
}

Future<Map<String, String>> keyStats(String key) #

inherited from MemcachedClientImpl

keystats command

/// Issues the keystats command for [key] and returns the operation's Future.
Future<Map<String, String>> keyStats(String key) {
  final KeyStatsOP statsOp = _opFactory.newKeyStatsOP(key);
  _handleOperation(key, statsOp);
  return statsOp.future;
}

Future<Set<String>> listSaslMechs() #

inherited from MemcachedClientImpl

Returns the set of supported SASL authentication mechanisms.

docs inherited from MemcachedClient
/// Asks every node for its SASL mechanisms and returns the union of all
/// answers as a set.
Future<Set<String>> listSaslMechs() {
  final Future<Map<SocketAddress, dynamic>> perNode = handleBroadcastOperation(
      () => _opFactory.newSaslMechsOP(), locator.allNodes.iterator);
  return perNode.then((answers) {
    final HashSet<String> union = new HashSet();
    // Each map value is one node's list of mechanism names.
    for (List<String> nodeMechs in answers.values) {
      union.addAll(nodeMechs);
    }
    return union;
  });
}

Future<Map<SocketAddress, ObserveResult>> observe(String key, {int cas}) #

Observe a document with the specified key and check its persistency and replicas status in the cluster.

  • key - key of the document
  • cas - expected version of the observed document; null to ignore it. If specified and the document has been updated, ObserveStatus.MODIFIED would be returned in ObserveResult.status field.

docs inherited from CouchClient
/// Sends an observe operation for [key] (with the optional [cas]) to the
/// master node of the key's vbucket and to each replica node that is
/// currently assigned, then marks the master's result as primary.
Future<Map<SocketAddress, ObserveResult>> observe(String key, {int cas}) {
  return _connFactory.vbucketConfig.then((Config config) {
    final VbucketNodeLocator vbLocator = locator;
    final int vbucket = vbLocator.getVbucketIndex(key);
    final MemcachedNode master =
        vbLocator.getServerByIndex(config.getMaster(vbucket));

    // Master first, followed by every replica slot that has a node.
    final List<MemcachedNode> targets = <MemcachedNode>[master];
    final int replicaSlots = config.replicasCount;
    int slot = 0;
    while (slot < replicaSlots) {
      final int nodeIndex = config.getReplica(vbucket, slot);
      // A negative index means the slot has no node assigned.
      if (nodeIndex >= 0) {
        targets.add(vbLocator.getServerByIndex(nodeIndex));
      }
      ++slot;
    }

    return handleBroadcastOperation(
        () => opFactory.newObserveOP(key, cas), targets.iterator)
        .then((byAddress) {
      byAddress[master.socketAddress].isPrimary = true;
      return byAddress;
    });
  });
}

Future<bool> observePoll(String key, {int cas, PersistTo persistTo: PersistTo.ZERO, ReplicateTo replicateTo: ReplicateTo.ZERO, bool isDelete: false}) #

Poll and observe a key with the given cas and persist settings.

Based on the given persistTo, replicateTo, isDelete settings, it observes the key and raises an exception if a timeout has been reached. This method is normally used to make sure that a value is stored/deleted to the status you want it in the cluster.

If persistTo is not specified, it will default to PersistTo.ZERO and if replicateTo is not specified, it will default to ReplicateTo.ZERO. This is the default behavior and is the same as not observing at all.

  • key - the key to observe.
  • cas - (optional) CAS version for the key; default: null to ignore cas check.
  • persistTo - (optional) persistence setting; default: PersistTo.ZERO.
  • replicateTo - (optional) replication setting; default: ReplicateTo.ZERO.
  • isDelete - (optional) if the key is to be deleted; default: false.
docs inherited from CouchClient
/// Polls the cluster until [key] reaches the durability requested through
/// [persistTo] and [replicateTo].
///
/// Completes with true when the requirement is met, which is immediately
/// when both settings are zero. Any error raised while checking the
/// configuration or while polling is delivered through the returned Future.
Future<bool> observePoll(String key, {
    int cas,
    PersistTo persistTo: PersistTo.ZERO,
    ReplicateTo replicateTo: ReplicateTo.ZERO,
    bool isDelete: false}) {
  final Completer<bool> done = new Completer();

  _connFactory
      .checkConfigAgainstDurability(persistTo, replicateTo)
      .then((_) => _connFactory.vbucketConfig)
      .then((config) {
    // persistTo counts the master as well; split it into a master flag and
    // the number of replicas that must persist.
    final bool mustPersistOnMaster = persistTo.value > 0;
    final int replicasToPersist =
        mustPersistOnMaster ? persistTo.value - 1 : 0;
    final int replicasToHold = replicateTo.value;
    final VbucketNodeLocator vbLocator = locator;

    _checkObserveReplica(
        key, replicasToPersist, replicasToHold, config, vbLocator);

    final bool needsPolling =
        replicasToHold > 0 || replicasToPersist > 0 || mustPersistOnMaster;
    if (!needsPolling) {
      done.complete(true);
      return;
    }

    final int pollsSoFar = 0;
    final int maxPolls = _connFactory.observePollMax;
    final Duration pause =
        new Duration(milliseconds: _connFactory.observePollInterval);
    _observePoll0(done, key, cas, isDelete, config, vbLocator, replicasToHold,
        replicasToPersist, mustPersistOnMaster, pollsSoFar, maxPolls, pause);
  }).catchError((err) => done.completeError(err));

  return done.future;
}

Future<bool> prepend(String key, List<int> doc, {int cas, int exptime}) #

inherited from MemcachedClientImpl

prepend command

/// Issues the `prepend` store command for [key] with payload [doc],
/// forwarding the optional [cas] and [exptime].
Future<bool> prepend(String key, List<int> doc, {int cas, int exptime}) {
  return _store(OPType.prepend, key, 0, exptime, doc, cas);
}

Future<ViewResponse> query(ViewBase view, Query query) #

Query data from Couchbase with the specified View (can be View or SpatialView) and query condition.

docs inherited from CouchClient
/// Runs [query] against [view], choosing the reduced, with-docs or no-docs
/// variant.
///
/// When the view has a reduce function and the query's args do not contain
/// 'reduce', `query.reduce` is switched on first, so the passed-in query
/// object is modified.
Future<ViewResponse> query(ViewBase view, Query query) {
  if (view.hasReduce) {
    if (!query.args.containsKey('reduce')) {
      query.reduce = true;
    }
  }

  if (query.willReduce) {
    return _queryReduced(view, query);
  }
  if (query.includeDocs) {
    return _queryWithDocs(view, query);
  }
  return _queryNoDocs(view, query);
}

void reconfigure(Bucket bucket) #

Called on configuration updates.

docs inherited from Reconfigurable
/// Applies a configuration update: records [bucket] in the configuration
/// provider and passes it on to the view connection (if there is one) and to
/// the memcached connection (if it is [Reconfigurable]).
///
/// A call made while another one is still in progress returns immediately.
void reconfigure(Bucket bucket) {
  if (_reconfiguring) {
    return;
  }
  _reconfiguring = true;
  try {
    if (bucket.isNotUpdating) {
      _logger.info("Bucket configuration is disconnected from cluster "
          "configuration updates, attempting to reconnect.");
      _connFactory.requestConfigReconnect(_connFactory.bucketName, this);
      _connFactory.checkConfigUpdate();
    }
    _connFactory.configProvider.buckets[_connFactory.bucketName] = bucket;

    if (_viewConn != null) _viewConn.reconfigure(bucket);
    if (memcachedConn is Reconfigurable) {
      final Reconfigurable target = memcachedConn as Reconfigurable;
      target.reconfigure(bucket);
    }
  } finally {
    // Always release the guard, even if a step above throws.
    _reconfiguring = false;
  }
}

Future<bool> replace(String key, List<int> doc, {int cas, int exptime}) #

inherited from MemcachedClientImpl

replace command

/// Issues the `replace` store command for [key] with payload [doc],
/// forwarding the optional [cas] and [exptime].
Future<bool> replace(String key, List<int> doc, {int cas, int exptime}) {
  return _store(OPType.replace, key, 0, exptime, doc, cas);
}

Future<bool> set(String key, List<int> doc, {int cas, int exptime}) #

inherited from MemcachedClientImpl

set command

/// Issues the `set` store command for [key] with payload [doc], forwarding
/// the optional [cas] and [exptime].
Future<bool> set(String key, List<int> doc, {int cas, int exptime}) {
  return _store(OPType.set, key, 0, exptime, doc, cas);
}

Future<Map<SocketAddress, Map<String, String>>> stats({String prefix}) #

inherited from MemcachedClientImpl

stats command

/// Broadcasts the stats command (with the optional [prefix]) to all nodes
/// and returns the per-node results.
Future<Map<SocketAddress, Map<String, String>>> stats({String prefix}) {
  return handleBroadcastOperation(
      () => _opFactory.newStatsOP(prefix), locator.allNodes.iterator);
}

Future<bool> touch(String key, int exp, [bool noreply]) #

inherited from MemcachedClientImpl

touch command

/// Issues the touch command for [key] with the expiration [exp] and returns
/// the operation's Future.
///
/// [noreply] is accepted but not referenced by this method body.
Future<bool> touch(String key, int exp, [bool noreply]) {
  final TouchOP touchOp = _opFactory.newTouchOP(key, exp);
  _handleOperation(key, touchOp);
  return touchOp.future;
}

Future<bool> unlock(String key, {int cas}) #

inherited from MemcachedClientImpl

unlock command

/// Issues the unlock command for [key], forwarding the optional [cas];
/// returns the operation's Future.
Future<bool> unlock(String key, {int cas}) {
  final UnlockOP unlockOp = _opFactory.newUnlockOP(key, cas);
  _handleOperation(key, unlockOp);
  return unlockOp.future;
}

Future<Map<SocketAddress, String>> versions() #

inherited from MemcachedClientImpl

versions command

/// Broadcasts the version command to all nodes and returns the per-node
/// results.
Future<Map<SocketAddress, String>> versions() {
  return handleBroadcastOperation(
      () => _opFactory.newVersionOP(), locator.allNodes.iterator);
}