Taille de texte par Defaut
Agrandir la Taille du Texte
Réduire la Taille du Texte
Options d'accessibilité
Informatique

Un Pool de Thread avec Priorité

Romain
21 octobre 2015
Pas de commentaires

Chez iTK, nous devons lancer, à chaque mise à jour de la météo, plusieurs milliers de simulations agronomiques. Le temps de calcul pour chaque simulation peut varier de deux à plusieurs dizaines de secondes. Pour réaliser cette opération nous utilisons un pool de threads.

Les utilisateurs n’aiment pas attendre !

En plus des simulations déclenchées par une mise à jour de nos données météo, nos utilisateurs ont la possibilité de demander de nouvelles simulations. Si l’un de ces derniers souhaite simuler pendant que les milliers d’autres simulations sont en train de tourner, il va devoir attendre que les autres simulations finissent pour enfin obtenir son résultat.

Un rapide calcul nous montre que pour 1000 simulations (très peu !), qui ont chacune un temps de calcul de 2 secondes (dans la fourchette basse, donc), parallélisées sur 10 threads, le temps d’attente est de 200 secondes, soit 3 minutes : voilà qui ne rendra pas notre utilisateur heureux.

Nous avons donc mis en place un système de pool, où la prochaine tâche est sélectionnée en fonction de sa priorité. Pour notre exemple, l’utilisateur sera toujours prioritaire par rapport aux simulations déclenchées par la mise à jour de la météo.

Gérer des priorités dans un pool de threads

Quand nous nous sommes lancés dans ce développement, nous pensions ce problème commun, avec une solution au moins triviale, au mieux déjà existante et prête à l’emploi. Malheureusement, rien n’existait dans notre stack backend Java / Spring.

Après quelques recherches nous avons choisi d’implémenter cette solution. On utilise l’Executor de spring, qui nécessite une Future et une Callable. On va définir les nôtres avec la gestion de priorité.

L’Executor

Tout d’abord nous devons définir un ThreadPoolExecutor avec une PriorityBlockingQueue. La méthode newTaskFor(…), à redéfinir, va nous permettre d’intercepter la création de nouvelle tâche. Ainsi nous pouvons ajouter à la PriorityBlockingQueue notre tâche avec sa priorité.

package com.itkweb.services.simulation.pool;

import java.util.concurrent.*;

public class SimulationThreadPoolTaskExecutor {

    private static final int MAX_ARRAY_SIZE = 100000;

    public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(MAX_ARRAY_SIZE, PriorityFuture.COMP)) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
            }
        };
    }

}

 

La Future

Une PriorityFuture qui est une Future avec une priorité. En effet, les pools de threads en java utilisent des futures pour communiquer le résultat d’un traitement. Pour ranger dans le bon ordre les tâches à exécuter, PriorityBlockingQueue utilise la fonction PriorityFuture.COMP qui est une implémentation standard de la méthode compare en Java.

package com.itkweb.services.simulation.pool;

import java.util.Comparator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

Nous avons implémenté COMP de sorte que la plus petite valeur passe en premier. C’est ici qu’il vous faut décider quoi faire des priorités.

La Callable

Il ne nous manque plus qu’à définir PriorityCallable, une petite interface permettant de définir la priorité d’une Callable, ainsi nous aurons couvert la totalité des fonctionnalités d’un thread de pool.

package com.itkweb.services.priv.simulation.pool;

import java.util.concurrent.Callable;

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

 

L’Utilisation

Rien de plus simple, puisqu’il suffit d’appeler notre executor comme avant.

...

public SimulatorServiceImpl() {
  executor = SimulationThreadPoolTaskExecutor.getPriorityExecutor(NB_SIMU);
}

public Future<SimulationState> runSimulation(AbstractSimulationContext simulationContext) {
  return executor.submit(simulationCallableFactory.createCallable(simulationContext));
}

...

Nous nous sommes juste permis de créer une Factory afin de faciliter la création de l’instance de Callable et surtout de cacher les détails d’implémentation, comme l’initialisation du contexte Spring.

public SimulationCallable createCallable(AbstractSimulationContext simulationContext) {
  SimulationCallable task = applicationContext.getBean(SimulationCallable.class);
  task.initialize(simulationContext);
  return task;
}

Conclusion

En interne, java va transformer notre Callable en PriorityFuture. Comme cette PriorityFuture spécialise RunnableFuture, le thread va pouvoir se lancer, traiter notre demande. Nous aurons alors le choix d’attendre le résultat et bloquer le thread courant ou de ne rien faire de la future et laisser le thread courant continuer son exécution normale.

Pour la suite des aventures, nous pensons maintenant passer à une architecture micro-service, qui fera l’objet d’un autre article !

Si jamais vous avez réussi à implémenter une autre solution, nous serons ravis de la découvrir.

Commentaires 0

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.