packages: db4.6/patch.4.6.21.2 (NEW), db4.6/patch.4.6.21.3 (NEW), db4.6/patch.4.6.21.4 (NEW)
glen
glen at pld-linux.org
Fri Jun 5 00:05:08 CEST 2009
Author: glen Date: Thu Jun 4 22:05:08 2009 GMT
Module: packages Tag: HEAD
---- Log message:
- official patches
---- Files affected:
packages/db4.6:
patch.4.6.21.2 (NONE -> 1.1) (NEW), patch.4.6.21.3 (NONE -> 1.1) (NEW), patch.4.6.21.4 (NONE -> 1.1) (NEW)
---- Diffs:
================================================================
Index: packages/db4.6/patch.4.6.21.2
diff -u /dev/null packages/db4.6/patch.4.6.21.2:1.1
--- /dev/null Fri Jun 5 00:05:08 2009
+++ packages/db4.6/patch.4.6.21.2 Fri Jun 5 00:05:03 2009
@@ -0,0 +1,27 @@
+*** mp/mp_region.c 2007-05-18 03:18:01.000000000 +1000
+--- mp/mp_region.c 2008-06-24 13:15:56.000000000 +1000
+***************
+*** 249,256 ****
+ mtx_base = htab[0].mtx_hash;
+ }
+
+ if (mtx_base != MUTEX_INVALID)
+! mtx_base += reginfo_off * htab_buckets;
+
+ /* Allocate hash table space and initialize it. */
+ if ((ret = __env_alloc(infop,
+--- 249,262 ----
+ mtx_base = htab[0].mtx_hash;
+ }
+
++ /*
++ * We preallocated all of the mutexes in a block, so for regions after
++ * the first, we skip mutexes in use in earlier regions. Each region
++ * has the same number of buckets and there are two mutexes per hash
++ * bucket (the bucket mutex and the I/O mutex).
++ */
+ if (mtx_base != MUTEX_INVALID)
+! mtx_base += reginfo_off * htab_buckets * 2;
+
+ /* Allocate hash table space and initialize it. */
+ if ((ret = __env_alloc(infop,
================================================================
Index: packages/db4.6/patch.4.6.21.3
diff -u /dev/null packages/db4.6/patch.4.6.21.3:1.1
--- /dev/null Fri Jun 5 00:05:08 2009
+++ packages/db4.6/patch.4.6.21.3 Fri Jun 5 00:05:03 2009
@@ -0,0 +1,74 @@
+*** sequence/sequence.c
+--- sequence/sequence.c
+***************
+*** 196,202 ****
+ if ((ret = __db_get_flags(dbp, &tflags)) != 0)
+ goto err;
+
+! if (DB_IS_READONLY(dbp)) {
+ ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
+ goto err;
+ }
+--- 196,206 ----
+ if ((ret = __db_get_flags(dbp, &tflags)) != 0)
+ goto err;
+
+! /*
+! * We can let replication clients open sequences, but must
+! * check later that they do not update them.
+! */
+! if (F_ISSET(dbp, DB_AM_RDONLY)) {
+ ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open");
+ goto err;
+ }
+***************
+*** 252,257 ****
+--- 256,266 ----
+ if ((ret != DB_NOTFOUND && ret != DB_KEYEMPTY) ||
+ !LF_ISSET(DB_CREATE))
+ goto err;
++ if (IS_REP_CLIENT(dbenv) &&
++ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
++ goto err;
++ }
+ ret = 0;
+
+ rp = &seq->seq_record;
+***************
+*** 304,310 ****
+ */
+ rp = seq->seq_data.data;
+ if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver: rp->seq_version = DB_SEQUENCE_VERSION;
+ if (__db_isbigendian()) {
+ if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+ if ((ret =
+--- 313,324 ----
+ */
+ rp = seq->seq_data.data;
+ if (rp->seq_version == DB_SEQUENCE_OLDVER) {
+! oldver: if (IS_REP_CLIENT(dbenv) &&
+! !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
+! ret = __db_rdonly(dbenv, "DB_SEQUENCE->open");
+! goto err;
+! }
+! rp->seq_version = DB_SEQUENCE_VERSION;
+ if (__db_isbigendian()) {
+ if (IS_DB_AUTO_COMMIT(dbp, txn)) {
+ if ((ret =
+***************
+*** 713,718 ****
+--- 727,738 ----
+
+ MUTEX_LOCK(dbenv, seq->mtx_seq);
+
++ if (handle_check && IS_REP_CLIENT(dbenv) &&
++ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) {
++ ret = __db_rdonly(dbenv, "DB_SEQUENCE->get");
++ goto err;
++ }
++
+ if (rp->seq_min + delta > rp->seq_max) {
+ __db_errx(dbenv, "Sequence overflow");
+ ret = EINVAL;
================================================================
Index: packages/db4.6/patch.4.6.21.4
diff -u /dev/null packages/db4.6/patch.4.6.21.4:1.1
--- /dev/null Fri Jun 5 00:05:08 2009
+++ packages/db4.6/patch.4.6.21.4 Fri Jun 5 00:05:03 2009
@@ -0,0 +1,1414 @@
+*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700
+--- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 36,41 ****
+--- 36,55 ----
+ #endif
+
+ /*
++ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
++ * a queue per connection, waiting for TCP buffer space to become available in
++ * the kernel. Rather than exceeding this limit, we simply discard additional
++ * messages (since this is always allowed by the replication protocol).
++ * As a special dispensation, if a message is destined for a specific remote
++ * site (i.e., it's not a broadcast), then we first try blocking the sending
++ * thread, waiting for space to become available (though we only wait a limited
++ * time). This is so as to be able to handle the immediate flood of (a
++ * potentially large number of) outgoing messages that replication generates, in
++ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
++ */
++ #define OUT_QUEUE_LIMIT 10
++
++ /*
+ * The system value is available from sysconf(_SC_HOST_NAME_MAX).
+ * Historically, the maximum host name was 256.
+ */
+***************
+*** 47,52 ****
+--- 61,71 ----
+ #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
+ typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
+
++ /* Default timeout values, in seconds. */
++ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC)
++ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC)
++ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC)
++
+ struct __repmgr_connection;
+ typedef struct __repmgr_connection REPMGR_CONNECTION;
+ struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
+***************
+*** 171,178 ****
+ #ifdef DB_WIN32
+ WSAEVENT event_object;
+ #endif
+! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */
+! #define CONN_DEFUNCT 0x02 /* socket close pending */
+ u_int32_t flags;
+
+ /*
+--- 190,198 ----
+ #ifdef DB_WIN32
+ WSAEVENT event_object;
+ #endif
+! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */
+! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */
+! #define CONN_DEFUNCT 0x04 /* socket close pending */
+ u_int32_t flags;
+
+ /*
+***************
+*** 180,189 ****
+ * send() function's thread. But if TCP doesn't have enough network
+ * buffer space for us when we first try it, we instead allocate some
+ * memory, and copy the message, and then send it as space becomes
+! * available in our main select() thread.
+ */
+ OUT_Q_HEADER outbound_queue;
+ int out_queue_length;
+
+ /*
+ * Input: while we're reading a message, we keep track of what phase
+--- 200,215 ----
+ * send() function's thread. But if TCP doesn't have enough network
+ * buffer space for us when we first try it, we instead allocate some
+ * memory, and copy the message, and then send it as space becomes
+! * available in our main select() thread. In some cases, if the queue
+! * gets too long we wait until it's drained, and then append to it.
+! * This condition variable's associated mutex is the normal per-repmgr
+! * db_rep->mutex, because that mutex is always held anyway whenever the
+! * output queue is consulted.
+ */
+ OUT_Q_HEADER outbound_queue;
+ int out_queue_length;
++ cond_var_t drained;
++ int blockers; /* ref count of msg threads waiting on us */
+
+ /*
+ * Input: while we're reading a message, we keep track of what phase
+*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
+***************
+*** 1420,1425 ****
+--- 1420,1428 ----
+ #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
++ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
+ #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
+*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
+--- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
+***************
+*** 21,30 ****
+ int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+ void __repmgr_stash_generation __P((DB_ENV *));
+ int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
+ int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
+! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+ int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+ int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+ int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+--- 21,30 ----
+ int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
+ void __repmgr_stash_generation __P((DB_ENV *));
+ int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
+! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
+ int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
+! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
+ int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
+ int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
+ int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
+***************
+*** 39,44 ****
+--- 39,47 ----
+ int __repmgr_wake_waiting_senders __P((DB_ENV *));
+ int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
+ void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
++ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
++ int __repmgr_alloc_cond __P((cond_var_t *));
++ int __repmgr_free_cond __P((cond_var_t *));
+ int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
+ int __repmgr_close_sync __P((DB_ENV *));
+ int __repmgr_net_init __P((DB_ENV *, DB_REP *));
+*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 196,204 ****
+ int ret;
+
+ /* Set some default values. */
+! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
+! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
+! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
+ db_rep->config_nsites = 0;
+ db_rep->peer = DB_EID_INVALID;
+ db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+--- 196,204 ----
+ int ret;
+
+ /* Set some default values. */
+! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
+! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
+! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
+ db_rep->config_nsites = 0;
+ db_rep->peer = DB_EID_INVALID;
+ db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
+***************
+*** 238,243 ****
+--- 238,244 ----
+ DB_ENV *dbenv;
+ {
+ DB_REP *db_rep;
++ REPMGR_CONNECTION *conn;
+ int ret;
+
+ db_rep = dbenv->rep_handle;
+***************
+*** 254,259 ****
+--- 255,266 ----
+
+ if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
+ goto unlock;
++
++ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
++ if (conn->blockers > 0 &&
++ ((ret = __repmgr_signal(&conn->drained)) != 0))
++ goto unlock;
++ }
+ UNLOCK_MUTEX(db_rep->mutex);
+
+ return (__repmgr_wake_main_thread(dbenv));
+*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 183,192 ****
+
+ /*
+ * Acknowledges a message.
+- *
+- * !!!
+- * Note that this cannot be called from the select() thread, in case we call
+- * __repmgr_bust_connection(..., FALSE).
+ */
+ static int
+ ack_message(dbenv, generation, lsn)
+--- 183,188 ----
+***************
+*** 227,235 ****
+ rec2.size = 0;
+
+ conn = site->ref.conn;
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! &control2, &rec2)) == DB_REP_UNAVAIL)
+! ret = __repmgr_bust_connection(dbenv, conn, FALSE);
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+--- 223,236 ----
+ rec2.size = 0;
+
+ conn = site->ref.conn;
++ /*
++ * It's hard to imagine anyone would care about a lost ack if
++ * the path to the master is so congested as to need blocking;
++ * so pass "blockable" argument as FALSE.
++ */
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
+! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
+! ret = __repmgr_bust_connection(dbenv, conn);
+ }
+
+ UNLOCK_MUTEX(db_rep->mutex);
+*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
+--- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
+***************
+*** 63,69 ****
+ static void setup_sending_msg
+ __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+ static int __repmgr_send_internal
+! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
+ static int enqueue_msg
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+ static int flatten __P((DB_ENV *, struct sending_msg *));
+--- 63,69 ----
+ static void setup_sending_msg
+ __P((struct sending_msg *, u_int, const DBT *, const DBT *));
+ static int __repmgr_send_internal
+! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
+ static int enqueue_msg
+ __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
+ static int flatten __P((DB_ENV *, struct sending_msg *));
+***************
+*** 73,85 ****
+ * __repmgr_send --
+ * The send function for DB_ENV->rep_set_transport.
+ *
+- * !!!
+- * This is only ever called as the replication transport call-back, which means
+- * it's either on one of our message processing threads or an application
+- * thread. It mustn't be called from the select() thread, because we might call
+- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
+- * select() thread.
+- *
+ * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
+ * PUBLIC: const DB_LSN *, int, u_int32_t));
+ */
+--- 73,78 ----
+***************
+*** 126,134 ****
+ }
+
+ conn = site->ref.conn;
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! control, rec)) == DB_REP_UNAVAIL &&
+! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
+ ret = t_ret;
+ if (ret != 0)
+ goto out;
+--- 119,128 ----
+ }
+
+ conn = site->ref.conn;
++ /* Pass the "blockable" argument as TRUE. */
+ if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
+! control, rec, TRUE)) == DB_REP_UNAVAIL &&
+! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+ ret = t_ret;
+ if (ret != 0)
+ goto out;
+***************
+*** 222,228 ****
+ if (site->state != SITE_CONNECTED)
+ return (NULL);
+
+! if (F_ISSET(site->ref.conn, CONN_CONNECTING))
+ return (NULL);
+ return (site);
+ }
+--- 216,222 ----
+ if (site->state != SITE_CONNECTED)
+ return (NULL);
+
+! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
+ return (NULL);
+ return (site);
+ }
+***************
+*** 235,244 ****
+ *
+ * !!!
+ * Caller must hold dbenv->mutex.
+- *
+- * !!!
+- * Note that this cannot be called from the select() thread, in case we call
+- * __repmgr_bust_connection(..., FALSE).
+ */
+ static int
+ __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
+--- 229,234 ----
+***************
+*** 268,281 ****
+ !IS_VALID_EID(conn->eid))
+ continue;
+
+! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
+ site = SITE_FROM_EID(conn->eid);
+ nsites++;
+ if (site->priority > 0)
+ npeers++;
+ } else if (ret == DB_REP_UNAVAIL) {
+! if ((ret = __repmgr_bust_connection(
+! dbenv, conn, FALSE)) != 0)
+ return (ret);
+ } else
+ return (ret);
+--- 258,277 ----
+ !IS_VALID_EID(conn->eid))
+ continue;
+
+! /*
+! * Broadcast messages are either application threads committing
+! * transactions, or replication status message that we can
+! * afford to lose. So don't allow blocking for them (pass
+! * "blockable" argument as FALSE).
+! */
+! if ((ret = __repmgr_send_internal(dbenv,
+! conn, &msg, FALSE)) == 0) {
+ site = SITE_FROM_EID(conn->eid);
+ nsites++;
+ if (site->priority > 0)
+ npeers++;
+ } else if (ret == DB_REP_UNAVAIL) {
+! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+ return (ret);
+ } else
+ return (ret);
+***************
+*** 301,339 ****
+ * intersperse writes that are part of two single messages.
+ *
+ * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+! * PUBLIC: u_int, const DBT *, const DBT *));
+ */
+ int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ u_int msg_type;
+ const DBT *control, *rec;
+ {
+ struct sending_msg msg;
+
+ setup_sending_msg(&msg, msg_type, control, rec);
+! return (__repmgr_send_internal(dbenv, conn, &msg));
+ }
+
+ /*
+ * Attempts a "best effort" to send a message on the given site. If there is an
+! * excessive backlog of message already queued on the connection, we simply drop
+! * this message, and still return 0 even in this case.
+ */
+ static int
+! __repmgr_send_internal(dbenv, conn, msg)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ struct sending_msg *msg;
+ {
+! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
+ REPMGR_IOVECS iovecs;
+ SITE_STRING_BUFFER buffer;
+ int ret;
+ size_t nw;
+ size_t total_written;
+
+ DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+ if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+ /*
+--- 297,355 ----
+ * intersperse writes that are part of two single messages.
+ *
+ * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
+! * PUBLIC: u_int, const DBT *, const DBT *, int));
+ */
+ int
+! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ u_int msg_type;
+ const DBT *control, *rec;
++ int blockable;
+ {
+ struct sending_msg msg;
+
+ setup_sending_msg(&msg, msg_type, control, rec);
+! return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
+ }
+
+ /*
+ * Attempts a "best effort" to send a message on the given site. If there is an
+! * excessive backlog of message already queued on the connection, what shall we
+! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of
+! * time) for the queue to drain. Otherwise we'll simply drop the message. This
+! * is always allowed by the replication protocol. But in the case of a
+! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
+! * almost always get a flood of messages that instantly fills our queue, so
+! * blocking improves performance (by avoiding the need for the client to
+! * re-request).
+! *
+! * How long shall we wait? We could of course create a new timeout
+! * configuration type, so that the application could set it directly. But that
+! * would start to overwhelm the user with too many choices to think about. We
+! * already have an ACK timeout, which is the user's estimate of how long it
+! * should take to send a message to the client, have it be processed, and return
+! * a message back to us. We multiply that by the queue size, because that's how
+! * many messages have to be swallowed up by the client before we're able to
+! * start sending again (at least to a rough approximation).
+ */
+ static int
+! __repmgr_send_internal(dbenv, conn, msg, blockable)
+ DB_ENV *dbenv;
+ REPMGR_CONNECTION *conn;
+ struct sending_msg *msg;
++ int blockable;
+ {
+! DB_REP *db_rep;
+ REPMGR_IOVECS iovecs;
+ SITE_STRING_BUFFER buffer;
++ db_timeout_t drain_to;
+ int ret;
+ size_t nw;
+ size_t total_written;
+
++ db_rep = dbenv->rep_handle;
++
+ DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
+ if (!STAILQ_EMPTY(&conn->outbound_queue)) {
+ /*
+***************
+*** 344,358 ****
+ RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+ __repmgr_format_eid_loc(dbenv->rep_handle,
+ conn->eid, buffer)));
+ if (conn->out_queue_length < OUT_QUEUE_LIMIT)
+ return (enqueue_msg(dbenv, conn, msg, 0));
+ else {
+ RPRINT(dbenv, (dbenv, "queue limit exceeded"));
+ STAT(dbenv->rep_handle->
+ region->mstat.st_msgs_dropped++);
+! return (0);
+ }
+ }
+
+ /*
+ * Send as much data to the site as we can, without blocking. Keep
+--- 360,393 ----
+ RPRINT(dbenv, (dbenv, "msg to %s to be queued",
+ __repmgr_format_eid_loc(dbenv->rep_handle,
+ conn->eid, buffer)));
++ if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
++ blockable && !F_ISSET(conn, CONN_CONGESTED)) {
++ RPRINT(dbenv, (dbenv,
++ "block msg thread, await queue space"));
<<Diff was trimmed, longer than 597 lines>>
More information about the pld-cvs-commit
mailing list