Багатопотоковість у Java. Лекція 5: атомарні змінні та багатопотокові колекції

18 серпня 2020
Володимир Фролов, Java-розробник, Микита Сізінцев, Android-розробник
Багатопотоковість у Java. Лекція 5: атомарні змінні та багатопотокові колекції
Продовжуємо публікацію короткого курсу, написаного нашими колегами. У попередній лекції вони розповіли про пули потоків і чергу задач.

5.1 АТОМАРНІ ЗМІННІ

Розглянемо ситуацію, коли два чи більше потоків намагаються змінити спільний розділюваний ресурс — водночас виконувати операції читання або запису. Для уникнення ситуації race condition потрібно використовувати synchronized-методи, synchronized-блоки чи відповідні блокування. Якщо цього не зробити, ресурс буде в неконсистентному стані, і значення не матиме ніякого змісту. Проте використання методів synchronized чи synchronized-блоків коду — дуже дорога операція через високу вартість отримання і блокування потоку. Також цей спосіб є блокуючим, він суттєво зменшує продуктивність системи загалом.

Для розв’язання цієї проблеми створили так звані неблокуючі алгоритми — non blocking thread safe algorithms. Ці алгоритми називаються compare and swap (CAS) та базуються на тому, що сучасні процесори підтримують такі операції на рівні машинних інструкцій. З виходом Java 1.5 з'явилися класи атомарних змінних: AtomicInteger AtomicLong, AtomicBoolean, AtomicReference. Вони знаходяться в пакеті java.util.concurrent.atomic. Алгоритм compare and swap працює наступним чином: є комірка пам'яті, поточне значення в ній і те значення, яке хочемо записати у цю комірку. Спочатку комірка пам'яті читає і зберігає поточне значення, потім прочитане значення порівнюється з тим, яке вже є в комірці пам'яті, і якщо значення, прочитане раніше, збігається з поточним, відбувається запис нового значення. Варто згадати, що значення змінної після читання може бути змінене іншим потоком, тому що CAS не є блокуючою операцією.

5.2 ПРОБЛЕМА ABA ЧИ LOST UPDATE

Є два потоки. Потік А прочитав значення з пам'яті, після чого, припустимо, планувальник потоку перервав виконання потоку А. Потім потік B читає значення з пам'яті, а далі змінює його на інше декілька разів. Припустимо, що спочатку значення атомарної змінної було 5, потім 4, у кінці воно знову дорівнює 5. Вийшло так, що потік B останнього разу записав те саме значення, що було спочатку, тобто значення, яке було прочитано потоком А. Потім планувальник відновлює роботу потоку А, той порівнює значення, яке спочатку прочитав (5), із тим, що є в пам'яті зараз (теж 5). Ці значення рівні — виконується операція CAS. Виникає така ситуація, коли нове для системи значення, встановлене потоком B (яке, до речі, дорівнювало початковому значенню, яке ми спостерігали до початку роботи потоків А і В), затирається старим значенням, що мав встановити потік А.

Виникає закономірне запитання: чому значення втрачається? Адже потік А не записав у пам'ять значення, яке він намагався записати, а ось коли планувальник потоків відновив роботу потоку А, значення нарешті записалось?

Припустимо, що комірка пам'яті відображає якийсь глобальний стан системи або, наприклад, якусь циклічну адресу чи повторювану сутність. У цьому випадку така поведінка є просто неприпустимою. Розв’язати CAS-проблему може лічильник кількості змін. У першій операції при читанні значення з пам'яті відбувається також читання лічильника. При виконанні CAS-операції порівнюється значення пам'яті на поточний момент зі старим значенням, прочитаним раніше, та проводиться порівняння поточного значення лічильника зі значенням, прочитаним на попередньому кроці. Якщо в обох операціях порівняння отримано результат true, виконується CAS-операція та записується нове значення. Також варто зазначити, що під час запису нового значення за допомогою CAS-операції значення лічильника збільшується. Причому значення лічильника збільшується лише при запису!

Повертаючись до прикладу про lost update: коли потік А отримає керування та спробує виконати CAS-операцію, значення пам'яті будуть рівними, а значення лічильників — ні. Тому потік А знову прочитає значення з пам'яті та значення лічильника — і знову спробує виконати CAS-операцію. Це відбуватиметься доти, доки CAS-операцію не буде виконано успішно. Однак і при використанні лічильника виникають певні проблеми, розгляд яких виходить за рамки нашого циклу статей. На практиці наразі використовується алгоритм safe memory reclamation (SMR).

Розглянемо, як працює клас AtomicLong, у лістингах 1, 2 та 3.

Лістинг 1: клас AtomicLong

public final long getAndAdd(long delta) {
   return unsafe.getAndAddLong(this, valueOffset, delta);
}

Тут видно, що виконання делегується класу Unsafe.

Лістинг 2: клас Unsafe:

public final long getAndAddLong(Object object, long offset, long newValue) {
    long oldValue;
    do {
        oldValue = this.getLongVolatile(object, offset);
    } while(!this
        .compareAndSwapLong(object, offset, oldValue, oldValue + newValue));
    return oldValue;
}

Тут отримується старе значення, потім встановлюється нове за допомогою операції CAS у тому випадку, якщо між читанням і CAS-операцією жодні інші потоки не змінять значення. В іншому випадку, цикл виконується, доки цю умову не буде дотримано.

Лістинг 3: клас AtomicLong. Отримання зміщення поля value в об'єкті AtomicLong.

static {
    try {
        valueOffset = unsafe
            .objectFieldOffset(AtomicLong.class.getDeclaredField("value"));
    } catch (Exception ex) { 
        throw new Error(ex); 
    }
}

5.3 БАГАТОПОТОКОВІ КОЛЕКЦІЇ

Звичайні колекції у Java не є синхронізованими та не можуть бути безпечно використані в багатопотоковому середовищі. За винятком випадків, коли звернення до цих колекцій відбувається з синхронізованих блоків коду. Неправильне використання колекцій може призвести до неконсистентності даних або ConcurrentModificationException.

У Java є колекції, призначені для використання в багатопотоковому середовищі. Вони реалізують різні механізми синхронізації даних. До виходу Java 1.5 існували такі багатопотокові колекції: Stack, Vector, HashTable. Усі методи цих класів є synchronized. Це означає, що при виклику будь-якого методу цих класів інший потік буде заблоковано, навіть якщо викликаний метод є методом читання. Ці колекції з'явилися ще з версією Java 1 і в сучасній розробці їх використовувати не варто. З виходом Java 1.2 з'явився утилітний клас Collections, що надає статичні методи для обертання стандартних колекцій в їхні синхронізовані представлення. Це зроблено для сумісності з Java версією 1.

Після релізу Java 1.5, 1.6 і 1.7 з'явилася можливість використовувати класи колекцій, призначені й оптимізовані для роботи в багатопотоковому середовищі:

  • CopyOnWriteArrayList <E>
  • CopyOnWriteArraySet <E>
  • ConcurrentSkipListSet <E>
  • ConcurrentHashMap <K, V>
  • ConcurrentSkipListMap <K, V>
  • ConcurrentLinkedQueue <E>
  • ConcurrentLinkedDeque <E>
  • ArrayBlockingQueue <E>
  • DelayQueue <E extends Delayed>
  • LinkedBlockingQueue <E>
  • PriorityBlockingQueue <E>
  • SynchronousQueue <E>
  • LinkedBlockingDeque <E>
  • LinkedTransferQueue <E>

Перші дві колекції — copy on write структури. Третя колекція — skip list структура. Наступні дві — класи Map, призначені для використання в багатопотокових програмах. Використання класів цих колекцій дозволяє збільшити продуктивність програми у порівнянні з використанням застарілих класів із Java 1. Розглянемо статичні методи утилітарного класу Collections.

5.4 СТАТИЧНІ МЕТОДИ З КЛАСУ COLLECTIONS

Статичні методи для роботи з багатопотоковістю у класі Collections:

  • synchronizedCollection()
  • synchronizedList()
  • synchronizedMap()
  • synchronizedSet()
  • synchronizedSortedSet()
  • synchronizedSortedMap()

До звичайної колекції застосовують декоратор, який обертає кожен метод у synchronized-блок, — так при кожному читанні або зміні оберненої колекції відбувається блокування всіх інших операцій. Оскільки перераховані методи було додано для забезпечення сумісності, у нових багатопотокових програмах краще використовувати багатопотокові колекції, які ми розглянемо далі.

5.5 Copy On Write СТРУКТУРИ ДАНИХ

CopyOnWriteArrayList використовується, коли є багато потоків, що читають елементи з колекції, та кілька потоків, що рідко записують дані в колекцію. Copy on write структура створює нову копію даних під час запису в цю структуру. Це дозволяє декільком потокам водночас читати дані, а одному потоку — записувати елементи в колекцію в кожен конкретний момент часу. Всередині структура даних містить volatile масив елементів і при кожній зміні колекції — додаванні, видаленні або заміні елементів — створюється нова локальна копія масиву для змін. Після модифікації змінена копія масиву стає поточною. Масив елементів використовується з ключовим словом volatile для того, щоб усі потоки відразу побачили зміни в масиві. Такий алгоритм роботи з даними гарантує, що потоки, які читають, будуть читати незмінювані в часі дані, та не буде згенеровано ConcurrentModificationException при паралельній зміні масиву. Якщо для читання колекції використовується Iterator, при спробі викликати remove() при обході колекції буде згенеровано UnsupportedOperationException, тому що поточна копія колекції не підлягає зміні. Для операцій запису всередині класу CopyOnWriteArrayList створюється блокування, щоб у конкретний момент часу лише один потік міг змінювати copy on write структуру даних (лістинг 1). Приклад додавання елементу наведено в лістингу 4.

Лістинг 4:

final ReentrantLock lock = new ReentrantLock();
public boolean add(E e) {
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

Без блокування наявність декількох потоків, які змінюють структуру даних, може призвести до втрати даних, що записуються потоками. Декілька потоків роблять копію вихідного масиву даних, вносять зміни та записують їх. Якийсь із потоків завершує роботу швидше, якийсь — повільніше. Потік, що завершить роботу останнім, видалить зміни, зроблені іншими потоками. Із використанням блокування ReentrantLock така проблема зникає. Розглянемо програму з лістингу 5, яка показує, що читання елементів з CopyOnWriteArrayList відбувається швидше, ніж читання з цілком синхронізованої колекції або колекції, обгорнутої відповідним статичним методом із класу Collections.

Лістинг 5:

import java.util.*;
public class PerformanceComparision {
    public static void main(String[] args) 
            throws ExecutionException, InterruptedException {
        List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
        List<Integer> cowLst = new CopyOnWriteArrayList<>();
        fillLst(syncList);
        fillLst(cowLst);
        System.out.println("List synchronized:");
        checkList(syncList);
        System.out.println("CopyOnWriteArrayList:");
        checkList(cowLst);
    }
    private static void fillLst(List<Integer> list) {
        IntStream.rangeClosed(1, 100).forEach(list::add);
    }
    private static void checkList(List<Integer> list) 
            throws ExecutionException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executors = Executors.newFixedThreadPool(2);
        Future<Long> f1 = executors.submit(new ListRunner(latch, 0, 49, list));
        Future<Long> f2 = executors.submit(new ListRunner(latch, 50, 99, list));
        executors.shutdown();
        latch.countDown();
        System.out.println("Thread1: " + f1.get()/1000);
        System.out.println("Thread2: " + f2.get()/1000);
    }
}

public class ListRunner implements Callable<Long> {
    private CountDownLatch latch;
    private int start;
    private int end;
    private List<Integer> list;
    public ListRunner(CountDownLatch latch, int start, int end, List<Integer> list) {
        this.latch = latch;
        this.start = start;
        this.end = end;
        this.list = list;
    }
    @Override
    public Long call() throws Exception {
        latch.await();
        Integer integer;
        long startTime = System.nanoTime();
        for (int i = start; i < end; ++i) {
            integer = list.get(i);
        }
        return System.nanoTime() - startTime;
    }
}

У програмі створюються два списки: CopyOnWriteArrayList і звичайний ArrayList, обгорнутий за допомогою статичного методу Collections.synchronizedList. Тобто всі методи класу будуть синхронізовані. Потім кожен список заповнюється значеннями від 1 до 100. Для цієї програми неважливо, якими конкретно значеннями будуть заповнені обидва списки. Потім створюються два потоки, що водночас читають один і той самий список. Один потік читає перші 50 елементів у списку, другий — наступні 50. Два потоки запускаються одночасно за допомогою блокування CountDownLatch. На консоль виводиться час, який кожен потік витратив на читання своєї половини списку.

Очевидно, що в синхронізованій версії списку лише один потік може читати дані в кожен момент часу. І у кращому випадку спочатку прочитати значення вдасться одному потоку, а лише потім це зробить другий. Протилежна ситуація спостерігається при використанні CopyOnWriteArrayList. У цій колекції декілька потоків можуть одночасно читати дані зі списку, не блокуючи один одного. Отже читання завершиться швидше, бо не буде взаємних блокувань.

Copy on write — колекція, що не підтримує дублікати та зберігає елементи в довільному порядку CopyOnWriteArraySet. Усередині CopyOnWriteArraySet використовується CopyOnWriteArrayList, і для нього характерні такі самі властивості.

5.6 SKIP LIST СТРУКТУРА ДАНИХ

Якщо необхідно мати відсортовану множину елементів, слід використовувати ConcurrentSkipListSet, що реалізує інтерфейси SortedSet і NavigableSet.

schemes-13

Реалізація ConcurrentSkipListSet базується на ConcurrentSkipListMap, структура ConcurrentSkipListSet схожа на структуру LinkedHashMap. Кожен елемент skip list структури, крім значення, містить посилання на сусідні елементи. Також є посилання вищих порядків, які вказують на елементи, що знаходяться попереду поточного, на довільне число в певному діапазоні, заданому для цього рівня посилань. Для наступного рівня посилань це число більше, ніж для попереднього.

Перевага цієї структури даних — пошук потрібного елементу відбувається дуже швидко за рахунок використання посилань вищих порядків. Продуктивність пошуку в цій структурі даних можна порівняти з пошуком елементів у бінарному дереві. Для вставки зміни цієї структури даних не потрібно цілком блокувати структуру, достатньо знайти елемент, який буде видалено, та заблокувати два сусідні елементи для зміни посилань, що вказують на елемент, який підлягає зміні.

5.7 ІНТЕРФЕЙС BlockingQueue

BlockingQueue — інтерфейс потокобезпечних черг, у які декілька потоків можуть записувати дані та читати їх звідти. Це досягається за рахунок здатності черги блокувати потік, що додає або читає елементи з черги. Наприклад, коли потік намагається отримати елемент із черги, але черга порожня — потік блокується. Або коли потік намагається покласти об'єкт у чергу, яка вже заповнена — потік теж блокується.

Блокуючі черги використовуються, коли одні потоки додають елементи в чергу, а інші — читають їх звідти. Цей патерн відомий як producer consumer. Потоки, що додають об'єкти в чергу, називаються producer (виробник). Додавати елементи в чергу можна, доки вона не заповниться. Потоки, що читають значення, називаються consumer (споживач). Читати з черги можна тільки у випадках, коли в ній є елементи.

BlockingQueue зберігає свої елементи за принципом FIFO. Елементи відсортовані за часом перебування в черзі, а голова знаходиться там довше за всі інші.

BlockingQueue не підтримує значення null, при спробі вставити його генерується NullPointerException.

Інтерфейс BlockingQueue підтримує чотири набори методів, що підтримують різну поведінку при додаванні чи отриманні елементів із черги.

 

 

Генерація виключення

Спеціальне значення

Блокування потоку

Тайм аут

Додавання

add(o)

offer(o)

put(o)

offer(o, timeout)

Видалення

remove(o)

poll()

take()

poll(timeout)

Перевірка

element()

peek()

   

 

Чотири типи поведінки означають наступне:

  1. Якщо негайне виконання операції є неможливим, генерується виняток.
  2. Якщо негайне виконання операції є неможливим, повертається значення — здебільшого true або false.
  3. Якщо негайне виконання операції є неможливим, потік, що виконує цю операцію, блокується.
  4. Якщо операцію не можна виконати негайно, потік, що викликав цей метод, блокується не довше ніж на час, вказаний у методі. Методи, залежно від успіху закінчення, повертають true або false.

Класи, що реалізують інтерфейс BlockingQueue: ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriopityBlockingQueue, SynchronousQueue.

Черга ArrayBlockingQueue має кінцевий розмір. Елементи у цій черзі зберігаються в масиві. Кількість елементів у черзі задається в конструкторі та не може бути змінена після її створення.

DelayQueue — черга з затримкою. Коли потік хоче отримати елемент із черги, елемент повертається, після того як затримка для конкретного елементу минула. Кожен елемент, що буде поміщений у чергу DelayQueue, повинен реалізовувати інтерфейс Delayed, де є єдиний метод long getDelay(). Цей метод повинен повертати решту часу затримки для цього елементу у правильних одиницях TimeUnit. Коли викликається метод отримання елементу з черги, DelayQueue викликає метод getDelay() для визначення, який елемент має бути повернуто з черги. Якщо цей метод повернув значення, близьке до нуля, нуль чи від'ємне значення, цей елемент може бути повернуто з черги. Також елемент, що повинен міститися в DelayQueue, повинен реалізувати метод compareTo(), тому що елементи в черзі упорядковано. Елемент, час затримки якого закінчиться найшвидше, поміщається в голову черги.

У DelayQueue можна поміщати елементи, для яких виклик методу getDelay() повертає нульове або негативне число. Потік, який читає елементи з черги, отримає цей елемент негайно. Приклад програми з використанням DelayQueue наведено в лістингу 6.

Лістинг 6:

import com.google.common.primitives.Ints;
public class DelayObject implements Delayed {
    private String data;
    private long startTime;
    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return Ints.saturatedCast(this.startTime - ((DelayObject) o).startTime);
    }
}

public class DelayQueueProducer implements Runnable {
  
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;
    public DelayQueueProducer(BlockingQueue<DelayObject> queue,
        Integer numberOfElementsToProduce,
        Integer delayOfEachProducedMessageMilliseconds) {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
        this.delayOfEachProducedMessageMilliseconds =   
            delayOfEachProducedMessageMilliseconds;
    }
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object = new DelayObject(UUID.randomUUID().toString(),     
                delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();
    public DelayQueueConsumer(BlockingQueue<DelayObject> queue,
        Integer numberOfElementsToTake) {
        this.queue = queue;
        this.numberOfElementsToTake = numberOfElementsToTake;
    }
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class TestDelayQueue {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
     
        BlockingQueue<DelayObject> queue = new DelayQueue<>();
        int numberOfElementsToProduce = 2;
        int delayOfEachProducedMessageMilliseconds = 500;
        DelayQueueConsumer consumer = new DelayQueueConsumer(
            queue, numberOfElementsToProduce);
        DelayQueueProducer producer = new DelayQueueProducer(
            queue, numberOfElementsToProduce,
                delayOfEachProducedMessageMilliseconds);
        executor.submit(producer);
        executor.submit(consumer);
        executor.awaitTermination(5, TimeUnit.SECONDS);
        executor.shutdown();
  
        System.out.println(“Is number of consumed elements equal 
            to number of produced elements? ” + 
                consumer.numberOfConsumedElements.get() ==   
                numberOfElementsToProduce);
    }
}

Клас PriorityBlockingQueue — безрозмірна черга. Усі елементи, що вставляються у PriorityBlockingQueue, повинні реалізовувати інтерфейс java.lang.Comparable або допускати сортування за natural ordering. Ітератор, отриманий для цієї колекції, не гарантує ітерацію за елементами в порядку пріоритету. Якщо елементи, що будуть додаватися в чергу, не реалізують інтерфейс Comparable, можна передати компаратор при створенні об'єкта PriorityBlockingQueue.

Клас SynchronousQueue для додавання елементу в чергу викликає метод put(). При виклику цього методу викликаючий потік блокується, поки інший потік не викличе метод take(). Також клас SynchronousQueue можна уявити як точку синхронізації між двома потоками, коли один потік дає елемент, а другий — його отримує. Також про SynchronousQueue кажуть, що це черга без єдиного елементу.

Оскільки SynchronousQueue є чергою без елементів, ця колекція поводиться специфічно: метод isEmpty() завжди повертає true, метод iterator() повертає порожній ітератор, метод hasNext() завжди повертає false, peek() завжди повертає null, size() завжди повертає 0.

Приклад використання SynchronousQueue як producer consumer представлено в лістингу 7.

Лістинг 7:

import java.util.Random;
import java.util.concurrent.*;
public class SynchQueue {
    public static void main(String[] args) {
        BlockingQueue<Integer> syncQueue = new SynchronousQueue<>();
        Producer producer = new Producer(syncQueue);
        producer.start();
        Consumer consumer = new Consumer(syncQueue);
        consumer.start();
    }
}

class Producer extends Thread {
    private BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    public void run() {
        while (true) {
            try {
                queue.put(produce());
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
    private Integer produce() {
        Random randomer = new Random();
        Integer number = randomer.nextInt(1000);
        try {
            Thread.sleep(randomer.nextInt(1000));
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
        System.out.println("Producer: created number: " + number);
        return number;
    }
}

class Consumer extends Thread {
    private BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed number = " + queue.take());
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

У цій програмі producer генерує довільні числа, а consumer виводить їх на екран. Згенерувавши довільне число, потік producer присипляє його на довільний час. У цей час потік consumer знаходиться у стані WAITING. Після того як об'єкт поміщено в SynchronousQueue, потік consumer прокидається і виводить на консоль отримане значення. Потік producer у цей нетривалий час знаходиться у стані WAITING. І так триватиме нескінченно, доки програму не зупинять.

БАГАТОПОТОКОВІ РЕАЛІЗАЦІЇ ІНТЕРФЕЙСУ MAP

Є три реалізації інтерфейсу Map для роботи в багатопотокових колекціях: HashTable, ConcurrentHashMap і ConcurrentSkipListMap. У класі HashTable усі методи є synchronized, і цю структуру даних можна використовувати в багатопотокових програмах. Однак цей клас уже застарів, і його використання не рекомендується. При операції читання або запису блокуються всі інші потоки, які хочуть щось змінити чи прочитати цю структуру даних. Очевидно, що продуктивність HashTable є дуже низькою. У сучасних програмах у критичних секціях рекомендується використовувати ConcurrentHashMap або ConcurrentSkipListMap. Розглянемо роботу цих класів докладніше.

ConcurrentHashMap. Перш за все, звернемось до структури класу HashMap. Є масив бакетів, усередині кожного з яких знаходиться або пов'язаний список, або бінарне дерево. Усередині ConcurrentHashMap складається з масиву сегментів, кожен з яких містить окрему HashMap з масивом бакетів.

sschemes-14

При додаванні пари “ключ–значення” у таку структуру даних відбувається блокування окремого сегмента, а після додавання пари “ключ–значення” все відбувається, як у звичайній карті. Якщо відбувається читання, сегмент не блокується, і кілька потоків можуть читати дані одночасно. Операції зміни карти можуть відбуватися одночасно в тому випадку, коли потоки звертаються до різних сегментів. Два потоки не можуть одночасно змінювати один і той самий сегмент. Однак один потік може здійснювати зміни в сегменті, а інший потік може паралельно читати той самий сегмент. Це можливо завдяки тому, що при читанні даних сегмент не блокується. При одночасному читанні й модифікації даних у результаті читання повернеться останнім змінене значення. У ConcurrentHashMap для кожного сегмента є своє власне блокування. За замовчанням кількість сегментів дорівнює 16. Кількість сегментів розраховується на підставі параметра int concurrencyLevel. Це додатковий параметр, який можна передати в конструктор при створенні класу. Кількість сегментів за замовчанням означає, що в карту можуть записувати 16 потоків. Кількість сегментів розраховується на підставі concurrencyLevel. Число два зводиться у ступень, починаючи від 1, причому результат має бути більше або дорівнювати параметру concurrencyLevel. Наприклад:
Количество сегментов = 2 ^ 1 = 2   >= 10 (False)

Кількість сегментів = 2 ^ 2 = 4   >= 10 (False)

Кількість сегментів = 2 ^ 3 = 8   >= 10 (False)

Кількість сегментів = 2 ^ 4 = 16 >= 10 (True)

При concurrencyLevel, що дорівнює 10, кількість сегментів — 16. Кількість бакетів у кожному сегменті розраховується за такою формулою:

2^x >= (initialCapacity/concurrencyLevel), де X — це кількість бакетів у сегменті.

schemes-15.png

Рехешування у ConcurrentHashMap відбувається окремо в кожному сегменті, тому воно може виконуватися водночас із записом до іншого сегмента.

ConcurrentSkipListMap. Цей клас використовує принцип skipList, розглянутий раніше. ConcurrentSkipListMap сортує ключі відповідно до natural sortіng або за логікою компаратора, що передається конструктору. Клас реалізує інтерфейси SortedMap, NavigableMap, ConcurrentMap, ConcurrentNavigableMap. ConcurrentSkipListMap гарантує виконання всіх основних операцій за O(log(n)).

ConcurrentSkipListMap підтримує методи навігації lowerEntry, floorEntry, ceilingEntry і higherEntry, які повертають об'єкти Map.Entry “менше”, “менше або дорівнює”, “більше або дорівнює” та “більше”, ніж переданий ключ. Методи lowerKey, floorKey, ceilingKey і higherKey повертають тільки ключі. ConcurrentSkipListMap не підтримує null значення ані у ключах, ані у значеннях.

  • Україна, Remote.UA; Україна, Дніпро; Україна, Київ; Україна, Львів; Україна, Одеса; Україна, Харків; Україна, Херсон
    19 листопада
  • Україна, Remote.UA; Україна, Дніпро; Україна, Київ; Україна, Львів; Україна, Одеса; Україна, Харків; Україна, Херсон
    18 листопада
  • Україна, Дніпро; Україна, Харків; Україна, Херсон
    1 листопада
  • Україна, Remote.UA; Україна, Дніпро; Україна, Київ; Україна, Львів; Україна, Одеса; Україна, Харків; Україна, Херсон
    30 жовтня