viernes, 7 de diciembre de 2012

Implementación de un Queue FIFO en Redis con Ruby y C

En esta entradas implementaremos una cola de tareas usando Redis, para aquellos casos donde RabbitMQ y ZeroMQ resultan muy complejos para lo que se quiere lograr. Estaré implementando la cola con Ruby y también con C para darle algo de originalidad a la entrada. Por cierto debo aclarar que el código de Ruby está basado en un código python escrito por Peter Hoffman en su Blog.
require 'redis'

class RedisQueue
     attr_reader :qname

     def initialize(name, options={})
          @redis = Redis.new(options)
          @qname = name
     end

     def size
          @redis.llen(@qname)
     end

     def empty?
          @redis.llen(@qname).zero?
     end

     def put(item, options={})
          if options[:priority] == :high
               @redis.lpush(@qname, item)
          else
               @redis.rpush(@qname, item)
          end
     end

     def get(timeout=0)
          item = @redis.blpop(@qname, timeout)
          item ? item[1] : nil
     end

     def get_nonblock
          item = @redis.lpop(@qname)
          return item[1] if item
          fail Errno::EWOULDBLOCK  
     end

     def done!
          @redis.quit
     end
end

La clase anterior es muy fácil de usar, a continuación dejo un ejemplo de uso:
queue = RedisQueue.new('queue:app')

p queue.qname
p queue.size
p queue.empty?

queue.put 'Tuesday'
queue.put 'Wednesday'
queue.put 'Monday', :priority => :high  # Ahora Monday es lo primero en salir.

p queue.size
p queue.empty?

p queue.get
p queue.get
p queue.get
p queue.get(2)     # Espera por 2 segundos.

begin
     p queue.get_nonblock
rescue Errno::EWOULDBLOCK
     puts 'La cola no tiene elementos, intenta hacer algo mientras se llena.'
end

Implementación en C

La versión de C requiere un poquito más de nuestra parte, pues hay que estar pendiente de la memoria, pero sin duda la implementación en C es la ideal para los ambientes de producción, donde la velocidad importa mucho. El siguiente código hace uso de la librería Hiredis, la cual en mí opinión, necesita un fichero Coding Style urgentemente.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "hiredis.h"

struct redis_queue {
     redisContext   *redis;
     char           *qname;
};

/*
 * Se conecta con Redis y prepara el entorno de la cola.
 *
 * host: Servidor de Redis.
 * port: Puerto para conectar con Redis.
 * qname: Nombre de la cola.
 *
 * devuelve NULL en caso de error o una estructura redis_queue.
 */
struct redis_queue
*init_redis_queue(const char *host, short port, char *qname)
{
     struct redis_queue   *queue;

     queue = malloc(sizeof(*queue));

     queue->redis = redisConnect(host, port);
     if(queue->redis->err)
          return NULL;

     queue->qname = malloc(strlen(qname));
     memcpy(&queue->qname[0], qname, strlen(qname));
  return queue;
}

/*
 * Agrega un nuevo elemento a la cola.
 *
 * queue: Estructura previamente devuelta por init_redis_queue().
 * data: Datos para anexarlo a la cola.
 * nbytes: Numero de bytes para copiar a la cola desde la variable data.
 *
 * devuelve -1 en caso de error o un numero positivo proveniente de Redis.
 */
long long
queue_put(struct redis_queue *queue, char *data, size_t nbytes)
{
     long long    n;
     redisReply  *reply;

     if(!queue || !queue->qname)
          return -1;

     reply = redisCommand(queue->redis, "lpush %s %b", queue->qname, data, nbytes);

     if(!reply)
          return -1;

     n = reply->integer;
     freeReplyObject(reply);
   return n;
}

/*
 * Obtiene un elemento de la cola o se bloquea por tiempo indefinido.
 *
 * queue: Estructura previamente devuelta por init_redis_queue().
 *
 * devuelve NULL en caso de error o la información solicitada. El programador
 * es responsable de liberar con free() los datos devueltos por queue_get().
 */
unsigned char*
queue_get(struct redis_queue *queue)
{
     redisReply      *reply;
     unsigned char   *data;

     if(!queue || !queue->qname)
          return NULL;

     reply = redisCommand(queue->redis, "brpop %s 0", queue->qname);
     if(!reply || reply->elements!=2)
          return NULL;

     data = malloc(reply->element[1]->len);
     memcpy(&data[0], reply->element[1]->str, reply->element[1]->len);
     freeReplyObject(reply);
   return data;
}

/*
 * Obtiene la cantidad de elementos que posee la cola.
 *
 * queue: Estructura previamente devuelta por init_redis_queue().
 *
 * devuelve -1 en caso de error o un número positivo proveniente de Redis.
 */
long long
queue_size(struct redis_queue *queue)
{
     long long   size;
     redisReply  *reply;

     if(!queue || !queue->qname)
          return -1;

     reply = redisCommand(queue->redis, "llen %s", queue->qname);
     if(!reply)
          return -1;

     size = reply->integer;
     freeReplyObject(reply);
   return size;
}

El código anterior  se puede utilizar de la siguiente manera:

int
main(void)
{
     unsigned char        *data;
     struct redis_queue   *queue;

     queue = init_redis_queue("localhost", 6379, "Queue");
     if(!queue) {
          fprintf(stderr, "Error, imposible conectarse con Redis.\n");
          exit(EXIT_FAILURE);
     }

     queue_put(queue, "Hola\0", 5);
     queue_put(queue, "tio\0", 4);
     queue_put(queue, "que tal\0", 8);

     printf("Size: %lld\n", queue_size(queue));

     data = queue_get(queue);
     puts(data);
     free(data);

     data = queue_get(queue);
     puts(data);
     free(data);

     data = queue_get(queue);
     puts(data);
     free(data);
  return 0;
}
Realmente no se cual es la manera correcta de compilar un programa que depende de Hiredis y la documentación tampoco dice cómo, la manera en que lo haremos será: Descargar el código fuente desde github; descomprimir el archivo; con make compilar Hiredis; posicionas tu fichero fuente dentro de la carpeta de Hiredis y lo compilas contra la librería libhiredis.a, más o menos así:

$ make
    cc -std=c99 -pedantic -c -O3 -fPIC  -Wall -W -Wstrict-prototypes -Wwrite-strings   net.c
    cc -std=c99 -pedantic -c -O3 -fPIC  -Wall -W -Wstrict-prototypes -Wwrite-strings   hiredis.c
    cc -std=c99 -pedantic -c -O3 -fPIC  -Wall -W -Wstrict-prototypes -Wwrite-strings   sds.c
    cc -std=c99 -pedantic -c -O3 -fPIC  -Wall -W -Wstrict-prototypes -Wwrite-strings   async.c
    ...
$ gcc -c -o queue.o queue.c
$ gcc -o queue queue.o libhiredis.a

 Apéndice
La temática para implementar una cola FIFO es muy sencilla. Imagina que tienes una pila de platos donde uno esta encima del otro, ahora imagina que para obtener un plato de esa enorme pila solo puedes cogerlo desde abajo, allá en la base de la pila y que para agregar platos al montón, solo puedes hacerlo desde el tope. Si imaginas esto, estarás imaginando el funcionamiento de una cola FIFO. De manera semejante, para implementar una cola LIFO debes imaginar, que solo sacas y pones platos desde un mismo lugar, sea la base o la cima de la pila. A continuación un ejemplo de cómo hacerlo desde el CLI de Redis:
redis 127.0.0.1:6379> lpush COLA "Primer Elemento"
(integer) 3
redis 127.0.0.1:6379> lpush COLA "Segundo Elemento"
(integer) 4
redis 127.0.0.1:6379> brpop COLA 0
1) "COLA"
2) "Primer Elemento"
redis 127.0.0.1:6379> brpop COLA 0
1) "COLA"
2) "Segundo Elemento"
redis 127.0.0.1:6379> brpop COLA 0
^C
Para más información acerca de las ordenes que admite Redis, visita: http://redis.io/commands
Referencias
  1. http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html 

1 comentario: