Akka + Servlet 3 = Comet

image

Вступление

Не так давно моё внимание привлекла библиотека Akka — реализация модели акторов для Scala и Java. Она мне показалась достаточно интересной, тем более до этого с акторами мне ещё не доводилось работать. И вот я наконец победил лень и сел реализовать на Akka что-нибудь простое и бесполезное. Напрмер, асинхронную обработку http-запросов. К тому же уже давно вышла реализация Servlet 3, которую тоже надо посмотреть. Итак, предлагаю вашему вниманию реализацию на Scala простейшего comet-чата.

Что для этого понадобится

  • Scala — отличный язык программирования, особенно если вы уже долго работаете с Java и вам уже стало скучно. Или вы смотрите roadmap развития Java и вам становится грустно.
  • Akka — реализация акторов для Scala и Java. Позволяет разрабатывать параллельные программы на манер Erlang. Согласно заявлением разработчиков и некоторым тестам является лучшей в мире scala
  • Servlet 3 — последняя на данный момент спецификация сервлетов. В частности поддерживает асинхронную обработку запросов и конфигурирование с помощью аннотаций (можно совсем без web.xml). Я использую Tomcat 7

Приступим

На схеме изображена блок схема компонентов сервера.
image
Начнём с центральной и самой простой части нашего чата — актора Chat:
class Chat extends Actor{
 
    val subscribers = scala.collection.mutable.ListBuffer.empty[ActorRef]
 
    protected def receive = {
        case Subscribe(a) =>{
            subscribers += a
        }
        case UnSubscribe(a) =>{
            subscribers -= a
        }
        case m @ ChatMessage(_,_)  =>{
            for(s <- subscribers) s ! m
        }
    }
}
Метод recieve является основным в классе актора. Он взвращает partial функцию (PartialFunction[Any, Unit]), которая будет вызываться при поступлении сообщения. Сообщения — это обычно case классы, которые потом легко разделяются с помощью сопоставления с образцом (pattern matching).

Этот актор хранит у себя список сессий пользователей. Он обрабатывает три типа сообщений: Subscribe и UnSubscribe, с помощью которых акторы сессий пользователей (UserSession) подписываются и отписываются от получения сообщений чата; и ChatMessage — сообщение в чате, которое просто ретранслируется всем участникам чата (включая самого отправителя).

Теперь перейдём к другому концу нашего сервера, сервлету:
@WebServlet(urlPatterns = Array("/comet"), asyncSupported = true, loadOnStartup = 1)
class CometServlet extends HttpServlet{
    override def service(req: HttpServletRequest, resp: HttpServletResponse) {
        req.setCharacterEncoding(«UTF-8»)
        resp.setCharacterEncoding(«UTF-8»)
        resp.setContentType(«application/json»)
        val ctx = req.startAsync(req, resp)
        ctx.setTimeout(1000*60*2)
        RequestDispatcher.instance ! Request(new RequestContext(ctx))
    }
}
Настройки сервлета заданы с помощью аннотации (больше никакого xml). Параметр asyncSupported указывает на то, что данный сервлет может обрабатывать запросы асинхронно. Само начало асинхронной обработки происходит при вызове метода startAsync у запроса. После этого вызова запрос не будет закрыт при завершении метода service, а будет ждать вызова метода complete (у класса AsyncContext, объект которого вернул startAsync) или наступления таймаута обработки запроса. При этом поток, в контексте которого происходил вызов метода service сервлета, не будет ждать завершения обработки запроса, а отправится обратно в пул дожидаться нового запроса.

В нашем случае запрос после небольшой предварительной подготовки отправляется актору RequestDispatcher. Оператор '!' посылает сообщение актору асинхронно, то есть не ждёт результата обработки сообщения.

RequestDispatcher находит (или создаёт новую) сессию пользователя (UserSession) и асинхронно отправляет ей запрос на обработку.
class RequestDispatcher extends Actor{
    val sessions = collection.mutable.HashMap.empty[String, ActorRef]
    protected def receive = {
        case Request(ctx) => {
            var userId = ctx.param(«userId»)
            if(userId == null) userId = UUID.randomUUID.toString
            val session = sessions.getOrElseUpdate(userId, {
                val a = Actor.actorOf(new UserSession(userId))
                self.startLink(a)
                a
            })
            session ! Request(ctx)
        }
    }
}
Актор UserSession занимается непосредственно обработкой запроса пользователя.
class UserSession(val userId:String) extends Actor{
    var waitRequest:Option[RequestContext] = None
    var messagesQueue = List.empty[ChatMessage]
 
    protected def receive = {
        case Request(ctx) => {
            val action = ctx.param(«action»)
            action match {
                case «sendMessage» => {
                    val msg = ctx.param(«message»)
                    Chat.instance ! ChatMessage(prepareMessage(msg))
                    ctx.writeAndComplete(okResponse)
                }
                case «getMessages» =>{
                    closeLastRequest()
                    if(messagesQueue.isEmpty){
                        waitRequest = Some(ctx)
                    } else{
                        sendMessagesToClient(ctx)
                    }
                }
            }
        }
        case msg @ ChatMessage(_,_) => {
                messagesQueue = msg :: messagesQueue
                if(waitRequest.isDefined){
                    sendMessagesToClient(waitRequest.get)
                    waitRequest = None
                }
        }
        case CloseRequest() =>{
            waitRequest = None
        }
    }
    override def preStart {
        Chat.instance ! Subscribe(self)
    }
    override def postStop {
        closeLastRequest()
        Chat.instance ! UnSubscribe(self)
    }
}
Для каждого пользователя создаётся отдельный экземпляр UserSession. Для получения сообщений клиент отправляет запрос getMessages, который, если очередь неотправленных сообщений не пуста, сразу обрабатывается; если сообщений нет, то запрос сохраняется и ожидает их поступления. Таким образом от клиента почти всё время есть ожидающий запрос. Отправка сообщений производится отдельным post-запросом. Данные на клиент передаются в виде json. В качестве серализатора использую lift-json(кусок веб-фреймворка для scala — lift).

На клиенте всё предельно просто. Запрос сообщений с помощью jQuery.ajax происходит примерно так
$.ajax({
    data:{
        userId:userId,
        action:«getMessages»
    },
    success:function(res){
        //показ сообщений
        delayedReceiveMessages(10);
    },
    error:function(){
        delayedReceiveMessages(1000);
    }
})

Результат

Всё что получилось разместил здесь. Развёрнуто на бесплатном микроинстансе Amazon EC2.
Субъективно, разрабатвывать с использованием акторов оказалось довольно приятно. Отсутствие общего состояния очень облегчает жизнь.

Ни на что не претенндующие замеры показали, что на 10 ожидающих соединений расходуется 1 Mb памяти кучи (heap). Эта память почти полностью расходуется на содержание соединения томкатом. Потребность акторов на порядок-два меньше. На мой взгляд для comet-сервера это много.

PS Для заинтересовавшихся Akka и владеющими английским советую посмотреть доклад на Scala Days 2010 и ознакомится с официальным руководством.
Те, кому нужен настоящий comet-сервер для java/scala (а не этот велосипед), предлагаю поискать что-то специализированное, например Atmpsphere.


0 комментариев

Только зарегистрированные и авторизованные пользователи могут оставлять комментарии.