Agregando Tareas a la ThreadPool

La clase ThreadPool permite lanzar tareas asíncronas evitándonos la engorrosa gestión de prioridades, carga de procesador, seguridad, etc.

Apunto este snippet por aquí que luego siempre se me olvida cómo se hace.

class Program
{
    static void Main(string[] args)
    {
        Runner r = new Runner();
        r.Run();
        Console.WriteLine("Todas las llamadas finalizadas! (Las tareas de la pool siguen corriendo)");
        Console.ReadLine();
    }
}
class Runner
{
    public void Run()
    {
        for (int i = 0; i < 10; i++)
        {
            // Agregamos tareas a la ThreadPool
            // Como parámetros pasamos referencia a un método que cumpla
            // la firma del delgado System.Threading.WaitCallBack y un parámetro
            System.Threading.ThreadPool.QueueUserWorkItem(new WaitCallback(Task), i);
        }
    }
    // Método que será ejecutado por la ThreadPool.
    // Recibiremos como parámetro lo que hayamos pasado al agregar la tarea a la pool
    private void Task(object number)
    {
        int taskNumber = (int)number;
        Console.WriteLine("Iniciando tarea {0}...",taskNumber.ToString());
        // Esperamos un tiempo aleatorio entre 1 y 10 segs.
        Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(1000,10000));
        Console.WriteLine("Finalizada tarea {0}!", taskNumber.ToString());
    }
}

Esta sería la salida que produce este snippet:

Para tareas más sencillas podemos usar delegados anónimos, en lugar de tener que  crear un método para que sea ejecutado por la pool.

public void RunAnonymous()
{
    for (int i = 0; i < 10; i++)
    {
        // Agregamos tareas a la ThreadPool
        // Para la llamada usamos un delegado anónimo que debe cumplir la
        // firma de System.Treading.WaitCallBack
        string nombreEmpleado = "Empleado " + i.ToString(), idEmpleado = i.ToString();
        ThreadPool.QueueUserWorkItem(delegate(Object param) { ProcesarEmpleado (idEmpleado, nombreEmpleado); });
    }
}
// Procesa los datos del empleado, etc, etc, etc
private void ProcesarEmpleado(string idEmpleado, string nombreEmpleado)
{
    Console.WriteLine("Procesando {0}/{1}...", idEmpleado, nombreEmpleado);
    // Esperamos un tiempo aleatorio entre 1 y 10 segs.
    Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(1000, 10000));
    Console.WriteLine("Fin proceso empleado {0}/{1}!", idEmpleado, nombreEmpleado);
}

Cómo podríamos saber cuándo han acabado de ejecutarse todos los hilos?

Creamos un flag a nivel de módulo en la clase Runner, que nos indicará cuántos hilos en ejecución hay actualmente.

private int _runningTasksCount = 0;

Hay que tener mucho cuidado con las variables accedidas por varios hilos a la vez, pues es posible que cada procesador (físico o lógico) tenga una copia del valor de esta variable. Si dos hilos acceden simultáneamente, y lo que es peor, la actualizan simultáneamente, puede producirse una pérdida de datos.

// Método que será ejecutado por la ThreadPool.
// Recibiremos como parámetro lo que hayamos pasado al agregar la tarea a la pool
private void Task(object param)
{
    // Indicar que hay una tarea más ejecutándose en la pool
    Interlocked.Increment(ref _runningTasksCount);
    int taskNumber = (int)param;
    Console.WriteLine("Iniciando tarea {0}...", taskNumber.ToString());
    // Esperamos un tiempo aleatorio entre 1 y 10 segs.
    Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(1000, 10000));
    Console.WriteLine("Finalizada tarea {0}!", taskNumber.ToString());
    // Indicar que hay una tarea menos ejecutándose en la pool
    Interlocked.Decrement(ref _runningTasksCount);
    // Hemos acabado
    if (Thread.VolatileRead(ref _runningTasksCount)<1)
    {
        Console.WriteLine("Todas las tareas completadas!");
    }
}

La clase System.Threading.Interlocked posee mecanismos para modificar datos de forma segura. En el ejemplo lo que hacemos es incrementar el valor del flag usando el método Increment, y lo decrementamos usando Decrement.
Finalmente para una lectura segura (puede ocurrir que la copia de la variable que tiene el procesador en el que se ejecuta el hilo no tenga un valor actualizado) usamos el método estático VolatileRead de System.Threading.Thread

Ocurre que como el hilo principal tiene como única tarea lanzar a los demás hilos, acaba antes que los demás. Si nuestra aplicación es como la del ejemplo, y el hilo principal se ejecuta en una aplicación de consola, la aplicación finalizará antes que los hilos acaben de ejecutarse. Es posible que en este caso, al finalizar el proceso, los demás finalizen también "a capón".

Por esto necesitamos un mecanismo que impida que la consola se cierre antes que todos los hilos acaben su trabajo.

Yo lo gestionaría envolviendo el flag que acabamos de crear en una propiedad de sólo lectura y la consultaría desde el main de la consola.

public int RunningTasksCount
{
    get { return Thread.VolatileRead(ref _runningTasksCount); }
}

static void Main(string[] args)
{
    Runner r = new Runner();
    r.Run();
    Console.WriteLine("Carga de la pool OK! (Las tareas de la pool siguen corriendo)");
    // Esperar a que todo haya acabado
    while (r.RunningTasksCount > 0)
    {
        Thread.Sleep(100);
    }
    Console.ReadLine();
}

Código Fuente

Comments

No Comments