Модель акторов и её реализация

  • Автор темы Автор темы gattsu
  • Дата начала Дата начала

gattsu

Выдающийся
Проверенный
Победитель в номинации 2018
Победитель в номинации 2017
За веру и верность форуму
За заслуги перед форумом
Преподаватель
Сообщения
165
Розыгрыши
0
Репутация
1 457
Реакции
293
Баллы
1 393
Долго мучаюсь, но вот уже созреваю до конечной программной концепции, своей реализации модели акторов.

Для общего ознакомления рекомендую почитать статью на википедиии


Для полного ознакомления, рекомендую почитать:

Более полные работы




Очень хорошее

Нюансы моей реализации:
- в системе есть фиксированное количество потоков(worker).
- отсутствует блокирующая синхронизация
- синхронизация с помощью atomic(в класе EventBus)
- распределение вычислений делайте actorsystem(global

Пытаюсь сделать полностью однородную среду, реализовать по принципу "все есть актор". Сейчас переписываю в 100 раз. Пытаюсь замкнуть все контексте и производить обработку пулом, но при этом оставить однородность. Если будут вопросы пишите, пока не могу расписать, что да как, но думаю на конкретные вопросы смогу ответить. Думаю уже близко к концу, удовлетворяющему меня варианту.


ПС пакет actor.test содержит пару примеров реализации.
Да, про другие реализации модели акторов, автор знает и пробовал на них делать программы.

Немного абстрактного кода для примера из ла2
Код:
class Account extends BaseActor {
    public static final Logout logout = new Logout();
  
    static final LoginSuccess success = new LoginSuccess();
  
    final String login;
  
    String password;
  
    {
        behavior(State.FREE)
            .Case(
                auth -> {
                    if(password.equals(auth.password)) {
                        reply(success);
                        become(State.AUTH);
                    } else
                        reply(LoginFail.WRONG_PASSWORD);
                },
                Auth.class
            );
          
        behavior(State.AUTH)
            .Case(
                auth -> reply(LoginFail.IN_USE),
                Auth.class
            )
            .Case(
                logout -> become(State.FREE),
                Logout.class
            );
        ...
    }
  
    enum State {
        FREE,
        AUTH,
        BLOCK,
        ...
    }
  
  
    public static class Auth {
        final String password;
      
    }
  
    public static class LoginSuccess {
  
    }
  
    public static class Logout {
  
    }
  
    public static enum LoginFail {
        IN_USE,
        WRONG_PASSWORD,
        ....
    }
}
 
  • Мне нравится
Реакции: kick

    Mangol

    Баллов: 35
    Без комментариев

    kick

    Баллов: 37
    За сообщение

Обновления:
Создал новую ветку.
Пробую реализовать, другим способом:
- уменьшилось количество этапов передачи сообщения(Сообщение передается на прямую без посредником, посредник только при отправке сообщения через не связанную ссылку)
- удалось увеличить количество обрабатывающих сообщений на 70-80%. (для разного рода действий, следует организовывать разную балансировку между акторами, можно попробовать реализовать рекомендацию, для балансировщика, при создании актора, по принципу привязки к одному воркеру и тп)
- Атомарная синхронизация в каждом mailbox
- через класс Balancer, происходит передача, Worker-ам, на исполнение событий.

ПС В один момент времени класс актор, может обрабатывать только одно входящее сообщение
 
  • Мне нравится
Реакции: kick

    Mangol

    Баллов: 35
    Без комментариев

    kick

    Баллов: 37
    За сообщение
Расширил функционал очереди. Добавил различные функции, которые учитывают указанную задержку, для обработки.
Пробую алгоритм рекурсивных сообщений, в различных случаях позволяет добиться 15-20 миллионов сообщений в секунда, на 3000 акторов.(тест на счетчиках)
Система с единичным акторов позволяет достичь 5 миллионов сообщений(единичный счетчик)
Тест конечно частного случая, но позволяют следить за пропускной способность, задержкой. Если добавить нагрузку в обработку сообщений, можно получить меньший результат.

Уменьшил количество мест синхронизаций, до двух, на одну обработку события.
Потоки могут войти в монитор, при обращении к одному и тому же актору(публикация события)
Потоки могут войти в монитор, если при балансировке, указал один и тот же поток порождения задачи, для двух других.

Прикрепил общую схему работы воркеров(потоков)
 

Вложения

  • Untitled Diagram (2).webp
    Untitled Diagram (2).webp
    25,9 КБ · Просмотры: 266
  • Мне нравится
Реакции: kick

    Mangol

    Баллов: 35
    Без комментариев

    kick

    Баллов: 37
    За сообщение
Продолжим.
Реализовал систему с помощью обычной синхронизации, не блокирующие очереди оставил на потом. Реализовано все топорно, чтобы работал фундамент.
На данный момент можно:
- создавать актор
- отправлять сообщение актору
- делать запрос с ожиданием ответа к актору.

Основные принципы акторов.
Каждый актор в один момент времени, может обрабатывать только одно входящее сообщение.
Во время обработки сообщения актор может:
- менять свое поведения, для реакции на следующее сообщение.
- посылать сообщения другим акторам, включая себя(рекурсивное сообщение)
- создавать других акторов
 
  • Мне нравится
Реакции: kick

    kick

    Баллов: 39
    За сообщение
"Один актор не является актором. Актор является частью системы акторов"
Рассмотри простой пример двух акторов.
У нас, есть два актора Ping и Pong. Акторы, с задержкой в одну секунду, пересылаю друг другу сообщения. Актор Ping отправляет строку "ping" актору Pong. Получив строку, Pong отображает её и отправляет строку "pong". Получив сообщение актор отображает строку и отправляет сообщение "ping". Этот процесс происходит до выхода из программы, пользователем.

Точка входа в модель
Код:
public static void main(String...args) {
        ActorSystem.launch(system -> {
           system.create(Ping.class, "ping", system.create(Pong.class, "pong"));
        });
}
В данному участке кода, запуск производится через статический метод ActorSystem.launch. Методом лябда выражения, в языке java, мы создает Consumer<Actor> и передаем его методу launch. system - корневой актор.

Производим создание актора Pong
Код:
system.create(Pong.class, "pong")
Данный метод create, c двумя аргументами, принимает два значения.
Первый аргумент типа Сlass<? extends Actor>, передается класс который наследует BaseActor, в нашем случае Pong.class
Второй аргумент типа String - уникальное имя актора в узле. "pong"

Производим создание актора Ping
Код:
system.create(Ping.class, "ping", system.create(Pong.class, "pong"))
В данном случае вызывается перегруженный метод create(Class<? extends> clazz, String name, Object...args);
Первые два аргумента являются идентичными, а в третий передается массив аргументов(синтаксический сахар vargs, Object...arss эквивалентно записи Object[] args). Система произведет поиск соответствующего конструктора, с заданными аргументами. В нашем случае будет вызван конструктор, для класса Ping
Код:
public Ping(Actor actor) {
    pong = actor;
}
так как метод create, возвращает ссылку на запись актора, которая наследует базовый абстрактный класс Actor. Таким образом мы скрываем реализацию, и защищаем классы от внешних изменений.
Другой вариант записи
Код:
    public static void main(String...args) {
        ActorSystem.launch(new Consumer<Actor>() {
            @Override
            public void accept(Actor system) {
                Actor pong = system.create(Pong.class, "pong");
                Actor ping = system.create(Ping.class, "ping", pong);
            }
        });
    }

Каждый актор обладает заданным поведение. Чтобы назначить поведение актору, мы сначала его создаем через метод behavior() анонимное, или поведение которое можно назначить по ключу behavior(Object key). Данные два метода возвращают класс который наследует интерфейс Actor.IBehavior. Создав поведение, через метод behavior, который вернул класс поведения, мы можем назначить ему случай поведения на указанный тип, через мето Case(Consumer<T> consumer, Class<T> type);
consumer - функция поведения для указанного типа, она вызывается в случае, когда актор получает соответсвующий тип сообщения
type - класс типа
T - тип значения
Код:
IBehavior behavior = behavior();
behavior.Case(value -> {
...произвести действия
}, Integer.class);
become(behavior);
Мы задали поведение, условному актору, который реагирует на входящее сообщения, числового типа, и производит какие-то вычисления.

В нашем примере поведение двух акторов выглядим следующим образом
Дла актора Pong
Код:
    public static class Pong extends BaseActor {
        {
            become(
                behavior().
                    Case(
                        value -> {
                            System.out.println(value);
                            sender().send("pong", 1000);
                        },
                        String.class
                    )
            );
        }
    }
Метод sender() Возращает актора, который отправил полученное сообщение. Метод send отправляет сообщение "pong" с задержкой в 1 секунду.
То есть, получении сообщения типа String, мы отображаем его в консоли, затем отправляем сообщению "ping", отправителю, с задержкой в одну секунду.


Для актора Ping
Код:
    public static class Ping extends BaseActor {
        Actor pong;
      
        public Ping(Actor actor) {
            pong = actor;
        }
      
        {
            become(
                behavior().
                    Define(
                        d -> pong.send("ping")  
                    ).
                    Case(
                        value -> {
                            System.out.println(value);
                            pong.send("ping", 1000);
                        },
                        String.class
                    )
            );
        }
    }
В данном примере присутствует шаблонный случай Define. Его можно заменить на
Код:
behavior().
Case(
define -> {
pong.send("ping")
},
Actor.Define.class
);
Actor.Define системный класс-сообщение, которое отправляется, при создании каждому актору. Данное сообщение не обязательно для обработки.
Данное сообщение актор получает, обрабатывает, в первую очередь, затем все осталньные входящие, если на момент создания такие есть.

В шаблонном поведении Define, мы отправляем сообщение актору pong, который передали при создании, pong.send("ping"). Это является источником вычислений в данной модели. То есть при создании актора ping мы отправляем сообщение актору pong, на что, в случае актора pong, он получает и обрабатывает сообщение, отобразив его на консоли и ответив сообщением "pong".

Результат выполнения данной программы будет, бесконечное, поочередное отображения, на консоли, строк "ping" и "pong"
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
ping
pong
...
У меня отсутствует понятие планировки, вся планировка организовывается путем задержки сообщений.
ПС На данный момент, в этой имплементации, не реализована отправка сообщения по адресу.
 
Последнее редактирование модератором:
  • Мне нравится
Реакции: kick

    kick

    Баллов: 39
    За сообщение

    Mangol

    Баллов: 37
    Без комментариев
Расширил управление поведением. Добавил:
- возможность предиката (actor.samples.EvenAndOdd)
- возможность задавать поведения, для определенных значения(actor.samples.ValuesCase)

Добавил подержу синглтон акторов.
Синглтон актор - актор, объект которого может находиться в одном экземпляре, в системе.
Singleton расширяется BaseActor класс.
Объявление синглтон актора, делается путем наследования класса Singleton. Любой актор расширяемый от класс Singleton, сможет быть создан, в одном экземпляре, в системе, через метод create.

Обращение, к синглтон актору, можно сделать путем вызова метода actor(T.class), определенного в классе BaseActor. T класс должен быть расширяем actor.Singleton. Вызов данного метода возвращает ссылку на заданный актор.

Примечание: actor(T.class) может вернуть ссылку на не существующий синглтон.

Пример реализации PingPong через singleton
 
Последнее редактирование модератором:
  • Мне нравится
Реакции: kick

    kick

    Баллов: 39
    За сообщение
Обновление.
- Добавил актор-консоль. Через трейт actor.util.Consol.TConsole, добавляется метод, через который можно отправлять строку на консоль(println(Object)).
- Добавил систему остановки акторов. При вызове метода stop, для актора, генерируется событие Event.Stop. При обработке его, актор получает сообщение Actor.Stop, которое можно обработать, производя действия с останавливающим акторов. При остановке, актор удаляется из системы.
- Теперь ссылку на корневой актор можно получить через метод system, определенный в классе ActorBase.
- Для корневого актора добавлена возможность отображения дерева акторов. Через строковое сообщение "dump-tree". Пример можно посмотреть (src/test/java/actor.test.DumpTreeTest)
- Добавил поиск актора по адресу.
- Добавил возможность общения запрос-ответ.
 
  • Мне нравится
Реакции: kick

    kick

    Баллов: 45
    За сообщение
behavior -> case - напомнило Pattern matching из скалы)
 
Ну она где-то так и есть. D java нету механизма pattern matching. Но в данном случае получается reactive парадигма. Программируемое поведение на лету.
В scala бы этот участок был бы грациозней хД
 
  • Мне нравится
Реакции: kick

    kick

    Баллов: 45
    За сообщение
Всё? Конец?
 
Пришлось изучать фундаментальные труды, по альтернативами моделям, другие реализации(akka, killim, actor-foundary). Много времени уходит, пришел к более обобщенной модели. Обновил . Там можно посмотреть текущий прототип. Завтра выложу последний вариант.
Я пытаюсь придерживаться того, что все есть актор и все реализовывать на основе этого. Тоже тяжело предоставить метод расширения стандартного набора. Пытаюсь обобщить все задачи, которые можно решать. Тут вопрос экспериментов, тяжело что-то подобрать.
 
  • Мне нравится
Реакции: kick
Немного обновлю.
Переписал "немного". Решил использовать ForkJoinPool, стандартную реализацию. Получается относительно не плохая производительность.

Изменение в программировании поведения актора:
- теперь актору отправляется в сообщении массив объектов, получается на подобии RPC.
Код:
send(1);
send(1, 2, 3, 4);
- поведение задается через методы, указывается имя метода и его сигнатура.
Правила для регистрации метода как обработчика.
1 Соответствие сигнатуры
2 Соответствие задаваемого имении
3 Возвращаемое значение должно быть void

По примером можно понять общую концепцию.
 
  • Мне нравится
Реакции: kick
Реализовал эксперементальную версию, где любой java объект может быть актором, поведение задается через публичные методы.
Суть в том, что к каждому объекту может быть обработан только один вызов метода, в любой момент времени. Вызов метода, не ожидается, и результат не обрабатывается, на текущий момент.


Пример

С точки зрения параллельной модели вычислений, все вызовы через Actor.send, являются потокобезопасны, и все внутренние значение когерентны. Можно реализовать обработку результата исполнения метода.

Хороший пример, это все ai классы на pts. Так как ai, может обработать только одно событие за раз, и менять внутренне состояние безопасно

ПС На данный момент код очень прост, и не возникнет сложности в понимании
 
  • Мне нравится
Реакции: kick
Продолжаем шаманизм, самая простая
В данной реализации можнно:
- создавать акторы, регистрировать объекты в системе Context.create(name, instance)
- отправлять сообщения, вызывать отложенный метод Context.send(methodName, ...arguments)
В один момент времени объект-актор, может обрабатывать только один вызов, остальные ожидают очереди обработки.
Если у класса объявлен публичный метод:
Код:
public void define() { 
}
то данный метод будет вызван в первую очередь. Пример:
Код:
public class Define {

    public void define() {
        System.out.println("hello actor!");
    }
   
    public static void main(String...args) {
        Model.launch(new Define());
    }
}

Может создать объект-актор, из стандартного класса вывода System.out
Код:
public class Console implements Context {

    public void define() {
        final Reference console = create("console", System.out);
        for(char c : "hello world!".toCharArray())
            console.send("println", c);
    }
   
    public static void main(String...args) {
        Model.launch(new Console());
    }
}

Примечаний: Реализованно все топорно, нету оптимизаций, каждый вызов производит поиск метода заново, по имение и сигнатуре контента
 
  • Мне нравится
Реакции: kick
Точка входа для любой модели:
Model.launch(instance)
 
ожидание завершиния отложеного вызова.
При вызове метода send возвращается объект Callback, если через методы данного объекта, были установлены обработчик успешного и провального завершения вызова, или хотябы один обработчик, тогда актор блокирует и ожидает завершения. Все входящие сообщения, в очереди, ожидаю пока актор завершит запрос.

TODO таймер проверять, если ожидание привышает норму, установлиную через настройку, или вручную пользователем, завершать или отображать сообщения на консоли.

Примечание:
Провальный вызов - когда вызов генерирует исключительную ситуацию
 
Назад
Сверху Снизу