[packages/memcached] up to 1.4.36; drop repcached patch (unmaintained)
glen
glen at pld-linux.org
Tue Apr 25 01:12:03 CEST 2017
commit dacfa27336a56b0388e2779d16efed2b4ef3db22
Author: Elan Ruusamäe <glen at delfi.ee>
Date: Tue Apr 25 02:09:23 2017 +0300
up to 1.4.36; drop repcached patch (unmaintained)
memcached.spec | 13 +-
repcached.patch | 1828 -------------------------------------------------------
2 files changed, 4 insertions(+), 1837 deletions(-)
---
diff --git a/memcached.spec b/memcached.spec
index b3b6272..1fe5c48 100644
--- a/memcached.spec
+++ b/memcached.spec
@@ -8,23 +8,19 @@
# ^
#crawler.c:229:13: warning: format '%ld' expects argument of type 'long int', but argument 5 has type 'time_t {aka long long int}' [-Wformat=]
-# Conditional build:
-%bcond_with repcached # repcached support, http://repcached.lab.klab.org/
-
Summary: A high-performance, distributed memory object caching system
Summary(pl.UTF-8): Rozproszony, wysokiej wydajności system cache'owania obiektów
Name: memcached
-Version: 1.4.33
-Release: 2
+Version: 1.4.36
+Release: 1
License: BSD
Group: Networking/Daemons
Source0: http://www.memcached.org/files/%{name}-%{version}.tar.gz
-# Source0-md5: 2d7f6476283cd36e21e521d901d37a8f
+# Source0-md5: 1e028fbab7288911fcaa5ed2a21817fe
Source1: %{name}.init
Source2: %{name}.sysconfig
Source3: %{name}.tmpfiles
-URL: http://memcached.org/
-Patch0: repcached.patch
+URL: https://memcached.org/about
BuildRequires: autoconf
BuildRequires: automake
BuildRequires: libevent-devel >= 1.1
@@ -61,7 +57,6 @@ require access to the memcached binary include files.
%prep
%setup -q
-%{?with_repcached:%patch0 -p1}
sed -nie '1,/^$/p' ChangeLog
diff --git a/repcached.patch b/repcached.patch
deleted file mode 100644
index 4193fae..0000000
--- a/repcached.patch
+++ /dev/null
@@ -1,1828 +0,0 @@
---- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009
-+++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010
-@@ -31,6 +31,10 @@
- memcached_SOURCES += sasl_defs.c
- endif
-
-+if ENABLE_REPLICATION
-+memcached_SOURCES += replication.h replication.c
-+endif
-+
- memcached_debug_SOURCES = $(memcached_SOURCES)
- memcached_CPPFLAGS = -DNDEBUG
- memcached_debug_LDADD = @PROFILER_LDFLAGS@
---- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009
-+++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010
-@@ -258,3 +258,51 @@
- }
-
-
-+#ifdef USE_REPLICATION
-+char *assoc_key_snap(int *n)
-+{
-+ char *p = NULL;
-+ char *b = NULL;
-+ item *i = NULL;
-+ int co = 0;
-+ int sz = 1;
-+ int hs = 0;
-+ int hm = hashsize(hashpower);
-+
-+ hs = hm;
-+ while(hs--){
-+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
-+ i = old_hashtable[hs];
-+ }else{
-+ i = primary_hashtable[hs];
-+ }
-+ while(i){
-+ sz += i->nkey + 1;
-+ co++;
-+ i = i->h_next;
-+ }
-+ }
-+
-+ if(co){
-+ if((p = b = malloc(sz))){
-+ hs = hm;
-+ while(hs--){
-+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
-+ i = old_hashtable[hs];
-+ }else{
-+ i = primary_hashtable[hs];
-+ }
-+ while(i){
-+ memcpy(p, ITEM_key(i), i->nkey);
-+ p += i->nkey;
-+ *(p++) = 0;
-+ i = i->h_next;
-+ }
-+ }
-+ *(p++) = 0;
-+ }
-+ }
-+ if(n) *n = co;
-+ return(b);
-+}
-+#endif /* USE_REPLICATION */
---- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009
-+++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010
-@@ -7,3 +7,6 @@
- int start_assoc_maintenance_thread(void);
- void stop_assoc_maintenance_thread(void);
-
-+#ifdef USE_REPLICATION
-+char *assoc_key_snap(int *n);
-+#endif /*USE_REPLICATION*/
---- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009
-+++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010
-@@ -99,6 +99,9 @@
- /* Define to 1 if you have the ANSI C header files. */
- #undef STDC_HEADERS
-
-+/* Define this if you want to use replication */
-+#undef USE_REPLICATION
-+
- /* Version number of package */
- #undef VERSION
-
---- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009
-+++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010
-@@ -382,6 +382,18 @@
- AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
- fi
-
-+dnl Check whether the user wants replication or not
-+AC_ARG_ENABLE(replication,
-+ [AS_HELP_STRING([--enable-replication],[support replication])],
-+ [if test "x$enable_threads" = "xyes"; then
-+ AC_MSG_ERROR([Can't enable threads and replication together.])
-+ else
-+ AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication])
-+ fi
-+ ])
-+
-+AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes")
-+
- AC_CHECK_FUNCS(mlockall)
- AC_CHECK_FUNCS(getpagesizes)
- AC_CHECK_FUNCS(memcntl)
---- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009
-+++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010
-@@ -155,6 +155,9 @@
- STATS_LOCK();
- stats.evictions++;
- STATS_UNLOCK();
-+#ifdef USE_REPLICATION
-+ replication_call_del(ITEM_key(search), search->nkey);
-+#endif /* USE_REPLICATION */
- }
- do_item_unlink(search);
- break;
-@@ -288,8 +291,14 @@
- stats.total_items += 1;
- STATS_UNLOCK();
-
-+#ifdef USE_REPLICATION
-+ /* Allocate a new CAS ID on link. */
-+ if(!(it->it_flags & ITEM_REPDATA))
-+ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
-+#else
- /* Allocate a new CAS ID on link. */
- ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
-+#endif /* USE_REPLICATION */
-
- item_link_q(it);
-
---- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009
-+++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010
-@@ -102,6 +102,30 @@
-
- static void conn_free(conn *c);
-
-+#ifdef USE_REPLICATION
-+static int rep_exit = 0;
-+static conn *rep_recv = NULL;
-+static conn *rep_send = NULL;
-+static conn *rep_conn = NULL;
-+static conn *rep_serv = NULL;
-+static int server_socket_replication(const int);
-+static void server_close_replication(void);
-+static int replication_init(void);
-+static int replication_server_init(void);
-+static int replication_client_init(void);
-+static int replication_start(void);
-+static int replication_connect(void);
-+static int replication_close(void);
-+static void replication_dispatch_close(void);
-+static int replication_marugoto(int);
-+static int replication_send(conn *);
-+static int replication_pop(void);
-+static int replication_push(void);
-+static int replication_exit(void);
-+static int replication_item(Q_ITEM *);
-+static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER;
-+#endif /* USE_REPLICATION */
-+
- /** exported globals **/
- struct stats stats;
- struct settings settings;
-@@ -194,6 +218,11 @@
- settings.backlog = 1024;
- settings.binding_protocol = negotiating_prot;
- settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
-+#ifdef USE_REPLICATION
-+ settings.rep_addr.s_addr = htonl(INADDR_ANY);
-+ settings.rep_port = 11212;
-+ settings.rep_qmax = 8192;
-+#endif /* USE_REPLICATION */
- }
-
- /*
-@@ -382,6 +411,10 @@
- prot_text(c->protocol));
- } else if (IS_UDP(transport)) {
- fprintf(stderr, "<%d server listening (udp)\n", sfd);
-+#ifdef USE_REPLICATION
-+ } else if (init_state == conn_rep_listen) {
-+ fprintf(stderr, "<%d server listening (replication)\n", sfd);
-+#endif /* USE_REPLICATION */
- } else if (c->protocol == negotiating_prot) {
- fprintf(stderr, "<%d new auto-negotiating client connection\n",
- sfd);
-@@ -593,7 +626,11 @@
- "conn_nread",
- "conn_swallow",
- "conn_closing",
-- "conn_mwrite" };
-+ "conn_mwrite",
-+ "conn_repconnect",
-+ "conn_rep_listen",
-+ "conn_pipe_recv",
-+ "conn_pipe_send" };
- return statenames[state];
- }
-
-@@ -752,6 +789,14 @@
-
- assert(c != NULL);
-
-+#ifdef USE_REPLICATION
-+ if (c == rep_conn){
-+ if (settings.verbose > 1)
-+ fprintf(stderr, "REP>%d %s\n", c->sfd, str);
-+ conn_set_state(c, conn_new_cmd);
-+ return;
-+ }
-+#endif /* USE_REPLICATION */
- if (c->noreply) {
- if (settings.verbose > 1)
- fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
-@@ -791,9 +836,11 @@
- int comm = c->cmd;
- enum store_item_type ret;
-
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
-
- if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
- out_string(c, "CLIENT_ERROR bad data chunk");
-@@ -832,6 +879,11 @@
-
- switch (ret) {
- case STORED:
-+#ifdef USE_REPLICATION
-+ if( c != rep_conn ){
-+ replication_call_rep(ITEM_key(it), it->nkey);
-+ }
-+#endif /* USE_REPLICATION */
- out_string(c, "STORED");
- break;
- case EXISTS:
-@@ -2410,6 +2462,11 @@
- APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
- APPEND_STAT("threads", "%d", settings.num_threads);
- APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
-+#ifdef USE_REPLICATION
-+ APPEND_STAT("replication", "MASTER", 0);
-+ APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION);
-+ APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count());
-+#endif /*USE_REPLICATION*/
- STATS_UNLOCK();
- }
-
-@@ -2797,6 +2854,11 @@
- switch(add_delta(c, it, incr, delta, temp)) {
- case OK:
- out_string(c, temp);
-+#ifdef USE_REPLICATION
-+ if( c != rep_conn){
-+ replication_call_rep(ITEM_key(it), it->nkey);
-+ }
-+#endif /* USE_REPLICATION */
- break;
- case NON_NUMERIC:
- out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
-@@ -2911,17 +2973,25 @@
- if (it) {
- MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
-
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
-
- item_unlink(it);
- item_remove(it); /* release our reference */
-+#ifdef USE_REPLICATION
-+ if( c != rep_conn )
-+ replication_call_del(key, nkey);
-+#endif /* USE_REPLICATION */
- out_string(c, "DELETED");
- } else {
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.delete_misses++;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.delete_misses++;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
-
- out_string(c, "NOT_FOUND");
- }
-@@ -2986,6 +3056,22 @@
-
- process_update_command(c, tokens, ntokens, comm, true);
-
-+#ifdef USE_REPLICATION
-+ } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) {
-+
-+ process_update_command(c, tokens, ntokens, comm, true);
-+ if(c->item)
-+ ((item *)(c->item))->it_flags |= ITEM_REPDATA;
-+
-+ } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) {
-+ if(replication_start() == -1)
-+ exit(EXIT_FAILURE);
-+ if (settings.verbose > 0)
-+ fprintf(stderr,"replication: start\n");
-+ out_string(c, "OK");
-+ return;
-+
-+#endif /* USE_REPLICATION */
- } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
-
- process_arithmetic_command(c, tokens, ntokens, 1);
-@@ -3012,11 +3098,17 @@
-
- set_noreply_maybe(c, tokens, ntokens);
-
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.flush_cmds++;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.flush_cmds++;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
-
- if(ntokens == (c->noreply ? 3 : 2)) {
-+#ifdef USE_REPLICATION
-+ if( c != rep_conn )
-+ replication_call_flush_all();
-+#endif
- settings.oldest_live = current_time - 1;
- item_flush_expired();
- out_string(c, "OK");
-@@ -3029,6 +3121,11 @@
- return;
- }
-
-+#ifdef USE_REPLICATION
-+ if( c != rep_conn )
-+ replication_call_defer_flush_all(realtime(exptime) + process_started);
-+#endif
-+ settings.oldest_live = realtime(exptime) - 1;
- /*
- If exptime is zero realtime() would return zero too, and
- realtime(exptime) - 1 would overflow to the max unsigned
-@@ -3275,9 +3372,11 @@
- int avail = c->rsize - c->rbytes;
- res = read(c->sfd, c->rbuf + c->rbytes, avail);
- if (res > 0) {
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.bytes_read += res;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.bytes_read += res;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
- gotdata = READ_DATA_RECEIVED;
- c->rbytes += res;
- if (res == avail) {
-@@ -3423,6 +3522,12 @@
-
- assert(c != NULL);
-
-+#ifdef USE_REPLICATION
-+ if(rep_exit && (c->state != conn_pipe_recv)){
-+ return;
-+ }
-+#endif /* USE_REPLICATION */
-+
- while (!stop) {
-
- switch(c->state) {
-@@ -3502,9 +3607,11 @@
- if (nreqs >= 0) {
- reset_cmd_handler(c);
- } else {
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.conn_yields++;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.conn_yields++;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
- if (c->rbytes > 0) {
- /* We have already read in data into the input buffer,
- so libevent will most likely not signal read events
-@@ -3545,9 +3652,11 @@
- /* now try reading from the socket */
- res = read(c->sfd, c->ritem, c->rlbytes);
- if (res > 0) {
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.bytes_read += res;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.bytes_read += res;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
- if (c->rcurr == c->ritem) {
- c->rcurr += res;
- }
-@@ -3600,9 +3709,11 @@
- /* now try reading from the socket */
- res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
- if (res > 0) {
-- pthread_mutex_lock(&c->thread->stats.mutex);
-- c->thread->stats.bytes_read += res;
-- pthread_mutex_unlock(&c->thread->stats.mutex);
-+ if (c->thread) {
-+ pthread_mutex_lock(&c->thread->stats.mutex);
-+ c->thread->stats.bytes_read += res;
-+ pthread_mutex_unlock(&c->thread->stats.mutex);
-+ }
- c->sbytes -= res;
- break;
- }
-@@ -3698,6 +3809,10 @@
- case conn_closing:
- if (IS_UDP(c->transport))
- conn_cleanup(c);
-+#ifdef USE_REPLICATION
-+ else if(c == rep_conn)
-+ replication_close();
-+#endif /*USE_REPLICATION*/
- else
- conn_close(c);
- stop = true;
-@@ -3706,6 +3821,70 @@
- case conn_max_state:
- assert(false);
- break;
-+
-+#ifdef USE_REPLICATION
-+ case conn_pipe_recv:
-+ if(replication_pop()){
-+ replication_close();
-+ }else{
-+ replication_send(rep_conn);
-+ }
-+ stop = true;
-+ break;
-+
-+ case conn_rep_listen:
-+ if (settings.verbose > 0)
-+ fprintf(stderr,"replication: accept\n");
-+ addrlen = sizeof(addr);
-+ res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
-+ if(res == -1){
-+ if(errno == EAGAIN || errno == EWOULDBLOCK) {
-+ } else if (errno == EMFILE) {
-+ fprintf(stderr, "replication: Too many opened connections\n");
-+ } else {
-+ fprintf(stderr, "replication: accept error\n");
-+ }
-+ }else{
-+ if(rep_conn){
-+ close(res);
-+ fprintf(stderr,"replication: already connected\n");
-+ }else{
-+ if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){
-+ close(res);
-+ fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n");
-+ }else{
-+ server_close_replication();
-+ rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
-+ rep_conn->item = NULL;
-+ rep_conn->rbytes = 0;
-+ rep_conn->rcurr = rep_conn->rbuf;
-+ replication_connect();
-+ replication_marugoto(1);
-+ replication_marugoto(0);
-+ }
-+ }
-+ }
-+ stop = true;
-+ break;
-+
-+ case conn_repconnect:
-+ rep_conn = c;
-+ replication_connect();
-+ conn_set_state(c, conn_read);
-+ if (settings.verbose > 0)
-+ fprintf(stderr,"replication: marugoto copying\n");
-+ if(!update_event(c, EV_READ | EV_PERSIST)){
-+ fprintf(stderr, "replication: Couldn't update event\n");
-+ conn_set_state(c, conn_closing);
-+ }
-+ stop = true;
-+ break;
-+
-+ case conn_pipe_send:
-+ /* should not happen */
-+ fprintf(stderr, "replication: unexpected conn_pipe_send state\n");
-+ break;
-+#endif /* USE_REPLICATION */
- }
- }
-
-@@ -4002,6 +4181,89 @@
- return 0;
- }
-
-+#ifdef USE_REPLICATION
-+static int server_socket_replication(const int port) {
-+ int sfd;
-+ struct linger ling = {0, 0};
-+ struct addrinfo *ai;
-+ struct addrinfo *next;
-+ struct addrinfo hints;
-+ char port_buf[NI_MAXSERV];
-+ int error;
-+ int success = 0;
-+
-+ int flags =1;
-+
-+ memset(&hints, 0, sizeof (hints));
-+ hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
-+ hints.ai_family = AF_UNSPEC;
-+ hints.ai_protocol = IPPROTO_TCP;
-+ hints.ai_socktype = SOCK_STREAM;
-+ snprintf(port_buf, NI_MAXSERV, "%d", port);
-+ error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
-+ if (error != 0) {
-+ if (error != EAI_SYSTEM)
-+ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
-+ else
-+ perror("getaddrinfo()");
-+
-+ return 1;
-+ }
-+
-+ for (next= ai; next; next= next->ai_next) {
-+ conn *rep_serv_add;
-+ if ((sfd = new_socket(next)) == -1) {
-+ freeaddrinfo(ai);
-+ return 1;
-+ }
-+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
-+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
-+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
-+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
-+
-+ if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
-+ if (errno != EADDRINUSE) {
-+ perror("bind()");
-+ close(sfd);
-+ freeaddrinfo(ai);
-+ return 1;
-+ }
-+ close(sfd);
-+ continue;
-+ } else {
-+ success++;
-+ if (listen(sfd, 1024) == -1) {
-+ perror("listen()");
-+ close(sfd);
-+ freeaddrinfo(ai);
-+ return 1;
-+ }
-+ }
-+
-+ if (!(rep_serv_add = conn_new(sfd, conn_rep_listen,
-+ EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) {
-+ fprintf(stderr, "failed to create replication server connection\n");
-+ exit(EXIT_FAILURE);
-+ }
-+
-+ rep_serv_add->next = rep_serv;
-+ rep_serv = rep_serv_add;
-+ }
-+
-+ freeaddrinfo(ai);
-+
-+ /* Return zero iff we detected no errors in starting up connections */
-+ return success == 0;
-+}
-+
-+static void server_close_replication(void) {
-+ while(rep_serv){
-+ conn_close(rep_serv);
-+ rep_serv = rep_serv->next;
-+ }
-+}
-+#endif /* USE_REPLICATION */
-+
- /*
- * We keep the current time of day in a global variable that's updated by a
- * timer event. This saves us a bunch of time() system calls (we really only
-@@ -4041,6 +4303,9 @@
-
- static void usage(void) {
- printf(PACKAGE " " VERSION "\n");
-+#ifdef USE_REPLICATION
-+ printf("repcached %s\n",REPCACHED_VERSION);
-+#endif /* USE_REPLICATION */
- printf("-p <num> TCP port number to listen on (default: 11211)\n"
- "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
- "-s <file> UNIX socket path to listen on (disables network support)\n"
-@@ -4088,6 +4353,10 @@
- #ifdef ENABLE_SASL
- printf("-S Turn on Sasl authentication\n");
- #endif
-+#ifdef USE_REPLICATION
-+ printf("-x <ip_addr> hostname or IP address of peer repcached\n");
-+ printf("-X <num> TCP port number for replication (default: 11212)\n");
-+#endif /* USE_REPLICATION */
- return;
- }
-
-@@ -4194,6 +4463,26 @@
- exit(EXIT_SUCCESS);
- }
-
-+#ifdef USE_REPLICATION
-+static void sig_handler_cb(int fd, short event, void *arg)
-+{
-+ struct event *signal = arg;
-+
-+ if (settings.verbose)
-+ fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal));
-+
-+ if (replication_exit()) {
-+ exit(EXIT_FAILURE);
-+ }
-+
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ if (!rep_send) {
-+ exit(EXIT_SUCCESS);
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+}
-+#endif /* USE_REPLICATION */
-+
- #ifndef HAVE_SIGIGNORE
- static int sigignore(int sig) {
- struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
-@@ -4249,6 +4538,57 @@
- #endif
- }
-
-+static void create_listening_sockets(void)
-+{
-+ /* create unix mode sockets after dropping privileges */
-+ if (settings.socketpath != NULL) {
-+ errno = 0;
-+ if (server_socket_unix(settings.socketpath,settings.access)) {
-+ vperror("failed to listen on UNIX socket: %s", settings.socketpath);
-+ exit(EX_OSERR);
-+ }
-+ }
-+
-+ /* create the listening socket, bind it, and init */
-+ if (settings.socketpath == NULL) {
-+ const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
-+ char temp_portnumber_filename[PATH_MAX];
-+ FILE *portnumber_file = NULL;
-+
-+ if (portnumber_filename != NULL) {
-+ snprintf(temp_portnumber_filename,
-+ sizeof(temp_portnumber_filename),
-+ "%s.lck", portnumber_filename);
-+
-+ portnumber_file = fopen(temp_portnumber_filename, "a");
-+ if (portnumber_file == NULL) {
-+ fprintf(stderr, "Failed to open \"%s\": %s\n",
-+ temp_portnumber_filename, strerror(errno));
-+ }
-+ }
-+
-+ errno = 0;
-+ if (settings.port && server_socket(settings.port, tcp_transport,
-+ portnumber_file)) {
-+ vperror("failed to listen on TCP port %d", settings.port);
-+ exit(EX_OSERR);
-+ }
-+
-+ /* create the UDP listening socket and bind it */
-+ errno = 0;
-+ if (settings.udpport && server_socket(settings.udpport, udp_transport,
-+ portnumber_file)) {
-+ vperror("failed to listen on UDP port %d", settings.udpport);
-+ exit(EX_OSERR);
-+ }
-+
-+ if (portnumber_file) {
-+ fclose(portnumber_file);
-+ rename(temp_portnumber_filename, portnumber_filename);
-+ }
-+ }
-+}
-+
- int main (int argc, char **argv) {
- int c;
- bool lock_memory = false;
-@@ -4261,6 +4601,11 @@
- struct rlimit rlim;
- char unit = '\0';
- int size_max = 0;
-+#ifdef USE_REPLICATION
-+ struct in_addr addr;
-+ struct addrinfo master_hint;
-+ struct addrinfo *master_addr;
-+#endif /* USE_REPLICATION */
- /* listening sockets */
- static int *l_socket = NULL;
-
-@@ -4307,6 +4652,11 @@
- "B:" /* Binding protocol */
- "I:" /* Max item size */
- "S" /* Sasl ON */
-+#ifdef USE_REPLICATION
-+ "X:" /* replication port */
-+ "x:" /* replication master */
-+ "q:" /* replication queue length */
-+#endif /* USE_REPLICATION */
- ))) {
- switch (c) {
- case 'a':
-@@ -4462,6 +4812,31 @@
- );
- }
- break;
-+#ifdef USE_REPLICATION
-+ case 'x':
-+ if (inet_pton(AF_INET, optarg, &addr) <= 0) {
-+ memset(&master_hint, 0, sizeof(master_hint));
-+ master_hint.ai_flags = 0;
-+ master_hint.ai_socktype = 0;
-+ master_hint.ai_protocol = 0;
-+ if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){
-+ settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr;
-+ freeaddrinfo(master_addr);
-+ }else{
-+ fprintf(stderr, "Illegal address: %s\n", optarg);
-+ return 1;
-+ }
-+ } else {
-+ settings.rep_addr = addr;
-+ }
-+ break;
-+ case 'X':
-+ settings.rep_port = atoi(optarg);
-+ break;
-+ case 'q':
-+ settings.rep_qmax = atoi(optarg);
-+ break;
-+#endif /* USE_REPLICATION */
- case 'S': /* set Sasl authentication to true. Default is false */
- #ifndef ENABLE_SASL
- fprintf(stderr, "This server is not built with SASL support.\n");
-@@ -4587,6 +4962,17 @@
- /* initialize main thread libevent instance */
- main_base = event_init();
-
-+#ifdef USE_REPLICATION
-+ /* register events for SIGINT and SIGTERM to handle them in main thread */
-+ struct event signal_int, signal_term;
-+ event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
-+ &signal_int);
-+ event_add(&signal_int, NULL);
-+ event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
-+ &signal_term);
-+ event_add(&signal_term, NULL);
-+#endif
-+
- /* initialize other stuff */
- stats_init();
- assoc_init();
-@@ -4615,63 +5001,21 @@
- /* initialise clock event */
- clock_handler(0, 0, 0);
-
-- /* create unix mode sockets after dropping privileges */
-- if (settings.socketpath != NULL) {
-- errno = 0;
-- if (server_socket_unix(settings.socketpath,settings.access)) {
-- vperror("failed to listen on UNIX socket: %s", settings.socketpath);
-- exit(EX_OSERR);
-- }
-- }
--
-- /* create the listening socket, bind it, and init */
-- if (settings.socketpath == NULL) {
-- int udp_port;
--
-- const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
-- char temp_portnumber_filename[PATH_MAX];
-- FILE *portnumber_file = NULL;
--
-- if (portnumber_filename != NULL) {
-- snprintf(temp_portnumber_filename,
-- sizeof(temp_portnumber_filename),
-- "%s.lck", portnumber_filename);
--
-- portnumber_file = fopen(temp_portnumber_filename, "a");
-- if (portnumber_file == NULL) {
-- fprintf(stderr, "Failed to open \"%s\": %s\n",
-- temp_portnumber_filename, strerror(errno));
-- }
-- }
--
-- errno = 0;
-- if (settings.port && server_socket(settings.port, tcp_transport,
-- portnumber_file)) {
-- vperror("failed to listen on TCP port %d", settings.port);
-- exit(EX_OSERR);
-- }
--
-- /*
-- * initialization order: first create the listening sockets
-- * (may need root on low ports), then drop root if needed,
-- * then daemonise if needed, then init libevent (in some cases
-- * descriptors created by libevent wouldn't survive forking).
-- */
-- udp_port = settings.udpport ? settings.udpport : settings.port;
--
-- /* create the UDP listening socket and bind it */
-- errno = 0;
-- if (settings.udpport && server_socket(settings.udpport, udp_transport,
-- portnumber_file)) {
-- vperror("failed to listen on UDP port %d", settings.udpport);
-- exit(EX_OSERR);
-- }
-+ /*
-+ * initialization order: first create the listening sockets
-+ * (may need root on low ports), then drop root if needed,
-+ * then daemonise if needed, then init libevent (in some cases
-+ * descriptors created by libevent wouldn't survive forking).
-+ */
-
-- if (portnumber_file) {
-- fclose(portnumber_file);
-- rename(temp_portnumber_filename, portnumber_filename);
-- }
-+#ifdef USE_REPLICATION
-+ if(replication_init() == -1){
-+ fprintf(stderr, "faild to replication init\n");
-+ exit(EXIT_FAILURE);
- }
-+#else
-+ create_listening_sockets();
-+#endif
-
- /* Drop privileges no longer needed */
- drop_privileges();
-@@ -4694,3 +5038,401 @@
-
- return EXIT_SUCCESS;
- }
-+
-+#ifdef USE_REPLICATION
-+static int replication_start(void)
-+{
-+ static int start = 0;
-+ if(start)
-+ return(0);
-+
-+ create_listening_sockets();
-+
-+ start = 1;
-+ return(0);
-+}
-+
-+static int replication_server_init(void)
-+{
-+ rep_recv = NULL;
-+ rep_send = NULL;
-+ rep_conn = NULL;
-+ if(server_socket_replication(settings.rep_port)){
-+ fprintf(stderr, "replication: failed to initialize replication server socket\n");
-+ return(-1);
-+ }
-+ if (settings.verbose > 0)
-+ fprintf(stderr, "replication: listen\n");
-+ return(replication_start());
-+}
-+
-+static int replication_client_init(void)
-+{
-+ int s;
-+ conn *c;
-+ struct addrinfo ai;
-+ struct sockaddr_in server;
-+
-+ rep_recv = NULL;
-+ rep_send = NULL;
-+ rep_conn = NULL;
-+
-+ memset(&ai,0,sizeof(ai));
-+ ai.ai_family = AF_INET;
-+ ai.ai_socktype = SOCK_STREAM;
-+ s = new_socket(&ai);
-+
-+ if(s == -1) {
-+ fprintf(stderr, "replication: failed to replication client socket\n");
-+ return(-1);
-+ }else{
-+ /* connect */
-+ memset((char *)&server, 0, sizeof(server));
-+ server.sin_family = AF_INET;
-+ server.sin_addr = settings.rep_addr;
-+ server.sin_port = htons(settings.rep_port);
-+ if (settings.verbose > 0)
-+ fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port);
-+ if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){
-+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
-+ if(c == NULL){
-+ fprintf(stderr, "replication: failed to create client conn");
-+ close(s);
-+ return(-1);
-+ }
-+ drive_machine(c);
-+ }else{
-+ if(errno == EINPROGRESS){
-+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
-+ if(c == NULL){
-+ fprintf(stderr, "replication: failed to create client conn");
-+ close(s);
-+ return(-1);
-+ }
-+ }else{
-+ fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port));
-+ close(s);
-+ return(-1);
-+ }
-+ }
-+ }
-+ return(0);
-+}
-+
-+static int replication_init(void)
-+{
-+ if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){
-+ if(replication_client_init() != -1){
-+ return(0);
-+ }
-+ }
-+ return(replication_server_init());
-+}
-+
-+static int replication_connect(void)
-+{
-+ int f;
-+ int p[2];
-+
-+ if(pipe(p) == -1){
-+ fprintf(stderr, "replication: can't create pipe\n");
-+ return(-1);
-+ }else{
-+ if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) {
-+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
-+ return(-1);
-+ }
-+ if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) {
-+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
-+ return(-1);
-+ }
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
-+ rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
-+ event_del(&rep_send->event);
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ }
-+ return(0);
-+}
-+
-+static int replication_close(void)
-+{
-+ int c;
-+ int r;
-+ Q_ITEM *q;
-+
-+ if(settings.verbose > 0)
-+ fprintf(stderr,"replication: close\n");
-+ if(rep_recv){
-+ rep_recv->rbytes = sizeof(q);
-+ rep_recv->rcurr = rep_recv->rbuf;
-+ c = 0;
-+ do{
-+ r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes);
-+ if(r == -1){
-+ break;
-+ }
-+ rep_recv->rbytes -= r;
-+ rep_recv->rcurr += r;
-+ if(!rep_recv->rbytes){
-+ memcpy(&q, rep_recv->rbuf, sizeof(q));
-+ rep_recv->rbytes = sizeof(q);
-+ rep_recv->rcurr = rep_recv->rbuf;
-+ qi_free(q);
-+ c++;
-+ }
-+ }while(r);
-+ conn_close(rep_recv);
-+ rep_recv = NULL;
-+ if (settings.verbose > 1) {
-+ fprintf(stderr, "replication: qitem free %d items\n", qi_free_list());
-+ fprintf(stderr, "replication: close recv %d items\n", c);
-+ }
-+ }
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ if(rep_send){
-+ conn_close(rep_send);
-+ rep_send = NULL;
-+ if (settings.verbose > 1)
-+ fprintf(stderr,"replication: close send\n");
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ if(rep_conn){
-+ conn_close(rep_conn);
-+ rep_conn = NULL;
-+ if (settings.verbose > 1)
-+ fprintf(stderr,"replication: close conn\n");
-+ }
-+ if(!rep_exit)
-+ replication_server_init();
-+ return(0);
-+}
-+
-+static void replication_dispatch_close(void)
-+{
-+ if (settings.verbose > 1)
-+ fprintf(stderr, "replication: dispatch close\n");
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ if (rep_send) {
-+ conn_close(rep_send);
-+ rep_send = NULL;
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+}
-+
-+static int replication_marugoto(int f)
-+{
-+ static int keysend = 0;
-+ static int keycount = 0;
-+ static char *keylist = NULL;
-+ static char *keyptr = NULL;
-+
-+ if(f){
-+ if(keylist){
-+ free(keylist);
-+ keylist = NULL;
-+ keyptr = NULL;
-+ keycount = 0;
-+ keysend = 0;
-+ }
-+ pthread_mutex_lock(&cache_lock);
-+ keylist = (char *)assoc_key_snap((int *)&keycount);
-+ pthread_mutex_unlock(&cache_lock);
-+ keyptr = keylist;
-+ if (!keyptr){
-+ replication_call_marugoto_end();
-+ }else{
-+ if (settings.verbose > 0)
-+ fprintf(stderr,"replication: marugoto start\n");
-+ }
-+ }else{
-+ if(keyptr){
-+ while(*keyptr){
-+ item *it = item_get(keyptr, strlen(keyptr));
-+ if(it){
-+ item_remove(it);
-+ if(replication_call_rep(keyptr, strlen(keyptr)) == -1){
-+ return(-1);
-+ }else{
-+ keysend++;
-+ keyptr += strlen(keyptr) + 1;
-+ return(0);
-+ }
-+ }
-+ keyptr += strlen(keyptr) + 1;
-+ }
-+ if(settings.verbose > 0)
-+ fprintf(stderr,"replication: marugoto %d\n", keysend);
-+ replication_call_marugoto_end();
-+ if(settings.verbose > 0)
-+ fprintf(stderr,"replication: marugoto owari\n");
-+ free(keylist);
-+ keylist = NULL;
-+ keyptr = NULL;
-+ keycount = 0;
-+ keysend = 0;
-+ }
-+ }
-+ return(0);
-+}
-+
-+static int replication_send(conn *c)
-+{
-+ while(c->wbytes){
-+ int w = write(c->sfd, c->wcurr, c->wbytes);
-+ if(w == -1){
-+ if(errno == EAGAIN || errno == EINTR){
-+ }else{
-+ fprintf(stderr,"replication: send error\n");
-+ replication_close();
-+ break;
-+ }
-+ }else{
-+ c->wbytes -= w;
-+ c->wcurr += w;
-+ }
-+ }
-+ return(c->wbytes);
-+}
-+
-+static int replication_pop(void)
-+{
-+ int r;
-+ int c;
-+ int m;
-+ Q_ITEM **q;
-+
-+ if(settings.verbose > 1)
-+ fprintf(stderr, "replication: pop\n");
-+
-+ if(!rep_recv)
-+ return(0);
-+
-+ r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize);
-+ if(r == -1){
-+ if(errno == EAGAIN || errno == EINTR){
-+ }else{
-+ fprintf(stderr,"replication: pop error %d\n", errno);
-+ return(-1);
-+ }
-+ }if(r == 0){
-+ /* other end closed, trigger replication_close() */
-+ return(-1);
-+ }else{
-+ c = r / sizeof(Q_ITEM *);
-+ m = r % sizeof(Q_ITEM *);
-+ q = (Q_ITEM **)(rep_recv->rbuf);
-+ while(c--){
-+ if(q[c]){
-+ if(rep_conn && replication_cmd(rep_conn, q[c])){
-+ replication_item(q[c]); /* error retry */
-+ }else{
-+ qi_free(q[c]);
-+ }
-+ }else{
-+ if(!rep_exit){
-+ if (settings.verbose)
-+ fprintf(stderr,"replication: cleanup start\n");
-+ rep_exit = 1;
-+ }
-+ }
-+ }
-+ }
-+ if(rep_exit){
-+ if(rep_conn->wbytes){
-+ /* retry */
-+ if(replication_exit()){
-+ replication_close();
-+ fprintf(stderr,"replication: cleanup error\n");
-+ exit(EXIT_FAILURE);
-+ }
-+ }else{
-+ /* finish */
-+ replication_close();
-+ if (settings.verbose)
-+ fprintf(stderr,"replication: cleanup complete\n");
-+ exit(EXIT_SUCCESS);
-+ }
-+ }
-+ replication_marugoto(0);
-+ return(0);
-+}
-+
-+static int replication_push(void)
-+{
-+ int w;
-+
-+ while(rep_send->wbytes){
-+ w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes);
-+ if(w == -1){
-+ if(errno == EAGAIN || errno == EINTR){
-+ fprintf(stderr,"replication: push EAGAIN or EINTR\n");
-+ }else{
-+ return(-1);
-+ }
-+ }else{
-+ rep_send->wbytes -= w;
-+ rep_send->wcurr += w;
-+ }
-+ }
-+ rep_send->wcurr = rep_send->wbuf;
-+ return(0);
-+}
-+
-+static int replication_exit(void)
-+{
-+ return(replication_item(NULL));
-+}
-+
-+static int replication_item(Q_ITEM *q)
-+{
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ if (!rep_send) {
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ return 0;
-+ }
-+ if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){
-+ fprintf(stderr,"replication: buffer over fllow\n");
-+ if(q){
-+ qi_free(q);
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ replication_dispatch_close();
-+ return(-1);
-+ }
-+ memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q));
-+ rep_send->wbytes += sizeof(q);
-+ if(replication_push()){
-+ fprintf(stderr, "replication: push error\n");
-+ if(q){
-+ qi_free(q);
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ replication_dispatch_close();
-+ return(-1);
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ return(0);
-+}
-+
-+int replication(enum CMD_TYPE type, R_CMD *cmd)
-+{
-+ Q_ITEM *q;
-+
-+ pthread_mutex_lock(&replication_pipe_lock);
-+ if (!rep_send) {
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+ return 0;
-+ }
-+ pthread_mutex_unlock(&replication_pipe_lock);
-+
-+ if((q = qi_new(type, cmd, false))) {
-+ replication_item(q);
-+ }else{
-+ fprintf(stderr,"replication: can't create Q_ITEM\n");
-+ replication_dispatch_close();
-+ return(-1);
-+ }
-+ return(0);
-+}
-+#endif /* USE_REPLICATION */
---- memcached-1.4.5/memcached.h~ 2010-05-06 14:09:51.000000000 +0300
-+++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300
-@@ -144,7 +144,13 @@
- conn_swallow, /**< swallowing unnecessary bytes w/o storing */
- conn_closing, /**< closing this connection */
- conn_mwrite, /**< writing out many items sequentially */
-- conn_max_state /**< Max state value (used for assertion) */
-+#ifdef USE_REPLICATION
-+ conn_repconnect, /**< replication connecting to master */
-+ conn_rep_listen, /**< replication listening socket */
-+ conn_pipe_recv, /**< replication command pipe recv */
-+ conn_pipe_send, /**< replication command pipe send */
-+#endif /* USE_REPLICATION */
-+ conn_max_state, /**< Max state value (used for assertion) */
- };
-
- enum bin_substates {
-@@ -247,7 +247,9 @@
- uint64_t get_misses;
- uint64_t evictions;
- uint64_t reclaimed;
-+#if 0
- time_t started; /* when the process was started */
-+#endif
- bool accepting_conns; /* whether we are currently accepting */
- uint64_t listen_disabled_num;
- };
-@@ -274,6 +282,11 @@
- int backlog;
- int item_size_max; /* Maximum item size, and upper end for slabs */
- bool sasl; /* SASL on/off */
-+#ifdef USE_REPLICATION
-+ struct in_addr rep_addr; /* replication addr */
-+ int rep_port; /* replication port */
-+ int rep_qmax; /* replication QITEM max */
-+#endif /*USE_REPLICATION*/
- };
-
- extern struct stats stats;
-@@ -286,6 +299,10 @@
- /* temp */
- #define ITEM_SLABBED 4
-
-+#ifdef USE_REPLICATION
-+#define ITEM_REPDATA 128
-+#endif /*USE_REPLICATION*/
-+
- /**
- * Structure for storing items within memcached.
- */
-@@ -438,6 +455,10 @@
- #include "trace.h"
- #include "hash.h"
- #include "util.h"
-+
-+#ifdef USE_REPLICATION
-+#include "replication.h"
-+#endif /* USE_REPLICATION */
-
- /*
- * Functions such as the libevent-related calls that need to do cross-thread
---- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970
-+++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010
-@@ -0,0 +1,355 @@
-+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
-+/*
-+ *
-+ */
-+#include "memcached.h"
-+#include "replication.h"
-+#include <stdlib.h>
-+#include <stdio.h>
-+#include <unistd.h>
-+#include <string.h>
-+#include <errno.h>
-+
-+static Q_ITEM *q_freelist = NULL;
-+static int q_itemcount = 0;
-+static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER;
-+
-+int get_qi_count(void)
-+{
-+ int c;
-+ pthread_mutex_lock(&replication_queue_lock);
-+ c = q_itemcount;
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ return(c);
-+}
-+
-+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse)
-+{
-+ Q_ITEM *q = NULL;
-+ char *key = NULL;
-+ uint32_t keylen = 0;
-+ rel_time_t time = 0;
-+
-+ pthread_mutex_lock(&replication_queue_lock);
-+ if(q_freelist){
-+ q = q_freelist;
-+ q_freelist = q->next;
-+ }
-+
-+ if(NULL == q){
-+ if(reuse) {
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ return(NULL);
-+ }
-+ if(q_itemcount >= settings.rep_qmax) {
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ return(NULL);
-+ }
-+ q = malloc(sizeof(Q_ITEM));
-+ if (NULL == q){
-+ fprintf(stderr,"replication: qi_new out of memory\n");
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ return(NULL);
-+ }
-+ q_itemcount++;
-+ if (settings.verbose > 2)
-+ fprintf(stderr,"replication: alloc c=%d\n", q_itemcount);
-+ }
-+
-+ pthread_mutex_unlock(&replication_queue_lock);
-+
-+ switch (type) {
-+ case REPLICATION_REP:
-+ case REPLICATION_DEL:
-+ key = cmd->key;
-+ keylen = cmd->keylen;
-+ break;
-+ case REPLICATION_FLUSH_ALL:
-+ break;
-+ case REPLICATION_DEFER_FLUSH_ALL:
-+ time = cmd->time;
-+ break;
-+ case REPLICATION_MARUGOTO_END:
-+ break;
-+ default:
-+ fprintf(stderr,"replication: got unknown command: %d\n", type);
-+ return(NULL);
-+ }
-+
-+ q->key = NULL;
-+ q->type = type;
-+ q->time = time;
-+ q->next = NULL;
-+ if (keylen) {
-+ q->key = malloc(keylen + 1);
-+ if(NULL == q->key){
-+ qi_free(q);
-+ q = NULL;
-+ }else{
-+ memcpy(q->key, key, keylen);
-+ *(q->key + keylen) = 0;
-+ }
-+ }
-+
-+ return(q);
-+}
-+
-+void qi_free(Q_ITEM *q)
-+{
-+ if(q){
-+ if(q->key){
-+ free(q->key);
-+ q->key = NULL;
-+ }
-+ pthread_mutex_lock(&replication_queue_lock);
-+ q->next = q_freelist;
-+ q_freelist = q;
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ }
-+}
-+
-+int qi_free_list()
-+{
-+ int c = 0;
-+ Q_ITEM *q = NULL;
-+
-+ pthread_mutex_lock(&replication_queue_lock);
-+ while((q = q_freelist)){
-+ q_itemcount--;
-+ c++;
-+ q_freelist = q->next;
-+ free(q);
-+ }
-+ pthread_mutex_unlock(&replication_queue_lock);
-+ return(c);
-+}
-+
-+static int replication_get_num(char *p, int n)
-+{
-+ int l;
-+ char b[64];
-+ if(p)
-+ l = sprintf(p, "%u", n);
-+ else
-+ l = sprintf(b, "%u", n);
-+ return(l);
-+}
-+
-+int replication_call_rep(char *key, size_t keylen)
-+{
-+ R_CMD r;
-+ r.key = key;
-+ r.keylen = keylen;
-+ return(replication(REPLICATION_REP, &r));
-+}
-+
-+int replication_call_del(char *key, size_t keylen)
-+{
-+ R_CMD r;
-+ r.key = key;
-+ r.keylen = keylen;
-+ return(replication(REPLICATION_DEL, &r));
-+}
-+
-+int replication_call_flush_all()
-+{
-+ R_CMD r;
-+ r.key = NULL;
-+ return(replication(REPLICATION_FLUSH_ALL, &r));
-+}
-+
-+int replication_call_defer_flush_all(const rel_time_t time)
-+{
-+ R_CMD r;
-+ r.key = NULL;
-+ r.time = time;
-+ return(replication(REPLICATION_DEFER_FLUSH_ALL, &r));
-+}
-+
-+int replication_call_marugoto_end()
-+{
-+ R_CMD r;
-+ r.key = NULL;
-+ return(replication(REPLICATION_MARUGOTO_END, &r));
-+}
-+
-+static int replication_alloc(conn *c, int s)
-+{
-+ char *p;
-+ s += c->wbytes;
-+ if(c->wsize < s){
-+ while(c->wsize < s)
-+ c->wsize += 4096;
-+ if((p = malloc(c->wsize))){
-+ memcpy(p, c->wbuf, c->wbytes);
-+ free(c->wbuf);
-+ c->wbuf = p;
-+ }else{
-+ return(-1);
-+ }
-+ }
-+ return(0);
-+}
-+
-+static int replication_del(conn *c, char *k)
-+{
-+ int l = 0;
-+ char *s = "delete ";
-+ char *n = "\r\n";
-+ char *p = NULL;
-+
-+ l += strlen(s);
-+ l += strlen(k);
-+ l += strlen(n);
-+ if(replication_alloc(c,l) == -1){
-+ fprintf(stderr, "replication: del malloc error\n");
-+ return(-1);
-+ }
-+ p = c->wbuf + c->wbytes;
-+ memcpy(p, s, strlen(s));
-+ p += strlen(s);
-+ memcpy(p, k, strlen(k));
-+ p += strlen(k);
-+ memcpy(p, n, strlen(n));
-+ p += strlen(n);
-+ c->wbytes = p - c->wbuf;
-+ c->wcurr = c->wbuf;
-+ return(0);
-+}
-+
-+static int replication_rep(conn *c, item *it)
-+{
-+ int exp = 0;
-+ int len = 0;
-+ char *s = "rep ";
-+ char *n = "\r\n";
-+ char *p = NULL;
-+ char flag[40];
-+
-+ if(it->exptime)
-+ exp = it->exptime + process_started;
-+ flag[0]=0;
-+ if((p=ITEM_suffix(it))){
-+ int i;
-+ memcpy(flag, p, it->nsuffix - 2);
-+ flag[it->nsuffix - 2] = 0;
-+ for(i=0;i<strlen(flag);i++){
-+ if(flag[i] > ' ')
-+ break;
-+ }
-+ memmove(flag,&flag[i],strlen(flag)-i);
-+ for(p=flag;*p>' ';p++);
-+ *p=0;
-+ }
-+ len += strlen(s);
-+ len += it->nkey;
-+ len += 1;
-+ len += strlen(flag);
-+ len += 1;
-+ len += replication_get_num(NULL, exp);
-+ len += 1;
-+ len += replication_get_num(NULL, it->nbytes - 2);
-+ len += 1;
-+ len += replication_get_num(NULL, ITEM_get_cas(it));
-+ len += strlen(n);
-+ len += it->nbytes;
-+ len += strlen(n);
-+ if(replication_alloc(c,len) == -1){
-+ fprintf(stderr, "replication: rep malloc error\n");
-+ return(-1);
-+ }
-+ p = c->wbuf + c->wbytes;
-+ memcpy(p, s, strlen(s));
-+ p += strlen(s);
-+ memcpy(p, ITEM_key(it), it->nkey);
-+ p += it->nkey;
-+ *(p++) = ' ';
-+ memcpy(p, flag, strlen(flag));
-+ p += strlen(flag);
-+ *(p++) = ' ';
-+ p += replication_get_num(p, exp);
-+ *(p++) = ' ';
-+ p += replication_get_num(p, it->nbytes - 2);
-+ *(p++) = ' ';
-+ p += replication_get_num(p, ITEM_get_cas(it));
-+ memcpy(p, n, strlen(n));
-+ p += strlen(n);
-+ memcpy(p, ITEM_data(it), it->nbytes);
-+ p += it->nbytes;
-+ c->wbytes = p - c->wbuf;
-+ c->wcurr = c->wbuf;
-+ return(0);
-+}
-+
-+static int replication_flush_all(conn *c, rel_time_t exp)
-+{
-+ char *s = "flush_all ";
-+ char *n = "\r\n";
-+ char *p = NULL;
-+
-+ int l = strlen(s) + strlen(n);
-+ if (exp > 0)
-+ l += replication_get_num(NULL, exp);
-+ if(replication_alloc(c,l) == -1){
-+ fprintf(stderr, "replication: flush_all malloc error\n");
-+ return(-1);
-+ }
-+ p = c->wbuf + c->wbytes;
-+ memcpy(p, s, strlen(s));
-+ p += strlen(s);
-+ if (exp > 0)
-+ p += replication_get_num(p, exp);
-+ memcpy(p, n, strlen(n));
-+ p += strlen(n);
-+ c->wbytes = p - c->wbuf;
-+ c->wcurr = c->wbuf;
-+ return(0);
-+}
-+
-+static int replication_marugoto_end(conn *c)
-+{
-+ char *s = "marugoto_end";
-+ char *n = "\r\n";
-+ char *p = NULL;
-+
-+ int l = strlen(s) + strlen(n);
-+ if(replication_alloc(c,l) == -1){
-+ fprintf(stderr, "replication: marugoto_end malloc error\n");
-+ return(-1);
-+ }
-+ p = c->wbuf + c->wbytes;
-+ memcpy(p, s, strlen(s));
-+ p += strlen(s);
-+ memcpy(p, n, strlen(n));
-+ p += strlen(n);
-+ c->wbytes = p - c->wbuf;
-+ c->wcurr = c->wbuf;
-+ return(0);
-+}
-+
-+int replication_cmd(conn *c, Q_ITEM *q)
-+{
-+ item *it;
-+ int r;
-+
-+ switch (q->type) {
-+ case REPLICATION_REP:
-+ it = item_get(q->key, strlen(q->key));
-+ if (!it)
-+ return(replication_del(c, q->key));
-+ r = replication_rep(c, it);
-+ item_remove(it);
-+ return r;
-+ case REPLICATION_DEL:
-+ return(replication_del(c, q->key));
-+ case REPLICATION_FLUSH_ALL:
-+ return(replication_flush_all(c, 0));
-+ case REPLICATION_DEFER_FLUSH_ALL:
-+ return(replication_flush_all(c, q->time));
-+ case REPLICATION_MARUGOTO_END:
-+ return(replication_marugoto_end(c));
-+ default:
-+ fprintf(stderr,"replication: got unknown command:%d\n", q->type);
-+ return(0);
-+ }
-+}
---- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970
-+++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010
-@@ -0,0 +1,42 @@
-+#ifndef MEMCACHED_REPLICATION_H
-+#define MEMCACHED_REPLICATION_H
-+#define REPCACHED_VERSION "2.2"
-+#include <netdb.h>
-+
-+enum CMD_TYPE {
-+ REPLICATION_REP,
-+ REPLICATION_DEL,
-+ REPLICATION_FLUSH_ALL,
-+ REPLICATION_DEFER_FLUSH_ALL,
-+ REPLICATION_MARUGOTO_END,
-+};
-+
-+typedef struct queue_item_t Q_ITEM;
-+struct queue_item_t {
-+ enum CMD_TYPE type;
-+ char *key;
-+ rel_time_t time;
-+ Q_ITEM *next;
-+};
-+
-+typedef struct replication_cmd_t R_CMD;
-+struct replication_cmd_t {
-+ char *key;
-+ int keylen;
-+ rel_time_t time;
-+};
-+
-+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool);
-+void qi_free(Q_ITEM *);
-+int qi_free_list(void);
-+int replication_cmd(conn *, Q_ITEM *);
-+int get_qi_count(void);
-+
-+int replication_call_rep(char *key, size_t keylen);
-+int replication_call_del(char *key, size_t keylen);
-+int replication_call_flush_all(void);
-+int replication_call_defer_flush_all(const rel_time_t time);
-+int replication_call_marugoto_end(void);
-+int replication(enum CMD_TYPE type, R_CMD *cmd);
-+
-+#endif
---- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300
-+++ memcached-1.4.5/t/binary.t 2010-05-06 14:13:25.718440750 +0300
-@@ -2,11 +2,13 @@
-
- use strict;
- use warnings;
--use Test::More tests => 3361;
-+use Test::More;
- use FindBin qw($Bin);
- use lib "$Bin/lib";
- use MemcachedTest;
-
-+Test::More::plan(tests => 3361 + (support_replication() ? 36 : 0));
-+
- my $server = new_memcached();
- ok($server, "started the server");
-
---- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009
-+++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010
-@@ -41,6 +41,10 @@
- my $exe = "$builddir/memcached-debug";
- croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe;
-
-+ if (support_replication()) {
-+ $args .= ' -X 0';
-+ }
-+
- my $childpid = fork();
-
- my $cmd = "$builddir/timedrun 10 $exe $args";
---- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009
-+++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010
-@@ -13,7 +13,8 @@
-
-
- @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats
-- supports_sasl free_port);
-+ supports_sasl free_port support_replication memcached_version
-+ version2num);
-
- sub sleep {
- my $n = shift;
-@@ -148,6 +149,23 @@
- return 0;
- }
-
-+sub support_replication {
-+ my $output = `$builddir/memcached-debug -h`;
-+ return 1 if $output =~ /^-x <ip_addr>/m;
-+ return 0;
-+}
-+
-+sub memcached_version {
-+ my $output = `$builddir/memcached-debug -h`;
-+ return $1 if $output =~ /^memcached (\d[\d\.]+)/;
-+ return 0;
-+}
-+
-+sub version2num {
-+ my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/);
-+ return $major*100**2 + $minor*100 + $pl
-+}
-+
- sub new_memcached {
- my ($args, $passed_port) = @_;
- my $port = $passed_port || free_port();
-@@ -171,6 +189,9 @@
- }
- if ($< == 0) {
- $args .= " -u root";
-+ }
-+ if (support_replication() && $args !~ m/-X/) {
-+ $args .= ' -X 0';
- }
-
- my $childpid = fork();
---- memcached-1.4.5/t/stats.t~ 2010-04-03 10:07:16.000000000 +0300
-+++ memcached-1.4.5/t/stats.t 2010-05-06 14:15:28.521352735 +0300
-@@ -57,7 +57,8 @@
- my $stats = mem_stats($sock);
-
- # Test number of keys
--is(scalar(keys(%$stats)), 38, "38 stats values");
-+my $keys = 38 + (support_replication() ? 3 : 0);
-+is(scalar(keys(%$stats)), $keys, "$keys stats values");
-
- # Test initial state
- foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
---- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009
-+++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010
-@@ -300,6 +300,10 @@
- argv[arg++] = "-1";
- argv[arg++] = "-U";
- argv[arg++] = "0";
-+#ifdef USE_REPLICATION
-+ argv[arg++] = "-X";
-+ argv[arg++] = "0";
-+#endif
- /* Handle rpmbuild and the like doing this as root */
- if (getuid() == 0) {
- argv[arg++] = "-u";
================================================================
---- gitweb:
http://git.pld-linux.org/gitweb.cgi/packages/memcached.git/commitdiff/dacfa27336a56b0388e2779d16efed2b4ef3db22
More information about the pld-cvs-commit
mailing list