151 votos

Creación de una cola de bloqueo <T> en NET?

Tengo un escenario donde tengo varios hilos de la adición de una cola y de varios subprocesos de la lectura de la misma cola. Si la cola alcanza un tamaño específico de todos los hilos que son el llenado de la cola serán bloqueados en agregar hasta que un elemento se elimina de la cola.

La solución a continuación es lo que estoy usando ahora mismo y mi pregunta es: ¿Cómo puede ser mejorado? Hay un objeto que ya permite a este comportamiento en la BCL que yo debería usar?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

192voto

Marc Gravell Puntos 482669

Eso se ve muy inseguro (muy poco sincronización); ¿qué tal algo así como:

 class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}
 

(Edit)

En realidad, usted querría una manera de cerrar la cola para que los lectores comiencen salir limpiamente - tal vez algo como una bandera bool - si se establece, una cola vacía sólo devuelve (en lugar de bloquear):

 bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
 

50voto

xhafan Puntos 544

Utilice .net 4 BlockingCollection, para encolar uso Agregar (), para quitar de la cola uso Take (). Utiliza internamente sin bloqueo ConcurrentQueue. Más información aquí Rápido y Mejor Productor / consumidor técnica cola BlockingCollection vs cola concurrente

14voto

Daniel Earwicker Puntos 63298

"Cómo puede ser mejorado?"

Bien, usted tiene que mirar en cada método en su clase y considerar qué pasaría si otro hilo era a la vez de llamar a este método o por cualquier otro método. Por ejemplo, poner un candado en el método de Quitar, pero no en el método Add. ¿Qué sucede si un hilo se Añade al mismo tiempo que otro hilo Quita? Cosas malas.

Considerar también la posibilidad de que un método puede devolver un segundo objeto que proporciona acceso a la primera del objeto de datos internos - por ejemplo, GetEnumerator. Imagina un hilo y que el enumerador, otro subproceso es la modificación de la lista al mismo tiempo. No es bueno.

Una buena regla general es hacer esto más simple para obtener el derecho al reducir el número de métodos de la clase a un mínimo absoluto.

En particular, no se hereda de otra clase de contenedor, porque se exponen toda clase de métodos, de ofrecer una manera para que la persona que llama para corromper los datos internos, o para ver parcialmente completa de los cambios en los datos (igual de malo, porque los datos dañada en ese momento). Ocultar todos los detalles y ser completamente despiadado acerca de cómo permitir el acceso a ellos.

Me gustaría aconsejamos que utilice el estándar de las soluciones de conseguir un libro acerca de roscado o el uso de la 3ª parte de la biblioteca. De otro modo, dado lo intentas, usted va a ser el código de depuración por un largo tiempo.

También, ¿no tendría más sentido para Quitar a la devolución de un artículo (por ejemplo, el que se agregó en primer lugar, como se trata de una cola), en lugar de la persona que llama la elección de un determinado artículo? Y cuando la cola está vacía, tal vez Quitar también debe bloquear.

Actualización: Marc respuesta implementa realmente todas estas sugerencias! :) Pero voy a dejar esto aquí ya que puede ser útil para entender por qué su versión es una mejora.

5voto

Mark Rendle Puntos 4592

Sólo llamé esto utilizando las extensiones reactivas y recordé esta pregunta:

 public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}
 

No necesariamente del todo seguro, pero muy simple.

4voto

Kevin Puntos 122

Esto es lo que he venido op para un seguro para subprocesos limitado el bloqueo de cola.

 using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
 

Iteramos.com

Iteramos es una comunidad de desarrolladores que busca expandir el conocimiento de la programación mas allá del inglés.
Tenemos una gran cantidad de contenido, y también puedes hacer tus propias preguntas o resolver las de los demás.

Powered by:

X