require 'redis' class RedisQueue attr_reader :qname def initialize(name, options={}) @redis = Redis.new(options) @qname = name end def size @redis.llen(@qname) end def empty? @redis.llen(@qname).zero? end def put(item, options={}) if options[:priority] == :high @redis.lpush(@qname, item) else @redis.rpush(@qname, item) end end def get(timeout=0) item = @redis.blpop(@qname, timeout) item ? item[1] : nil end def get_nonblock item = @redis.lpop(@qname) return item[1] if item fail Errno::EWOULDBLOCK end def done! @redis.quit end end
La clase anterior es muy fácil de usar, a continuación dejo un ejemplo de uso:
queue = RedisQueue.new('queue:app') p queue.qname p queue.size p queue.empty? queue.put 'Tuesday' queue.put 'Wednesday' queue.put 'Monday', :priority => :high # Ahora Monday es lo primero en salir. p queue.size p queue.empty? p queue.get p queue.get p queue.get p queue.get(2) # Espera por 2 segundos. begin p queue.get_nonblock rescue Errno::EWOULDBLOCK puts 'La cola no tiene elementos, intenta hacer algo mientras se llena.' end
Implementación en C
La versión de C requiere un poquito más de nuestra parte, pues hay que estar pendiente de la memoria, pero sin duda la implementación en C es la ideal para los ambientes de producción, donde la velocidad importa mucho. El siguiente código hace uso de la librería Hiredis, la cual en mí opinión, necesita un fichero Coding Style urgentemente.
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "hiredis.h" struct redis_queue { redisContext *redis; char *qname; }; /* * Se conecta con Redis y prepara el entorno de la cola. * * host: Servidor de Redis. * port: Puerto para conectar con Redis. * qname: Nombre de la cola. * * devuelve NULL en caso de error o una estructura redis_queue. */ struct redis_queue *init_redis_queue(const char *host, short port, char *qname) { struct redis_queue *queue; queue = malloc(sizeof(*queue)); queue->redis = redisConnect(host, port); if(queue->redis->err) return NULL; queue->qname = malloc(strlen(qname)); memcpy(&queue->qname[0], qname, strlen(qname)); return queue; } /* * Agrega un nuevo elemento a la cola. * * queue: Estructura previamente devuelta por init_redis_queue(). * data: Datos para anexarlo a la cola. * nbytes: Numero de bytes para copiar a la cola desde la variable data. * * devuelve -1 en caso de error o un numero positivo proveniente de Redis. */ long long queue_put(struct redis_queue *queue, char *data, size_t nbytes) { long long n; redisReply *reply; if(!queue || !queue->qname) return -1; reply = redisCommand(queue->redis, "lpush %s %b", queue->qname, data, nbytes); if(!reply) return -1; n = reply->integer; freeReplyObject(reply); return n; } /* * Obtiene un elemento de la cola o se bloquea por tiempo indefinido. * * queue: Estructura previamente devuelta por init_redis_queue(). * * devuelve NULL en caso de error o la información solicitada. El programador * es responsable de liberar con free() los datos devueltos por queue_get(). */ unsigned char* queue_get(struct redis_queue *queue) { redisReply *reply; unsigned char *data; if(!queue || !queue->qname) return NULL; reply = redisCommand(queue->redis, "brpop %s 0", queue->qname); if(!reply || reply->elements!=2) return NULL; data = malloc(reply->element[1]->len); memcpy(&data[0], reply->element[1]->str, reply->element[1]->len); freeReplyObject(reply); return data; } /* * Obtiene la cantidad de elementos que posee la cola. * * queue: Estructura previamente devuelta por init_redis_queue(). * * devuelve -1 en caso de error o un número positivo proveniente de Redis. */ long long queue_size(struct redis_queue *queue) { long long size; redisReply *reply; if(!queue || !queue->qname) return -1; reply = redisCommand(queue->redis, "llen %s", queue->qname); if(!reply) return -1; size = reply->integer; freeReplyObject(reply); return size; }
El código anterior se puede utilizar de la siguiente manera:
int main(void) { unsigned char *data; struct redis_queue *queue; queue = init_redis_queue("localhost", 6379, "Queue"); if(!queue) { fprintf(stderr, "Error, imposible conectarse con Redis.\n"); exit(EXIT_FAILURE); } queue_put(queue, "Hola\0", 5); queue_put(queue, "tio\0", 4); queue_put(queue, "que tal\0", 8); printf("Size: %lld\n", queue_size(queue)); data = queue_get(queue); puts(data); free(data); data = queue_get(queue); puts(data); free(data); data = queue_get(queue); puts(data); free(data); return 0; }Realmente no se cual es la manera correcta de compilar un programa que depende de Hiredis y la documentación tampoco dice cómo, la manera en que lo haremos será: Descargar el código fuente desde github; descomprimir el archivo; con make compilar Hiredis; posicionas tu fichero fuente dentro de la carpeta de Hiredis y lo compilas contra la librería libhiredis.a, más o menos así:
$ make
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings net.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings hiredis.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings sds.c
cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings async.c
...
$ gcc -c -o queue.o queue.c
$ gcc -o queue queue.o libhiredis.a
Apéndice
La temática para implementar una cola FIFO es muy sencilla. Imagina que tienes una pila de platos donde uno esta encima del otro, ahora imagina que para obtener un plato de esa enorme pila solo puedes cogerlo desde abajo, allá en la base de la pila y que para agregar platos al montón, solo puedes hacerlo desde el tope. Si imaginas esto, estarás imaginando el funcionamiento de una cola FIFO. De manera semejante, para implementar una cola LIFO debes imaginar, que solo sacas y pones platos desde un mismo lugar, sea la base o la cima de la pila. A continuación un ejemplo de cómo hacerlo desde el CLI de Redis:
redis 127.0.0.1:6379> lpush COLA "Primer Elemento" (integer) 3 redis 127.0.0.1:6379> lpush COLA "Segundo Elemento" (integer) 4 redis 127.0.0.1:6379> brpop COLA 0 1) "COLA" 2) "Primer Elemento" redis 127.0.0.1:6379> brpop COLA 0 1) "COLA" 2) "Segundo Elemento" redis 127.0.0.1:6379> brpop COLA 0 ^CPara más información acerca de las ordenes que admite Redis, visita: http://redis.io/commands
Referencias
Excelente entrada man!
ResponderEliminar