4gophers

TCP сервер работающий как часы

Перевод статьи “TCP servers that run like clockwork

Go замечательный язык для написания различных сетевых сервисов. Большинство сервисов используют TCP в качестве базового транспорта. Именно поэтому важно понимать как писать надежные TCP серверы. Кроме того, это позволит лучше разбираться в базовых принципах работы сетевых приложений, которые необходимы нам в повседневной работе.

“Кричащий” сервис

Начнем с создания простейшего TCP сервера, который будет “кричать” в ответ своим клиентам. Он будет получать некоторое сообщение, переводить его в верхний регистр и отправлять обратно клиенту. Мы будем использовать простой протокол - одна строка это запрос. В ответ отправляется тоже одна строка. Запрос может оканчиваться \r\n или \n. Ответ всегда оканчивается \n.

В действии это выглядит так:

$ telnet 0.0.0.0 8080
Trying 0.0.0.0...
Connected to 0.0.0.0.
Escape character is '^]'.
Hello. How are you?
HELLO. HOW ARE YOU?
Ok. Be shouty!
OK. BE SHOUTY!
Bye!
BYE!
^]
telnet> Connection closed.

Под капотом:

type Server struct {
    Addr string
}

func (srv Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":8080"
    }
    log.Printf("starting server on %v\n", addr)
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    defer listener.Close()
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("error accepting connection %v", err)
            continue
        }
        log.Printf("accepted connection from %v", conn.RemoteAddr())
        handle(conn) //TODO: Implement me
    }
}

В коде выше мы создаем listener который прослушивает указанный addr и принимает соединения. Дальше соединения пробрасываются в handle(conn) которую еще необходимо реализовать. Вся наша бизнес-логика(обработка строк) будет реализованна именно в функции handle.

func handle(conn net.Conn) error {
    defer func() {
        log.Printf("closing connection from %v", conn.RemoteAddr())
        conn.Close()
    }()
    r := bufio.NewReader(conn)
    w := bufio.NewWriter(conn)
    scanr := bufio.NewScanner(r)
    for {
        scanned := scanr.Scan()
        if !scanned {
            if err := scanr.Err(); err != nil {
                log.Printf("%v(%v)", err, conn.RemoteAddr())
                return err
            }
            break
        }
        w.WriteString(strings.ToUpper(scanr.Text()) + "\n")
        w.Flush()
    }
    return nil
}

В нашем хендлере парсятся входные данные пока не наткнемся на \r\n or \n. Одновременно со сканированием текста, мы его капитализируем и отправляем обратно клиенту. Дальше мы ожидаем новых данных от клиента, пока не получим ошибку или EOF.

Обратите внимание, мы читаем и пишем в бесконечном цикле. Нам приходится так делать, потому что мы работаем с одним и тем же TCP соединением. Возможно вы уже увидели в чем тут проблем? Что будет, если клиент уйдет и никогда не закроете соединение? Мы никогда не выйдем из этого бесконечного цикла.

“Кричим” конкурентно

Наш простой сервер может обрабатывать одного клиента в единицу времени. Это не очень круто. Что если многим людям будет необходим “хороший крик”? К счастью, в языке с поддержкой конкурентности это очень легко исправить. Достаточно просто запустить handler конкурентно.

Нам нужно заменить handle(conn) на go handle(conn). И, собственно, все. Теперь наш сервер может конкурентно обрабатывать запросы от клиентов. Чтобы разобраться как это работает, рассмотрим функцию ListenAndServe. Работу функции условно разделим на несколько этапов:

  • Начинаем прослушивать TCP сокет
  • Пытаемся принять соединение от клиента. Если нет желающих подключится, то функция блокируется.
  • При удачном соединении, мы вызываем функцию handle(conn) с полученным соединением.

В текущей реализации если блокируется handle в ожидании пользовательского ввода, то ListenAndServe блокируется тоже. Нам нужно чтобы accept мог обрабатывать следующее соединение, но мы ждем пока не завершит свою работу handle. Простое решение это вызывать handle в рутине. Теперь мы можем сразу вернутся к accept и заблокироваться на ожидании следующего соединения.

Работаем с простаивающими(idle) соединениями

Выше мы обсуждали что будет если клиент просто уйдет и не закроет соединение. Нам нужно что-то делать с простаивающими соединениями. Нам нужны таймауты. Любой сервис, который не использует таймауты, уже сломан. Если вы вынесете из этой статьи только одну мысль, то пусть это будет именно эта.

К счастью, в Go довольно просто реализовать idle таймаут. Посмотрим, как можно отключать клиентов, которые долго не общались с нами.

 type Server struct {
    Addr        string
    IdleTimeout time.Duration
}

Для начала нам нужно поле IdleTimeout, которое будем использовать в качестве настройки. Теперь мы можем установить дедлайн как текущее время + IdleTimeou.

conn.SetDeadline(time.Now().Add(conn.IdleTimeout))

Часики тикают. Дедлайн это абсолютное значение. Соединение будет “убито”, если мы заново не обновим дедлайн. Каждый раз после удачной операции необходимо увеличивать(продлевать) дедлайн. Если мы этого не сделаем, то соединение “умрет”

Какую операцию можно считать удачной? В случае с TCP это будет операция записи или чтения. Если есть хоть какие-то операции чтения и записи - значит есть прогресс. Если долгое время нет ни чтения ни записи, то это означает что соединение больше не используется и должно быть убито.

Для обновления дедлайна в момент чтения и/или записи, нам нужно сделать небольшую обертку над conn с нашей собственной реализацией. В нашей реализации мы добавим обновления дедлайна.

type Conn struct {
    net.Conn
    IdleTimeout time.Duration
}

func (c *Conn) Write(p []byte) (int, error) {
    c.updateDeadline()
    return c.Conn.Write(p)
}

func (c *Conn) Read(b []byte) (int, error) {
    c.updateDeadline()
    return c.Conn.Read(b)
}

func (c *Conn) updateDeadline() {
    idleDeadline := time.Now().Add(c.IdleTimeout)
    c.Conn.SetDeadline(idleDeadline)
}

Это довольно простой код. Видно, что мы можем управлять “жизнью” соединения. Так как наше соединение полностью реализует интерфейс net.Conn, то мы можем использовать нашу реализацию везде, где до этого использовали стандартное соединение, например в вызове handle(conn).

conn := &Conn{
            Conn:        newConn,
            IdleTimeout: srv.IdleTimeout,
        }
conn.SetDeadline(time.Now().Add(conn.IdleTimeout))
go handle(conn)

Посмотрим, как это работает

$ telnet 0.0.0.0 8080
Trying 0.0.0.0...
Connected to 0.0.0.0.
Escape character is '^]'.
I'm going away. Bye!
I'M GOING AWAY. BYE!
Connection closed by foreign host.

Сервер прервал соединение после нескольких секунд бездействия. Конечно, idle таймаут должен настраиваться в зависимости от вашего приложения.

Ограничиваем буферы

Что будет, если клиент отправит гигабайты информации? Кстати, это именно то, что происходит в случае Denial-of-Service(или DoS) атаке. Мы можем избежать неприятных последствий и будем чуть более осторожными.

Прежде всего, нам нужно ограничить количество данных, которые мы читаем из нашего соединения. Это можно сделать с пмощью LimitedReader. Такой ридер ограничивает количество данных которые можно прочитать за единицу времени. Можем использовать его в нашем методе Read

func (c *Conn) Read(b []byte) (int, error) {
    c.updateDeadline()
    r := io.LimitReader(c.Conn, c.MaxReadBuffer)
    return r.Read(b)
}

Мы используем LimitReader в методе conn.Read для чтения данных до MaxReadBuffer. Мы используем MaxReadBuffer как параметр для настройки, как до этого использовали IdleTimeout. Теперь мы можем ограничивать количество данных, которые мы читаем за один раз, но только в рамках одного соединения и одной операции чтения. handle(conn) продолжает считывать данные из сокета. Ограничение буфера защищает нас от слишком быстрого чтения слишком большого количества данных. Тем не менее, нам необходимо ограничивать лимиты в целом. Пусть это будем вашим домашним заданием.

Избавляемся от соединений

Давайте рассмотрим как правильно остановить наш сервер. Стратегия достаточно простая:

  • Прежде всего перестаем принимать новые соединения.
  • Опрашиваем оставшиеся соединения и ждем пока они прекратят свою работу.
  • Как только все соединения закрыты, можно останавливать наш сервер.

Метод остановки будет выглядеть так:

func (srv *Server) Shutdown() {
    srv.inShutdown = true
    log.Println("shutting down...")
    srv.listener.Close()
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            log.Printf("waiting on %v connections", len(srv.conns))
        }
        if len(srv.conns) == 0 {
            return
        }
    }
}

Мы сообщаем всем, что собираемся останавливаться и закрываем текущий экземпляр listener. С этого момента новые соединения не принимаются, текущие пока продолжают свою работу. Дальше, мы опрашиваем счетчик текущих соединений каждые 500ms. Как только счетчик доходит до 0 мы останавливаем сервер.

Необходимо переписать часть нашего сервера, чтобы считать количество соединений, удалять “мертвые” соединения и плавно останавливать сервер. Мне кажется, это замечательное упражнение, которые вы можете выполнить сами. Если у вас случится затык, то вот вам подсказка.

Заключение

Теперь у нас есть TCP сервер с idle таймаутами, ограниченным буфером для чтения и с плавной остановкой. Большинство этих идей уже реализованны в стандартной библиотеке, например в пакете net/http. Я рекомендую почаще заглядывать в код стандартной библиотеки, там очень много всего интересного.

Nov 12, 2017

Ли Атчисон: Масштабирование приложений. Выращивание сложных систем

Мы живем в мире растущих приложений. Практически любые программные продукты рано или поздно приходится расширять, надстраивать, адаптировать к обслуживанию растущей пользовательской аудитории и к пиковым нагрузкам. Для того, чтобы подобное масштабирование протекало гладко и быстро, нужно закладывать такие возможности уже на уровне архитектуры приложения. В этой незаменимой прикладной книге автор рассказывает не только об архитектурных тонкостях, необходимых для эффективного масштабирования приложений, но и о рисках, присущих такой работе, о грамотной организации масштабирования и об использовании облачных сервисов.

 

comments powered by Disqus