trAvis - MANAGER
Edit File: binary_handler.c
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: * * Libmemcached library * * Copyright (C) 2011 Data Differential, http://datadifferential.com/ * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * * The names of its contributors may not be used to endorse or * promote products derived from this software without specific prior * written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include <libmemcachedprotocol/common.h> #include <stdlib.h> #include <sys/types.h> #include <errno.h> #include <stdbool.h> #include <string.h> #include <stdio.h> /* ** ********************************************************************** ** INTERNAL INTERFACE ** ********************************************************************** */ /** * Send a preformatted packet back to the client. If the connection is in * pedantic mode, it will validate the packet and refuse to send it if it * breaks the specification. * * @param cookie client identification * @param request the original request packet * @param response the packet to send * @return The status of the operation */ static protocol_binary_response_status raw_response_handler(const void *cookie, protocol_binary_request_header *request, protocol_binary_response_header *response) { memcached_protocol_client_st *client= (void*)cookie; if (client->root->pedantic && !memcached_binary_protocol_pedantic_check_response(request, response)) { return PROTOCOL_BINARY_RESPONSE_EINVAL; } if (client->root->drain(client) == false) { return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } size_t len= sizeof(protocol_binary_response_header) + htonl(response->response.bodylen); size_t offset= 0; char *ptr= (void*)response; if (client->output == NULL) { /* I can write directly to the socket.... */ do { size_t num_bytes= len - offset; ssize_t nw= client->root->send(client, client->sock, ptr + offset, num_bytes); if (nw == -1) { if (get_socket_errno() == EWOULDBLOCK) { break; } else if (get_socket_errno() != EINTR) { client->error= errno; return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } } else { offset += (size_t)nw; } } while (offset < len); } return client->root->spool(client, ptr, len - offset); } static void print_cmd(protocol_binary_command cmd) { switch (cmd) { case PROTOCOL_BINARY_CMD_GET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_ADD: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADD\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_REPLACE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACE\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_DELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETE\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_INCREMENT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_DECREMENT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_QUIT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUIT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_FLUSH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSH\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_NOOP: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_NOOP\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_VERSION: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERSION\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GETK: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETK\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GETKQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GETKQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_APPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPEND\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_PREPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPEND\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_STAT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_STAT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SETQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_ADDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_ADDQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_REPLACEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_REPLACEQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_DELETEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DELETEQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_INCREMENTQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_INCREMENTQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_DECREMENTQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DECREMENTQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_QUITQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_QUITQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_FLUSHQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_FLUSHQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_APPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_APPENDQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_PREPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_PREPENDQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_VERBOSITY: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_VERBOSITY\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TOUCH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TOUCH\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GAT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GAT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GATQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_LIST_MECHS\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SASL_AUTH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_AUTH\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SASL_STEP: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SASL_STEP\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RGET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RGET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RSET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RSETQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RSETQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RAPPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPEND\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RAPPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RAPPENDQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RPREPEND: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPEND\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RPREPENDQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RPREPENDQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RDELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETE\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RDELETEQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDELETEQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RINCR: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCR\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RINCRQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RINCRQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RDECR: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECR\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_RDECRQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_RDECRQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SET_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SET_VBUCKET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GET_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GET_VBUCKET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_DEL_VBUCKET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_DEL_VBUCKET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_CONNECT: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CONNECT\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_MUTATION: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_MUTATION\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_DELETE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_DELETE\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_FLUSH: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_FLUSH\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_OPAQUE: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_OPAQUE\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_LAST_RESERVED: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_LAST_RESERVED\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GATK: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATK\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_GATKQ: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_GATKQ\n", __FILE__, __LINE__); return; case PROTOCOL_BINARY_CMD_SCRUB: fprintf(stderr, "%s:%d PROTOCOL_BINARY_CMD_SCRUB\n", __FILE__, __LINE__); return; default: abort(); } } /* * Version 0 of the interface is really low level and protocol specific, * while the version 1 of the interface is more API focused. We need a * way to translate between the command codes on the wire and the * application level interface in V1, so let's just use the V0 of the * interface as a map instead of creating a huuuge switch :-) */ /** * Callback for the GET/GETQ/GETK and GETKQ responses * @param cookie client identifier * @param key the key for the item * @param keylen the length of the key * @param body the length of the body * @param bodylen the length of the body * @param flags the flags for the item * @param cas the CAS id for the item */ static protocol_binary_response_status get_response_handler(const void *cookie, const void *key, uint16_t keylen, const void *body, uint32_t bodylen, uint32_t flags, uint64_t cas) { memcached_protocol_client_st *client= (void*)cookie; uint8_t opcode= client->current_command->request.opcode; if (opcode == PROTOCOL_BINARY_CMD_GET || opcode == PROTOCOL_BINARY_CMD_GETQ) { keylen= 0; } protocol_binary_response_get response= { .message.header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= opcode, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= client->current_command->request.opaque, .cas= memcached_htonll(cas), .keylen= htons(keylen), .extlen= 4, .bodylen= htonl(bodylen + keylen + 4), }, }; response.message.body.flags= htonl(flags); protocol_binary_response_status rval; const protocol_binary_response_status success= PROTOCOL_BINARY_RESPONSE_SUCCESS; if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || (rval= client->root->spool(client, key, keylen)) != success || (rval= client->root->spool(client, body, bodylen)) != success) { return rval; } return PROTOCOL_BINARY_RESPONSE_SUCCESS; } /** * Callback for the STAT responses * @param cookie client identifier * @param key the key for the item * @param keylen the length of the key * @param body the length of the body * @param bodylen the length of the body */ static protocol_binary_response_status stat_response_handler(const void *cookie, const void *key, uint16_t keylen, const void *body, uint32_t bodylen) { memcached_protocol_client_st *client= (void*)cookie; protocol_binary_response_no_extras response= { .message.header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= client->current_command->request.opcode, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= client->current_command->request.opaque, .keylen= htons(keylen), .bodylen= htonl(bodylen + keylen), .cas= 0 }, }; protocol_binary_response_status rval; const protocol_binary_response_status success= PROTOCOL_BINARY_RESPONSE_SUCCESS; if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || (rval= client->root->spool(client, key, keylen)) != success || (rval= client->root->spool(client, body, bodylen)) != success) { return rval; } return PROTOCOL_BINARY_RESPONSE_SUCCESS; } /** * Callback for the VERSION responses * @param cookie client identifier * @param text the length of the body * @param textlen the length of the body */ static protocol_binary_response_status version_response_handler(const void *cookie, const void *text, uint32_t textlen) { memcached_protocol_client_st *client= (void*)cookie; protocol_binary_response_no_extras response= { .message.header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= client->current_command->request.opcode, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= client->current_command->request.opaque, .bodylen= htonl(textlen), .cas= 0 }, }; protocol_binary_response_status rval; const protocol_binary_response_status success= PROTOCOL_BINARY_RESPONSE_SUCCESS; if ((rval= client->root->spool(client, response.bytes, sizeof(response.bytes))) != success || (rval= client->root->spool(client, text, textlen)) != success) { return rval; } return PROTOCOL_BINARY_RESPONSE_SUCCESS; } /** * Callback for ADD and ADDQ * @param cookie the calling client * @param header the add/addq command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status add_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.add != NULL) { uint16_t keylen= ntohs(header->request.keylen); uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; protocol_binary_request_add *request= (void*)header; uint32_t flags= ntohl(request->message.body.flags); uint32_t timeout= ntohl(request->message.body.expiration); char *key= ((char*)header) + sizeof(*header) + 8; char *data= key + keylen; uint64_t cas; rval= client->root->callback->interface.v1.add(cookie, key, keylen, data, datalen, flags, timeout, &cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_ADD) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_ADD, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(cas) } } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for DECREMENT and DECREMENTQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status decrement_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.decrement != NULL) { uint16_t keylen= ntohs(header->request.keylen); protocol_binary_request_decr *request= (void*)header; uint64_t init= memcached_ntohll(request->message.body.initial); uint64_t delta= memcached_ntohll(request->message.body.delta); uint32_t timeout= ntohl(request->message.body.expiration); void *key= request->bytes + sizeof(request->bytes); uint64_t result; uint64_t cas; rval= client->root->callback->interface.v1.decrement(cookie, key, keylen, delta, init, timeout, &result, &cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_DECREMENT) { /* Send a positive request */ protocol_binary_response_decr response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_DECREMENT, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(cas), .bodylen= htonl(8) }, .body.value= memcached_htonll(result) } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for DELETE and DELETEQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status delete_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.delete_object != NULL) { uint16_t keylen= ntohs(header->request.keylen); void *key= (header +1); uint64_t cas= memcached_ntohll(header->request.cas); rval= client->root->callback->interface.v1.delete_object(cookie, key, keylen, cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_DELETE) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_DELETE, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, } } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for FLUSH and FLUSHQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status flush_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.flush_object != NULL) { protocol_binary_request_flush *flush_object= (void*)header; uint32_t timeout= 0; if (htonl(header->request.bodylen) == 4) { timeout= ntohl(flush_object->message.body.expiration); } rval= client->root->callback->interface.v1.flush_object(cookie, timeout); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_FLUSH) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_FLUSH, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, } } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for GET, GETK, GETQ, GETKQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status get_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.get != NULL) { uint16_t keylen= ntohs(header->request.keylen); void *key= (header + 1); rval= client->root->callback->interface.v1.get(cookie, key, keylen, get_response_handler); if (rval == PROTOCOL_BINARY_RESPONSE_KEY_ENOENT && (header->request.opcode == PROTOCOL_BINARY_CMD_GETQ || header->request.opcode == PROTOCOL_BINARY_CMD_GETKQ)) { /* Quiet commands shouldn't respond on cache misses */ rval= PROTOCOL_BINARY_RESPONSE_SUCCESS; } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for INCREMENT and INCREMENTQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status increment_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.increment != NULL) { uint16_t keylen= ntohs(header->request.keylen); protocol_binary_request_incr *request= (void*)header; uint64_t init= memcached_ntohll(request->message.body.initial); uint64_t delta= memcached_ntohll(request->message.body.delta); uint32_t timeout= ntohl(request->message.body.expiration); void *key= request->bytes + sizeof(request->bytes); uint64_t cas; uint64_t result; rval= client->root->callback->interface.v1.increment(cookie, key, keylen, delta, init, timeout, &result, &cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_INCREMENT) { /* Send a positive request */ protocol_binary_response_incr response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_INCREMENT, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(cas), .bodylen= htonl(8) }, .body.value= memcached_htonll(result) } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for noop. Inform the v1 interface about the noop packet, and * create and send a packet back to the client * * @param cookie the calling client * @param header the command * @param response_handler the response handler * @return the result of the operation */ static protocol_binary_response_status noop_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.noop != NULL) { client->root->callback->interface.v1.noop(cookie); } protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_NOOP, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, } } }; return response_handler(cookie, header, (void*)&response); } /** * Callback for APPEND and APPENDQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status append_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.append != NULL) { uint16_t keylen= ntohs(header->request.keylen); uint32_t datalen= ntohl(header->request.bodylen) - keylen; char *key= (void*)(header +1); char *data= key +keylen; uint64_t cas= memcached_ntohll(header->request.cas); uint64_t result_cas; rval= client->root->callback->interface.v1.append(cookie, key, keylen, data, datalen, cas, &result_cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_APPEND) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_APPEND, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(result_cas), }, } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for PREPEND and PREPENDQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status prepend_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.prepend != NULL) { uint16_t keylen= ntohs(header->request.keylen); uint32_t datalen= ntohl(header->request.bodylen) - keylen; char *key= (char*)(header + 1); char *data= key + keylen; uint64_t cas= memcached_ntohll(header->request.cas); uint64_t result_cas; rval= client->root->callback->interface.v1.prepend(cookie, key, keylen, data, datalen, cas, &result_cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_PREPEND) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_PREPEND, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(result_cas), }, } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for QUIT and QUITQ. Notify the client and shut down the connection * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status quit_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.quit != NULL) { client->root->callback->interface.v1.quit(cookie); } protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_QUIT, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque } } }; if (header->request.opcode == PROTOCOL_BINARY_CMD_QUIT) { response_handler(cookie, header, (void*)&response); } /* I need a better way to signal to close the connection */ return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } /** * Callback for REPLACE and REPLACEQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status replace_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.replace != NULL) { uint16_t keylen= ntohs(header->request.keylen); uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; protocol_binary_request_replace *request= (void*)header; uint32_t flags= ntohl(request->message.body.flags); uint32_t timeout= ntohl(request->message.body.expiration); char *key= ((char*)header) + sizeof(*header) + 8; char *data= key + keylen; uint64_t cas= memcached_ntohll(header->request.cas); uint64_t result_cas; rval= client->root->callback->interface.v1.replace(cookie, key, keylen, data, datalen, flags, timeout, cas, &result_cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_REPLACE) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_REPLACE, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(result_cas), }, } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for SET and SETQ * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status set_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.set != NULL) { uint16_t keylen= ntohs(header->request.keylen); uint32_t datalen= ntohl(header->request.bodylen) - keylen - 8; protocol_binary_request_replace *request= (void*)header; uint32_t flags= ntohl(request->message.body.flags); uint32_t timeout= ntohl(request->message.body.expiration); char *key= ((char*)header) + sizeof(*header) + 8; char *data= key + keylen; uint64_t cas= memcached_ntohll(header->request.cas); uint64_t result_cas; rval= client->root->callback->interface.v1.set(cookie, key, keylen, data, datalen, flags, timeout, cas, &result_cas); if (rval == PROTOCOL_BINARY_RESPONSE_SUCCESS && header->request.opcode == PROTOCOL_BINARY_CMD_SET) { /* Send a positive request */ protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= PROTOCOL_BINARY_CMD_SET, .status= htons(PROTOCOL_BINARY_RESPONSE_SUCCESS), .opaque= header->request.opaque, .cas= memcached_ntohll(result_cas), }, } }; rval= response_handler(cookie, header, (void*)&response); } } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for STAT * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status stat_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.stat != NULL) { uint16_t keylen= ntohs(header->request.keylen); rval= client->root->callback->interface.v1.stat(cookie, (void*)(header + 1), keylen, stat_response_handler); } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * Callback for VERSION * @param cookie the calling client * @param header the command * @param response_handler not used * @return the result of the operation */ static protocol_binary_response_status version_command_handler(const void *cookie, protocol_binary_request_header *header, memcached_binary_protocol_raw_response_handler response_handler) { (void)response_handler; (void)header; protocol_binary_response_status rval; memcached_protocol_client_st *client= (void*)cookie; if (client->root->callback->interface.v1.version != NULL) { rval= client->root->callback->interface.v1.version(cookie, version_response_handler); } else { rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; } return rval; } /** * The map to remap between the com codes and the v1 logical setting */ static memcached_binary_protocol_command_handler comcode_v0_v1_remap[256]= { [PROTOCOL_BINARY_CMD_ADDQ]= add_command_handler, [PROTOCOL_BINARY_CMD_ADD]= add_command_handler, [PROTOCOL_BINARY_CMD_APPENDQ]= append_command_handler, [PROTOCOL_BINARY_CMD_APPEND]= append_command_handler, [PROTOCOL_BINARY_CMD_DECREMENTQ]= decrement_command_handler, [PROTOCOL_BINARY_CMD_DECREMENT]= decrement_command_handler, [PROTOCOL_BINARY_CMD_DELETEQ]= delete_command_handler, [PROTOCOL_BINARY_CMD_DELETE]= delete_command_handler, [PROTOCOL_BINARY_CMD_FLUSHQ]= flush_command_handler, [PROTOCOL_BINARY_CMD_FLUSH]= flush_command_handler, [PROTOCOL_BINARY_CMD_GETKQ]= get_command_handler, [PROTOCOL_BINARY_CMD_GETK]= get_command_handler, [PROTOCOL_BINARY_CMD_GETQ]= get_command_handler, [PROTOCOL_BINARY_CMD_GET]= get_command_handler, [PROTOCOL_BINARY_CMD_INCREMENTQ]= increment_command_handler, [PROTOCOL_BINARY_CMD_INCREMENT]= increment_command_handler, [PROTOCOL_BINARY_CMD_NOOP]= noop_command_handler, [PROTOCOL_BINARY_CMD_PREPENDQ]= prepend_command_handler, [PROTOCOL_BINARY_CMD_PREPEND]= prepend_command_handler, [PROTOCOL_BINARY_CMD_QUITQ]= quit_command_handler, [PROTOCOL_BINARY_CMD_QUIT]= quit_command_handler, [PROTOCOL_BINARY_CMD_REPLACEQ]= replace_command_handler, [PROTOCOL_BINARY_CMD_REPLACE]= replace_command_handler, [PROTOCOL_BINARY_CMD_SETQ]= set_command_handler, [PROTOCOL_BINARY_CMD_SET]= set_command_handler, [PROTOCOL_BINARY_CMD_STAT]= stat_command_handler, [PROTOCOL_BINARY_CMD_VERSION]= version_command_handler, }; /** * Try to execute a command. Fire the pre/post functions and the specialized * handler function if it's set. If not, the unknown probe should be fired * if it's present. * @param client the client connection to operate on * @param header the command to execute * @return true if success or false if a fatal error occured so that the * connection should be shut down. */ static protocol_binary_response_status execute_command(memcached_protocol_client_st *client, protocol_binary_request_header *header) { if (client->root->pedantic && memcached_binary_protocol_pedantic_check_request(header)) { /* @todo return invalid command packet */ } /* we got all data available, execute the callback! */ if (client->root->callback->pre_execute != NULL) { client->root->callback->pre_execute(client, header); } protocol_binary_response_status rval= PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND; uint8_t cc= header->request.opcode; if (client->is_verbose) { print_cmd(cc); } switch (client->root->callback->interface_version) { case 0: if (client->root->callback->interface.v0.comcode[cc] != NULL) { rval= client->root->callback->interface.v0.comcode[cc](client, header, raw_response_handler); } break; case 1: if (comcode_v0_v1_remap[cc] != NULL) { rval= comcode_v0_v1_remap[cc](client, header, raw_response_handler); } break; default: /* Unknown interface. * It should be impossible to get here so I'll just call abort * to avoid getting a compiler warning :-) */ abort(); } if (rval == PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND && client->root->callback->unknown != NULL) { rval= client->root->callback->unknown(client, header, raw_response_handler); } if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && rval != PROTOCOL_BINARY_RESPONSE_EINTERNAL && rval != PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) { protocol_binary_response_no_extras response= { .message= { .header.response= { .magic= PROTOCOL_BINARY_RES, .opcode= cc, .status= htons(rval), .opaque= header->request.opaque, }, } }; rval= raw_response_handler(client, header, (void*)&response); } if (client->root->callback->post_execute != NULL) { client->root->callback->post_execute(client, header); } return rval; } /* ** ********************************************************************** ** "PROTOECTED" INTERFACE ** ********************************************************************** */ memcached_protocol_event_t memcached_binary_protocol_process_data(memcached_protocol_client_st *client, ssize_t *length, void **endptr) { /* try to parse all of the received packets */ protocol_binary_request_header *header; header= (void*)client->root->input_buffer; if (header->request.magic != (uint8_t)PROTOCOL_BINARY_REQ) { client->error= EINVAL; return MEMCACHED_PROTOCOL_ERROR_EVENT; } ssize_t len= *length; while (len >= (ssize_t)sizeof(*header) && (len >= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)))) { /* I have the complete package */ client->current_command= header; protocol_binary_response_status rv= execute_command(client, header); if (rv == PROTOCOL_BINARY_RESPONSE_EINTERNAL) { *length= len; *endptr= (void*)header; return MEMCACHED_PROTOCOL_ERROR_EVENT; } else if (rv == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) { return MEMCACHED_PROTOCOL_PAUSE_EVENT; } ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen)); len -= total; if (len > 0) { intptr_t ptr= (intptr_t)header; ptr += total; if ((ptr % 8) == 0) { header= (void*)ptr; } else { /* Fix alignment */ memmove(client->root->input_buffer, (void*)ptr, (size_t)len); header= (void*)client->root->input_buffer; } } *length= len; *endptr= (void*)header; } return MEMCACHED_PROTOCOL_READ_EVENT; } /* ** ********************************************************************** ** PUBLIC INTERFACE ** ********************************************************************** */ memcached_binary_protocol_callback_st *memcached_binary_protocol_get_callbacks(memcached_protocol_st *instance) { return instance->callback; } void memcached_binary_protocol_set_callbacks(memcached_protocol_st *instance, memcached_binary_protocol_callback_st *callback) { instance->callback= callback; } memcached_binary_protocol_raw_response_handler memcached_binary_protocol_get_raw_response_handler(const void *cookie) { (void)cookie; return raw_response_handler; } void memcached_binary_protocol_set_pedantic(memcached_protocol_st *instance, bool enable) { instance->pedantic= enable; } bool memcached_binary_protocol_get_pedantic(memcached_protocol_st *instance) { return instance->pedantic; }