Zrównoleglenie operacji w aplikacji webowej

Zrownoleglenie

W dzisiejszym artykule opiszę, jak łatwo można zwiększyć wydajność przykładowej aplikacji webowej, zamieniając sposób wykonywania wewnętrznych akcji/odwołań z sekwencyjnego na równoległe.

W świecie systemów klasy enterprise do tego celu dojść można wieloma drogami. W przypadku aplikacji JEE moglibyśmy np. „rozrzucić” JMS-em zadania po kolejkach, pozostawiając „czarną robotę” dla Message-Driven Beans. W projekcie EAI opcji mielibyśmy jeszcze więcej (np. FlowN BPEL-a czy Splitter EIP). Ale co zrobić, kiedy rzecz dotyczy zwykłego, prostego servleta?

Tytułem wstępu

Wśród wielu produktów informatycznych, jakie rozwijamy u naszych Klientów, jest system archiwizacji i zarządzania dokumentami. Oparty o platformę OpenText, zawiera wiele modułów zapewniających wsparcie dla całego cyklu życia dokumentów: od skanowania, oczyszczania, poprzez kierowanie przepływem pracy, reprodukcję, wyszukiwanie, udostępnianie, po archiwizację i retencję.

Chciałoby się tu napisać więcej, bo platforma ta jest naprawdę ciekawa, zarówno z punktu widzenia biznesowego (automatyzacja, zwiększenie produktywności, obniżenie kosztów operacyjnych), jak i technologicznego. Jesteśmy jednak na łamach specjalistycznego bloga, skupmy się więc na wątku technicznym. No i, oczywiście, na temacie artykułu.

Jednym z modułów systemu jest serwer usług. Udostępnia on wiele gotowych usług (funkcjonalności) oraz pozwala na budowanie/komponowanie nowych. W efekcie nasz system stanowi repozytorium składowania dokumentów dla wielu systemów dziedzinowych Klienta.

Przykładową usługą realizowaną przez system jest pobranie na wskazaną lokalizację sieciową zarchiwizowanych obrazów dokumentów (pliki graficzne, PDF,itp.). Od wielu lat sprawowała się ona „dzielnie”, natomiast wraz z rozwojem biznesu naszego Klienta oraz wzrostem liczby klientów Web Service, potrzebne było jej wyskalowanie.

Szybka analiza pokazała, że nie są tu problemem procesory ani pamięć. CPU przez większość czasu „nudzą się”, a garbage collector sprawnie „odśmieca” pamięć. Okazało się, że można znacznie przyspieszyć działanie usług, skracając czas obsługi żądań http, poprzez lepsze zrównoleglenie wykonywanych operacji (pobierania plików z dostarczanych przez platformę InputStream’ów).  Ponieważ z przyczyn niezależnych nie można zmieniać architektury, powinniśmy pozostać w ramach obecnego serwera (Tomcat 6) i implementacji („zwykła” Java, bez Spring, bez JEE, bez JMS). Najprostszym rozwiązaniem, jakie się tu nasuwa, jest własne zarządzanie wątkami.

Duchy przeszłości

Gdzieś głęboko we mnie zakorzenione było przeświadczenie, że samodzielne kombinowanie z wątkami w servletach, to rzecz, jeśli nawet nie zabroniona, to raczej niewskazana. Postanowiłem zrobić szybki „rachunek sumienia”: skąd właściwie wzięło się to przekonanie? W wielu grupach dyskusyjnych również można znaleźć niemało wątpliwości, czy takie operacje są dozwolone: http://www.coderanch.com/t/535749/Servlets/java/Threads-Servlets, http://www.theserverside.com/discussions/thread.tss?thread_id=44353, http://www.adam-bien.com/roller/abien/entry/legally_starting_threads_in_ejbs.

Dlatego też postanowiłem zajrzeć do źródła, czyli do specyfikacji. „Wygooglałem” fragment :

A servlet container may place security restrictions on the environment in which a servlet executes. In a Java Platform, Standard Edition (J2SE, v.1.3 or above) or Java Platform, Enterprise Edition (Java EE, v.1.3 or above) environment, these restrictions should be placed using the permission architecture defined by the Java platform. For example, high-end application servers may limit the creation of a Thread object to insure that other components of the container are not negatively impacted.

W swobodnym tłumaczeniu mamy zatem następujące zalecenia: kontener może nakładać pewne ograniczenia, może ograniczać możliwość tworzenia. Okazuje się zatem, że to najmocniejsze z ograniczeń, jakie znalazłem w całej specyfikacji, wcale nie zabrania zarządzania wątkami!

Natomiast fragment specyfikacji EJB mówi wyraźnie:

The enterprise bean must not attempt to manage threads. The enterprise bean must not attempt to start, stop, suspend, or resume a thread, or to change a thread priority or name. The enterprise bean must not attempt to manage thread groups.

Podsumowując, formalnie rzecz biorąc wykorzystywanie wątków w servletach jest dozwolone. Moje wątpliwości (chyba nie tylko moje) brały się zapewne z bezwiednego „rzutowania” ograniczeń EJB na Servlety (jedno i drugie jest częścią specyfikacji JEE), nieprzefiltrowanego szumu informacyjnego, ale też naturalnej niechęci do zabaw z odbezpieczoną bronią.

No nic, skoro można, to trzeba spróbować! 🙂

Przygotujmy sobie zatem „narzędzia” do dalszych rozważań. Aby łatwiej było zrozumieć zagadnienie, z którym mieliśmy do czynienia, najpierw zamodeluję sytuację obecną.

 

Sytuacja wyjściowa

Załóżmy, że mamy do czynienia z aplikacją webową, która odbiera żądanie http, by na jego podstawie uruchomić N zadań, a następnie wysłać odpowiedź do klienta. Prosty servlet realizujący to zadanie mógłby wyglądać tak:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package pl.atena.technoblog.parallel;

import java.io.*;
import java.util.*;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;

@WebServlet("/serial")
public class Serial extends HttpServlet {
    private static final long serialVersionUID = 7357242480855825340L;

    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // get the number of workers to run
        int count = this.getRequestedCount(request, 4);
        // create the workers
        ArrayList workers = new ArrayList();
        for (int i = 0; i < count; i++) {
            WorkUnit wu = new WorkUnit("worker" + (i + 1));
            workers.add(wu);
        }
       
        // run the workers one after another
        Long t1 = System.nanoTime();
        for (WorkUnit wu : workers) {
            wu.run();
        }
        Long t2 = System.nanoTime();
        Writer w = response.getWriter();
        w.write(String.format("\n Request processed in %dms!", (t2 - t1) / 1000000));
        w.flush();
        w.close();
    }
   
    /**
     * Method for getting the requested workers' count.
     * If no such param in the request, assign defaultValue.
     */
    private int getRequestedCount(HttpServletRequest request, int defaultValue) {
        int result = defaultValue;
        String param = request.getParameter("count");
        if (param != null) {
            result = Integer.parseInt(param);
        }
        return result;
    }
}

Kluczowa jest tu metoda doGet (linie 14-37), która:

  • w linii 17 odczytuje liczbę zadań do wykonania;
  • w liniach 19-23 tworzy i inicjuje obiekty, które hermetyzują logikę biznesową (klasa WorkUnit);
  •  w liniach 27-29 po kolei uruchamia logikę przetwarzania (WorkUnit.run()).

Nie wnikając w specyfikę dziedziny i implementacji konkretnego przypadku, przyjmuję, że mamy poniższą klasę pomocniczą, która „symuluje” wykonanie atomowej porcji logiki biznesowej (np. długotrwały odczyt z bazy danych, odczyt/skopiowanie pliku) – w tym wydaniu metoda run() po prostu odczekuje cztery sekundy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package pl.atena.technoblog.parallel;

public class WorkUnit {
    private String name;
    public WorkUnit(String name) {
        this.name = name;
    }

    public void run() {
        System.out.println(this.name + ": start");
        try {
            // complicated business logic:
            Long t1 = System.nanoTime();
            Thread.sleep(4000);
            Long t2 = System.nanoTime();
           
            System.out.println(String.format("%s finished in %d ms", this.name, (t2 - t1) / 1000000));
        } catch (InterruptedException e) {
            System.out.println(this.name + ": error");     
            e.printStackTrace();
        }
    }
}

Po uruchomieniu aplikacji webowej i wywołaniu servleta (w parametrze count definiujemy, ile workerów chcemy uruchomić):

http://localhost:8080/TestExecutorService/serial?count=4

Na konsoli zobaczymy mniej więcej taki ślad wykonania:

worker1: start  
worker1 finished in 4004 ms  
worker2: start  
worker2 finished in 4000 ms  
worker3: start  
worker3 finished in 4003 ms  
worker4: start  
worker4 finished in 4003 ms  

Sam servlet zwróci tekst:

Request processed in 16012ms!

Widać, że każdy z workerów zużył mniej więcej cztery sekundy, a łączne „obrobienie” żądania zajęło servletowi, jak się należało spodziewać, szesnaście sekund. Możemy zaobserwować również, że każdy następny worker startował dopiero wtedy, kiedy poprzedni kończył działanie („worker1: start”, „worker1 finished”, „worker2: start”, „worker2 finshed”, itd.).

Na marginesie: proszę mi wybaczyć, że zastosowałem tak mało subtelne sposoby logowania (println) i pomiaru czasu. Nie chciałem dodatkowo zaśmiecać źródeł, a dla naszych prostych przykładów wydają się one być wystarczające.

Zrównoleglenie

Jak widać na tym prostym modelu, wywoływane przez servlet zadania są niezależne (wynik jednego nie zależy od poprzedniego). To, że w pierwotnej implementacji workery muszą na siebie czekać, jest nieuzasadnionym marnotrawstwem czasu. (Jedyny punkt synchronizacji jest taki, że servlet powinien wysłać odpowiedź dopiero wtedy, gdy wszystkie wystartowane zadania zostaną ukończone.) Wystarczyłoby uruchomić je jednocześnie, a

Realizacja tej zasadniczo słusznej koncepcji wymaga jednak odpowiedzi na kilka pytań:

  1. Jak sprawić, żeby nasz worker uruchamiał się asynchronicznie (tj. uruchamiał się i oddawał sterowanie do servleta tak, żeby ten mógł odpalić następną porcję logiki do wykonania)?
  2. Jak zarządzać pulą wątków do uruchamiania worker’ów?
  3. Jak najlepiej osadzić taką pulę w aplikacji web’owej?
  4. Jak z poziomu aplikacji web’owej wykorzystywać pulę wątków?
  5. Jak koordynować wykonanie zadań?
  6. Jak kontrolować (ograniczać) liczbę uruchamianych jednorazowo worker’ów (nie można ich odpalać w nieskończoność – w końcu skonsumowałyby wszystkie zasoby serwera)?
    A jeśli ograniczać, to jak obsłużyć wątki oczekujące na wykonanie?

Poniżej postaram się po kolei wyjaśnić, co według mnie kryje się pod każdym z powyższych pytań i w jaki sposób mam zamiar na nie odpowiedzieć.

Uruchomienie asynchroniczne

Jak sprawić, żeby nasz worker uruchamiał się asynchronicznie? W Javie 6 odpowiedź na to pytanie jest stosunkowo prosta. Podstawowym środkiem do realizacji współbieżności są wątki – klasa java.lang.Thread.

Przerobię teraz WorkUnit tak, żeby klasa mogła być uruchamiana w dedykowanym wątku:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package pl.atena.technoblog.parallel;

public class RunnableWorkUnit implements Runnable {
    private String name;
    public RunnableWorkUnit(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(String.format("%d:%s started (%s)", System.currentTimeMillis(), this.name,Thread.currentThread().getName()));
        try {
            // complicated business logic:
            Long t1 = System.nanoTime();
            Thread.sleep(4000);
            Long t2 = System.nanoTime();
           
            System.out.println(String.format("%d:%s finished in %d ms", System.currentTimeMillis(),this.name, (t2 - t1) / 1000000));
        } catch (InterruptedException e) {
            System.out.println(this.name + ": error");
            e.printStackTrace();
        }
    }
}

Nowa klasa, RunnableWorkUnit, jest niemal identyczna jak WorkUnit. Główna różnica polega na tym, że implementuje ona interfejs java.lang.Runnable. Wystartowanie naszego nowego workera mogłoby wyglądać np. tak:

RunnableWorkUnit rwu = new RunnableWorkUnit("test");
Thread t = new Thread(rwu);
t.start();

I daje w wyniku oczekiwane:

Mam już Runnable, którego możemy wywołać w oddzielnym wątku, ale skąd taki wątek wziąć?

Zarządzanie pulą wątków

Java SE 5 dała programistom zestaw narzędzi do tworzenia współbieżnych aplikacji, w postaci fremeworka (java.util.concurrent,  java.util.concurrent.atomic i java.util.concurrent.locks) zawierającego m. in. bezpieczne (thread-safe) kolekcje, które mogą być modyfikowane i odczytywane z różnych wątków, pule wątków, samafory, atomowe zmienne, biblioteki do planowania, synchronizacji i koordynowania wykonywania zadań itp.

W naszym ćwiczeniu do równoległej realizacji zadań chciałbym wykorzystać pulę wątków. W ten sposób uzyskam kontrolę nad liczbą uruchomionych wątków (rozmiar puli, „zasobożerność” rozwiązania). Nie będę musiał za każdym razem ich tworzyć na nowo  ale będę „pożyczał” i „oddawał” je z i do puli wtedy, kiedy będą potrzebne (co zmniejsza koszty tworzenia, inicjowania i niszczenia wątków). Dlatego z całej palety dostępnych narzędzi wybieram ExecutorService.

Sam interfejs ExecutorService definiuje spójny sposób wskazywania zadań do wykonania, a framework dodatkowo dostarcza gotowe implementacje dające różne pożądane zachowania (dalej będę nazywał je wykonawcami lub executorami).

Podstawową implementacją jest klasa ThreadPoolExecutor, która pozwala na konfigurację m. in.:

  • W jakiej kolejności wykonywane będą zadania (FIFO/LIFO, na podstawie priorytetu),
  • Ile zadań może być uruchomionych jednocześnie,
  • Ile zadań może być w kolejce oczekujących do wykonania,
  • Jak postępować z nowymi zadaniami w przypadku kompletnego zapełnienia kolejki,
  • Czy wywoływać dodatkowe akcje przed/po wykonaniu zadania (np. powiadomienia, liczenie statystyk itp.).

Dodatkowo framework zawiera klasę Executors, która pozwala na utworzenie prekonfigurowanych wykonawców, przy pomocy następujących metod statycznych:

  • newSingleThreadExecutor – zwraca ExecutorService z dokładnie jednym wątkiem;
  • newFixedThreadPool – zwraca pulę wątków, której maksymalna pojemność jest ściśle określona; wątki tworzone są na bieżąco dla nowych zadań aż do osiągnięcia rozmiaru maksymalnego, a potem pula utrzymuje ten rozmiar (jeśli jakiś wątek “zginie”, tworzony jest w jego miejsce nowy);
  • newCachedThreadPool – zwraca pulę z nieograniczoną pojemnością;  maksymalna liczba wątków nie jest zdefiniowana, aczkolwiek wykonawca będzie uwalniał zasoby (usuwał wątki), gdy zmaleje obciążenie (wykorzystanie puli) i dodawał nowe, gdy obciążenie rośnie;
  • newSingleThreadScheduledExecutor – wykonawca analogiczny do tego z newSingleThreadExecutor, który dodatkowo umożliwia uruchomienie zadania z zadanym opóźnieniem lub wykonywanie cykliczne;
  • newScheduledThreadPool – wykonawca z pulą o ustalonej liczbie wątków, który dodatkowo umożliwia uruchomienie zadania z zadanym opóźnieniem lub wykonywanie cykliczne.

Poniższy fragment kodu:

final int numberOfThreads = 256;  
final ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads);

utworzy wykonawcę o profilu najbardziej odpowiednim dla naszego celu – pula o stałym rozmiarze. Na razie arbitralnie ustalam maksymalną liczbę wątków na 256. Oczywiście powinna ona zostać dobrana na podstawie wiedzy/badania charakterystyki obciążenia podczas wykonania zadań.

Osadzenie puli wątków w aplikacji web

Na razie zaparkuję temat dostrojenia puli (doboru rozmiaru i metody postępowania w przypadku jej zapełnienia) i skupię się na tym, jak powołać ją do życia w naszej aplikacji webowej.

Najkorzystniejsze wydaje się utworzenie takiej puli podczas startu całej aplikacji i zamknięcie jej przy wyłączeniu aplikacji (zatrzymanie aplikacji, undeploy, zamknięcie serwera). Naturalnym miejscem na tego typu akcje jest Listner nasłuchujący na zdarzenia związane z cyklem życia aplikacji (impolementujący ServletContextListener).

Tworzę następującą klasę:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package pl.atena.technoblog.parallel;  
 
import java.util.concurrent.*;  
 
import javax.servlet.*;  
import javax.servlet.annotation.WebListener;  
 
@WebListener  
public class MyListener implements ServletContextListener {  
    @Override  
    public void contextInitialized(ServletContextEvent sce) {  
        System.out.println("Servlet Context initialized.");  
        final int numberOfThreads = 256;  
        final ExecutorService threadPool = Executors  
                .newFixedThreadPool(numberOfThreads);  
 
        final ServletContext servletContext = sce.getServletContext();  
        servletContext.setAttribute("myExecutorService", threadPool);  
    }  
 
    @Override  
    public void contextDestroyed(ServletContextEvent sce) {  
        System.out.println("Servlet Context destroyed.");  
        final ExecutorService threadPool = (ExecutorService) sce  
                .getServletContext().getAttribute("myExecutorService");  
        // stop accepting new tasks  
        threadPool.shutdown();  
        try {  
            // wait for task to terminate  
            threadPool.awaitTermination(1, TimeUnit.MINUTES);  
        } catch (InterruptedException e) {  
            // enough waiting!  
            threadPool.shutdownNow();  
        }  
    }  
}

Anotacja @WebListener w linii 8 zawiadamia serwer, że klasa MyListener ma być powiadamiana o zdarzeniach typu tworzenie i niszczenie kontekstu aplikacji.

W metodzie contextInitialized (odpowiadającej uruchomieniu aplikacji), w liniach 13-15 tworzony jest nowy ExecutorService. W liniach 17-18 wykorzystujemy mechanizm współdzielenia informacji w postaci ServletContext –  obiekt threadPool dodany zostaje do kontekstu. Dzięki takiemu osadzeniu w dowolnym serwlecie aplikacji możemy uzyskać dostęp do puli wątków przy pomocy prostego wywołania:

final ServletContext servletContext = this.getServletContext();
final ExecutorService executorService = (ExecutorService) servletContext
        .getAttribute("myExecutorService");

Metoda contextDestroyed odpowiada za sprzątanie przy zamknięciu aplikacji. threadPool.shutdown(); zamyka pulę – od tego momentu żadne nowe zadania nie będą przez executor przyjmowane. Po threadPool.awaitTermination(1, TimeUnit.MINUTES); aplikacja czekać będzie minutę aż do zakończenia aktualnie wykonywanych zadań. Jeśli tak się nie stanie, wygenerowany zostanie wyjątek InterruptedException. W linii 33 przechwytujemy wyjątek i wymuszamy zakończenie pracy uruchomionych wątków.

Równoległe wykonanie zadań

Wiemy już, jak asynchronicznie wystartować zadanie oraz jak utworzyć i osadzić w aplikacji webowej pulę wątków. Teraz skorzystajmy z tej wiedzy. Na podstawie poprzedniego servlet’a wywołującego zadania jedno po drugim (Serial) piszę taką klasę (Parallel), która wykorzystuje stworzoną pulę połączeń i uruchamia zadania równolegle:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package pl.atena.technoblog.parallel;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;

@WebServlet("/parallel")
public class Parallel extends HttpServlet {
    private static final long serialVersionUID = -152801880710512087L;

    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // get the number of workers to run
        int count = this.getRequestedCount(request, 4);
        // get the executor service
        final ServletContext sc = this.getServletContext();
        final ExecutorService executorService = (ExecutorService) sc
                .getAttribute("myExecutorService");
       
        Long t1 = System.nanoTime();
        // create the workers
        List workers = new ArrayList();
        for (int i = 0; i < count; i++) {
            RunnableWorkUnit wu = new RunnableWorkUnit("RunnableTask"
                    + String.valueOf(i + 1));
            workers.add(wu);
        }
        // run the workers through the executor
        for (RunnableWorkUnit wu : workers) {
            executorService.execute(wu);
        }
        Long t2 = System.nanoTime();
        Writer w = response.getWriter();
        w.write(String.format("\n Request processed in %dms!", (t2 - t1) / 1000000));
        w.flush();
        w.close();
    }
    /**
     * Method for getting the requested workers' count.
     * If no such param in the request, assign defaultValue.
     */
    private int getRequestedCount(HttpServletRequest request, int defaultValue) {
        int result = defaultValue;
        String param = request.getParameter("count");
        if (param != null) {
            result = Integer.parseInt(param);
        }
        return result;
    }
}

Jak widać, logika nie różni się znacznie od swojego synchronicznego poprzednika. A oto najciekawsze fragmenty powyższego kodu:

  • w liniach 20-22 z kontekstu pobieramy ExecutorService,
  • w linii 34 przekazujemy wykonanie zadania do tego ExecutorService.

Sprawdźmy, jak to zadziała w praktyce:

http://localhost:8080/TestExecutorService/parallel?count=4

Na konsoli zobaczymy mniej więcej taki ślad wykonania:

1359185521876:RunnableTask2 started (pool-1-thread-2)
1359185521876:RunnableTask3 started (pool-1-thread-3)
1359185521877:RunnableTask1 started (pool-1-thread-1)
1359185521877:RunnableTask4 started (pool-1-thread-4)
1359185525877:RunnableTask2 finished in 4003 ms
1359185525878:RunnableTask3 finished in 4004 ms
1359185525878:RunnableTask1 finished in 4004 ms
1359185525878:RunnableTask4 finished in 4004 ms

Z zapisów konsoli widać, że wszystkie taski uruchomione zostały mniej więcej w tym samym czasie i wszystkie skończyły się po około czterech sekundach. Wszystko wydaje się działać zgodnie z założeniem. Wątpliwość może natomiast budzić tekst zwracany przez servlet:

Request processed in 5ms!

Wynika z niego, że odpowiedzieliśmy na wywołanie http, nie czekając na zakończenie działania wszystkich wystartowanych workerów. To z kolei sprowadza nas ponownie do pytania postawionego na początku tej sekcji: jak koordynować wykonanie zadań?

Koordynowanie zadań

java.util.concurrent zawiera cztery klasy realizujące specyficzne wzorce synchronizacji.  Semaphore to klasyczny idiom współbieżności. CountDownLatch to bardzo proste i często wykorzystywane rozwiązanie pozwalające na blokowanie przepływu sterowania do momentu, gdy osiągnięta będzie określona liczba sygnałów, zdarzeń czy warunków. CyclicBarrier pozwala na synchronizację postępu wykonywania jakiegoś algorytmu przez wątki (może zostać np. wykorzystana tak, że bariera wstrzymuje wykonanie wątków do momentu, aż wszystkie wątki ją osiągną). Z kolei Exchanger reprezentuje punkt, w którym dwa wątki mogą się „spotkać” i wymienić obiektami. Po takiej zamianie wątki będą mogły kontynuować działanie.

W przypadku naszej modelowej aplikacji mamy prosty algorytm synchronizacji: równolegle wykonywane zadania nie są od siebie zależne i wystarczy, że główny wątek poczeka na zakończenie wszystkich tasków, zanim da odpowiedź. Do realizacji tego zadania wystarczy zwykły „licznik” ustawiony na wartość równą liczbie zadań do wykonania, który zmniejszać będzie się za każdym razem, kiedy poszczególne zadania zostaną zakończone. Dokładnie tak działa CountDownLatch.

Oto zmodyfikowana postać servleta (klasa Parallel2):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package pl.atena.technoblog.parallel;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;

@WebServlet("/parallel2")
public class Parallel2 extends HttpServlet {
    private static final long serialVersionUID = -152801880710512087L;

    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // get the number of workers to run
        int count = this.getRequestedCount(request, 4);
        // get the executor service
        final ServletContext sc = this.getServletContext();
        final ExecutorService executorService = (ExecutorService) sc
                .getAttribute("myExecutorService");
        // create work coordinator
        CountDownLatch countDownLatch = new CountDownLatch(count);
       
        Long t1 = System.nanoTime();
        // create the workers
        List workers = new ArrayList();
        for (int i = 0; i < count; i++) {
            RunnableWorkUnit2 wu = new RunnableWorkUnit2("RunnableTask"
                    + String.valueOf(i + 1), countDownLatch);
            workers.add(wu);
        }
        // run the workers through the executor
        for (RunnableWorkUnit2 wu : workers) {
            executorService.execute(wu);
        }

        try {
            System.out.println("START WAITING");
            countDownLatch.await();
            System.out.println("DONE WAITING");
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
       
        Long t2 = System.nanoTime();
        Writer w = response.getWriter();
        w.write(String.format("\n Request processed in %dms!", (t2 - t1) / 1000000));
        w.flush();
        w.close();
    }
    /**
     * Method for getting the requested workers' count.
     * If no such param in the request, assign defaultValue.
     */
    private int getRequestedCount(HttpServletRequest request, int defaultValue) {
        int result = defaultValue;
        String param = request.getParameter("count");
        if (param != null) {
            result = Integer.parseInt(param);
        }
        return result;
    }
}

i klasy reprezentującej zadanie:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package pl.atena.technoblog.parallel;

import java.util.concurrent.CountDownLatch;

public class RunnableWorkUnit2 implements Runnable {
    private String name;
    private CountDownLatch countDownLatch;
    public RunnableWorkUnit2(String name, CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(String.format("%d:%s started (%s)", System.currentTimeMillis(), this.name,Thread.currentThread().getName()));
        try {
            // complicated business logic:
            Long t1 = System.nanoTime();
            Thread.sleep(4000);
            Long t2 = System.nanoTime();
           
            System.out.println(String.format("%d:%s finished in %d ms", System.currentTimeMillis(),this.name, (t2 - t1) / 1000000));
            System.out.println("Counting down: " +  this.name);
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            System.out.println(this.name + ": error");
            e.printStackTrace();
        }
    }
}

Poniżej zestawiam ważniejsze zmiany w stosunku do poprzedniej wersji.

  • W linii 22 (klasa Parallel2) tworzymy nowy obiekt CountDownLatch i inicjujemy jego licznik.
  • W liniach 30-31, tak jak poprzednio, tworzymy nowe workery. Jedyną różnicą jest to, że przekazujemy do konstruktora referencję na countDownLatch. Dzięki temu będą one mogły „powiadomić” o zakończeniu działania.
  • W linii 41 wstrzymujemy wykonanie programu, do momentu kiedy licznik countDownLatch osiągnie 0, czyli zakończą działanie wszystkie wątki. Klasa CountDownLatch ma również taką wersję metody await(), która czeka na wykonanie wszystkich wątków, ale nie dłużej niż zadany timeout.
  • W klasie RunnableWorkUnit w konstruktorze zapamiętujemy countDownLatch, z którym worker będzie się komunikował.
  • Linia 23 jest tutaj kluczowa: wątek informuje o zrealizowaniu logiki biznesowej.

Po uruchomieniu:

http://localhost:8080/TestExecutorService/parallel2?count=4

na konsoli otrzymamy:

START WAITING
1359187820290:RunnableTask3 started (pool-2-thread-3)
1359187820290:RunnableTask4 started (pool-2-thread-4)
1359187820290:RunnableTask1 started (pool-2-thread-1)
1359187820291:RunnableTask2 started (pool-2-thread-2)
1359187824295:RunnableTask3 finished in 3999 ms
Counting down: RunnableTask3
1359187824296:RunnableTask2 finished in 4000 ms
Counting down: RunnableTask2
1359187824296:RunnableTask4 finished in 4000 ms
Counting down: RunnableTask4
1359187824296:RunnableTask1 finished in 4000 ms
Counting down: RunnableTask1
DONE WAITING

a w przeglądarce zobaczymy:

Request processed in 4008ms!

W tym wydaniu servlet odpowiedział po około czterech sekundach. Z logu (konsoli) widać, jak poszczególne workery były po kolei uruchamiane („RunnableTask3 started”), jak kończyły działanie i „odhaczały się” w countDownLatch („Counting down: RunnableTask3”) i jak po ostatnim workerze program zakończył oczekiwanie („DONE WAITING”).

Wszystko zadziałało tak, jak tego sobie życzyliśmy. Osiągnęliśmy założony cel (zrównoleglenie zadań). Po prostu pięknie!

W zasadzie na tym mógłbym skończyć eksperyment.

Mógłbym, gdyby nie małe „ale”…

Wysycenie puli

Dotychczasowe próby przeprowadzałem w bardzo komfortowych warunkach. Pula wątków miała pojemność256, aobciążałem ją jednostkowo niewielką liczbą zadań (4). Co by się jednak stało, gdyby liczba zadań i równoczesnych wywołań były znacznie większe? Jak aplikacja będzie zachowywać się, gdy wolnych wątków zabraknie?

Zasymulujmy wyczerpanie puli, poprzez drobną modyfikację kodu ją tworzącego. W klasie MyListener zmieńmy rozmiar puli na 1 (numberOfThreads = 1, linia 13 listingu klasy MyListener) i teraz obciążmy serwer:

http://localhost:8080/TestExecutorService/parallel2?count=4

Tym razem na konsoli zobaczymy:

START WAITING
1359188146172:RunnableTask1 started (pool-1-thread-1)
1359188150175:RunnableTask1 finished in 4000 ms
Counting down: RunnableTask1
1359188150176:RunnableTask2 started (pool-1-thread-1)
1359188154180:RunnableTask2 finished in 4003 ms
Counting down: RunnableTask2
1359188154180:RunnableTask3 started (pool-1-thread-1)
1359188158183:RunnableTask3 finished in 4003 ms
Counting down: RunnableTask3
1359188158183:RunnableTask4 started (pool-1-thread-1)
1359188162188:RunnableTask4 finished in 4004 ms
Counting down: RunnableTask4
DONE WAITING

Pierwszy worker wystartował, zabierając jedyny dostępny wątek w puli. Drugi musiał poczekać aż poprzedni skończy działanie i odda wątek do puli. Podobnie każdy następny. W tym przypadku łączny czas wykonania wyniósł około szesnastu sekund, czyli tyle, ile przed zrównolegleniem. Ale jeśli takich żądań będzie więcej, może się okazać, że nawzajem podbierać będą sobie one wątki, powodując jeszcze większe opóźnienia.

Poniżej wyniki kilku prób uruchomienia jednocześnie czterech wywołań servlet’a parallel2:

  1. Request processed in 16017ms!
  2. Request processed in 32041ms!
  3. Request processed in 48055ms!
  4. Request processed in 64069ms!
  1. Request processed in 32034ms!
  2. Request processed in 36040ms!
  3. Request processed in 52056ms!
  4. Request processed in 64068ms!
  1. Request processed in 28026ms!
  2. Request processed in 40041ms!
  3. Request processed in 48049ms!
  4. Request processed in 64072ms!
  1. Request processed in 16016ms!
  2. Request processed in 32031ms!
  3. Request processed in 60055ms!
  4. Request processed in 64060ms!

W pierwszym przypadku złożyło się tak, że requesty „zgodnie” czekały, aż poprzedni zostanie obsłużony od A do Z. W pozostałych trzech przypadkach wyniki są ciekawsze. Wszystkie one wskazują, że przy zastosowanej przeze mnie implementacji czas obsługi requestów (w przypadku wysycenia puli) nie poprawił się, lecz uległ znacznej degradacji. Czas odpowiedzi jest znacznie gorszy, niż gdyby servlet pracował po staremu, we własnym wątku, po kolei odpalając workery.

Konfiguracja puli

No właśnie, a może jeśli pula zaczyna się przepełniać, pozwolić na samodzielne (tak jak w servlecie serial) uruchamianie tasków? Ale jak to zrealizować?

ThreadPoolExecutor, klasa  implementująca pulę, posiada metody pozwalające na obliczenie szacunkowej liczby wolnych wątków:

final ServletContext sctx = this.getServletContext();  
final ExecutorService executorService = (ExecutorService) sctx.getAttribute("myExecutorService");  
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) executorService);  
int free = tpe.getPoolSize() - tpe.getActiveCount();  

Teraz, widząc, że wolne miejsce się kończy, możemy coś dalej zadecydować. W logice obsługi żądania musielibyśmy zaszyć takie warunkowe rozdzielenie. Okazuje się jednak, że można obejść się bez takich kombinacji i użyć rozwiązania, które nie wymaga zmian implementacji istniejącej logiki tworzenia i uruchamiania wątków.

Framework pozwala na zdefiniowanie zachowania puli w stosunku do zadań, których nie jest w stanie obsłużyć (kiedy wykonawca został zamknięty lub kiedy osiągnięto maksymalną pojemność puli). Możemy definiować własne algorytmy albo wykorzystać predefiniowane zachowania:

Ten ostatni wariant pasuje świetnie do naszych potrzeb. Zmodyfikuję kod tworzenia executora (klasa MyListener), by go wykorzystać:

public void contextInitialized(ServletContextEvent sce) {
    System.out.println("Servlet Context initialized.3");
    final int numberOfThreads = 1;
    final ExecutorService threadPool = new ThreadPoolExecutor(
            numberOfThreads, // core thread pool size
            numberOfThreads, // maximum thread pool size
            0L, TimeUnit.MILLISECONDS,// time to wait before resizing pool
            new LinkedBlockingQueue<Runnable>(numberOfThreads),
            new ThreadPoolExecutor.CallerRunsPolicy());
 
    final ServletContext servletContext = sce.getServletContext();
    servletContext.setAttribute("myExecutorService", threadPool);
}

Tym razem konstruuję samodzielnie wykonawcę (co daje mi większe możliwości konfiguracyjne), na wzór tego, którego poprzednio uzyskałem korzystając z Executors. Do konstruktora, podpatrzonego w kodzie źródłowym metody newFixedThreadPool, dokładam argument odpowiedzialny za sposób postępowania z odrzuconymi wątkami (new ThreadPoolExecutor.CallerRunsPolicy()).

Teraz, teoretycznie, czas odpowiedzi powinien być lepszy niż w poprzedniej wersji. Zobaczmy, jak to wygląda w praktyce. Wywołanie /parallel2?count=4 da następujący wynik na konsoli:

1359189184358:RunnableTask1 started (pool-2-thread-1)
1359189184358:RunnableTask3 started (http-bio-8080-exec-7)
1359189188361:RunnableTask3 finished in 4003 ms
Counting down: RunnableTask3
1359189188361:RunnableTask1 finished in 4003 ms
Counting down: RunnableTask1
1359189188362:RunnableTask4 started (http-bio-8080-exec-7)
1359189188362:RunnableTask2 started (pool-2-thread-1)
1359189192364:RunnableTask2 finished in 4000 ms
Counting down: RunnableTask2
1359189192364:RunnableTask4 finished in 4000 ms
Counting down: RunnableTask4
START WAITING
DONE WAITING

Widać, że RunnableTask1 obsłużony został przez pulę (w nawiasie nazwa wątku uruchamiającego task – pool-2-thread-1). Ponieważ jedyny wolny wątek puli został zabrany, RunnableTask3 uruchomiony jest w wątku servlet’a (http-bio-8080-exec-7). Kończy działanie jako pierwszy, dlatego RunnableTask4 może być uruchomiony w tym samym wątku (http-bio-8080-exec-7). Po zakończeniu RunnableTask1 pula znowu ma wolne przebiegi, dlatego podejmuje wykonanie RunnableTask2.

Servlet odpowiada informacją:  “Request processed in 8009ms!”, co potwierdza opisany przebieg wydarzeń. Praca została rozłożona równomiernie między wątek puli i wątek servleta, co spowodowało skrócenie czasu wykonania o połowę (w stosunku do implementacji sekwencyjnej). Oczywiście, przy większym obciążeniu wyniki nie będą już takie „książkowe” (więcej żądań będzie „walczyło” o wolne wątki), ale czasy obsługi żądań powinny być w zdecydowanej większości nie gorsze niż dla implementacji bez puli połączeń.

Wnioski

Co udało się pokazać w ramach tego eksperymentu? Przede wszystkim okazuje się, że zrównoleglanie działań w aplikacjach webowych jest nie tylko możliwe, ale również całkiem proste. Zestaw standardowych narzędzi (zawartych w samej specyfikacji Java i nie wymagających bibliotek zewnętrznych) pozwala na utworzenie puli wątków, a jej wykorzystanie może znacznie skrócić czas wykonywania. Nawet przy dużym obciążeniu i wysyceniu puli, czasy obsługi żądań powinny być w zdecydowanej większości nie gorsze niż gdyby tej puli nie było wcale.

Dodatkowo uzyskaliśmy efekt swoistego przycinania pasma (throttling). Z jednej strony, kiedy pula jest zajęta, servlety ją wykorzystujące przejmują na siebie wykonywanie zadań, pozwalając jej się zregenerować. Z drugiej strony, przy większym obciążeniu żądania obsługiwane są dłużej, gdyż zadania uruchamiane są sekwencyjnie w servlet’ach. Przy odpowiedniej konfiguracji maksymalnej liczby wątków obsługujących request’y HTTP, serwer będzie miał chwilę, żeby „odrobić zaległości”.

Istnieje jeszcze kilka pokrewnych zagadnień, które być może warto przy tej okazji omówić. Na przykład nakładanie restrykcji czasowych na uruchamiane zadania (timeout), monitorowanie puli czy odczytywanie wyników zadań uruchamianych przy pomocy puli (wykorzystanie interfejsów Callable i Future )… Jednak i bez tego artykuł urósł do nieprzyzwoitych rozmiarów.

Dlatego niniejszym pozwolę sobie podziękować za uwagę i na tym zakończyć. 😉


O Karol Brejna

Pracuję w branży IT od roku 2000. W tym czasie brałem udział w wielu złożonych i wymagających projektach, w których pełniłem szereg ról (m. in. programista, główny projektant, architekt). Jako architekt, szczególnie lubię wzbogacanie naszego "zestawu narzędzi" o nowe technologie i produkty - poczynając od baz danych NoSQL (np. MongoDB, Redis, Neo4j), na kompletnych pakietach SOA kończąc (np. WSO2).

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *