Сервер TCP/IP… много серверов хороших и разных.

Чаще всего, если это не приходится делать очень часто (т.е. не является основной спецификой работы), при необходимости написания TCP/IP сервера используется одна из двух "классических" технологий: последовательный сервер или параллельный сервер на основе fork() (Windows-программисты в этом случае пишут сервер на основе thread). Хотя реально можно предложить гораздо больше принципиально различных серверов, которые будут существенно отличаться своей сложностью, временем реакции на запрос клиента и т.д. Ниже описано несколько из таких способов с результатами их тестирования. Программы делались и испытывались в OS QNX 6.2.1, но могут (за исключением специально оговоренного случая) практически без изменений использоваться в любой UNIX-like OS, а за некоторым исключением - и в Windows.

1.Постановка задачи: мы напишем специальный тестовый TCP/IP клиент, который посылает требуемое число раз запрос к серверу (ретранслятору), принимает от него ответ и тут же разрывает соединение. Серия запросов от клиента делается для усреднения результата и для того (как будет видно далее), чтобы исключить (или учесть) эффекты кэширования памяти. Клиент измеряет время (точнее - число циклов процессора) между отправкой запроса серверу и приходом ответа от него. Сервера в этом анализе являются простыми ретрансляторами. Все показанные программы - предложены в упрощённых вариантах: не везде сделана полная обработка ошибочных ситуаций (что, вообще-то говоря, крайне необходимо), и сознательно не включена - обработка сигнала SIGCHLD, которая должна препятствовать появлению зомби процессов. Все приводимые коды программ - работающие и апробированные: весь результирующий вывод скопирован непосредственно с консоли задачи. Весь приводимый программный код транслировался компилятором gcc-2-95 в нотации языка C++ (хотя специфические особенности С++, за исключением потокового ввода-вывода С++ и не использованы).

2.Клиент. Собственно клиент размещён в файле cli.cpp, но он, совместно с сервером, использует общие файлы common.h & common.cpp, все эти файлы с краткими комментариями приведены ниже:

common.h - в этом файле определены различные порты TCP, по которым клиент будет связываться с различными модификациями серверов. Кроме того, здесь определены:
- функция завершения по критической ошибке;
- единая процедура ретрансляции через сокет, которую используют все сервера(для единообразия и корректности сравнений);
- функция подготовки прослушивающего сокета TCP/IP (для того, чтобы устранить этот, достаточно объёмный, код из кода серверов, рассматриваемых ниже).
Код:
#if !defined( __COMMON_H )
#define __COMMON_H
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include [u]
#include <string.h>
#include [i]
#include <netdb.h>
const int PORT = 9000, /* программа: */
SINGLE_PORT = PORT, /* ech0 */
FORK_PORT = PORT + 1, /* ech1 */
FORK_LARGE_PORT = PORT + 2, /* ech10 */
PREFORK_PORT = PORT + 3, /* ech11 */
INET_PORT = PORT + 4, /* ech3 */
THREAD_PORT = PORT + 5, /* ech2 */
THREAD_POOL_PORT = PORT + 6, /* ech21 */
PRETHREAD_PORT = PORT + 7; /* ech22 */
const int MAXLINE = 40;
// критическая ошибка ...
void errx( const char *msg, int err = EOK );
// ретранслятор тестовых пакетов TCP
void retrans( int sc );
// создание и подготовка прослушивающего сокета
int getsocket( in_port_t );
#endif

common.cpp - реализационная часть:

#include "common.h"
// ошибка ...
void errx( const char *msg, int err = EOK ) {
perror( msg );
if( err != EOK ) errno = err;
exit( EXIT_FAILURE );
};
// ретранслятор тестовых пакетов TCP
static char data[ MAXLINE ];
void retrans( int sc ) {
int rc = read( sc, data, MAXLINE );
if( rc > 0 ) {
rc = write( sc, data, strlen( data ) + 1 );
if ( rc < 0 ) perror( "write data failed" );
}
else if( rc < 0 ) { perror( "read data failed" ); return; }
else if( rc == 0 ) { cout << "client closed connection" << endl; return; };
return;
};
// создание и подготовка прослушивающего сокета
struct sockaddr_in addr;
int getsocket( in_port_t p ) {
int rc = 1, ls;
if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) = -1 )
errx( "create stream socket failed" );
if( setsockopt( ls, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof( rc ) ) != 0 )
errx( "set socket option failed" );
memset( &addr, 0, sizeof( addr ) );
addr.sin_len = sizeof( addr );
addr.sin_family = AF_INET;
addr.sin_port = htons( p );
addr.sin_addr.s_addr = htonl( INADDR_ANY );
if( bind( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) != 0 )
errx( "bind socket address failed" );
if( listen( ls, 25 ) != 0 ) errx( "put socket in listen state failed" );
return ls;
};

cli.cpp - код клиента:

#include [i]
#include <sys/neutrino.h>
#include <sys/syspage.h>
#include <sys/procfs.h>
#include "common.h"
// установка параметров клиентов: порт и число повторений
static void setkey( int argc, char *argv[], in_port_t* port, int* num ) {
int opt, val;
while ( ( opt = getopt( argc, argv, "p:n:") ) != -1 ) {
switch( opt ) {
case 'p' :
if( sscanf( optarg, "%i", &val ) != 1 )
errx( "parse command line failed", EINVAL );
*port = (in_port_t)val;
break;
case 'n' :
if( ( sscanf( optarg, "%i", &val ) != 1 ) || ( val <= 0 ) )
errx( "parse command line failed", EINVAL );
*num = val;
break;
default :
errx( "parse command line failed", EINVAL );
break;
};
};
};
// клиент - источник потока тестовых пакетов TCP
int main( int argc, char *argv[] ) {
in_port_t listen_port = SINGLE_PORT;
int num = 10;
setkey( argc, argv, &listen_port, &num );
char data[ MAXLINE ], echo[ MAXLINE ];
uint64_t cps = cps = SYSPAGE_ENTRY( qtime )->cycles_per_sec;
cout << "TCP port = " << listen_port << ", number of echoes = " << num << endl
<< "time of reply - Cycles [usec.] :" << endl;
for( int i = 0; i < num; i++ ) {
int rc, ls;
if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 )
errx( "create stream socket failed" );
struct sockaddr_in addr;
memset( &addr, 0, sizeof( addr ) );
addr.sin_len = sizeof( addr );
addr.sin_family = AF_INET;
addr.sin_port = htons( listen_port );
inet_aton( "localhost", &addr.sin_addr );
if( ( rc = connect( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) ) < 0 )
errx( "connect failed" );
sprintf( data, "%d", rand() );
uint64_t cycle = ClockCycles();
if( ( rc = write( ls, data, strlen( data ) + 1 ) ) <= 0 )
errx( "write data failed" );
rc = read( ls, echo, MAXLINE );
cycle = ClockCycles() - cycle;
if( rc < 0 ) errx( "read data failed" );
if( rc == 0 ) errx( "server closed connection" );
if( strcmp( data, echo ) != 0 ) { cout << "wrong data" << endl; break; };
cout << cycle << "[" << cycle * 1000000 / cps << "]";
if( i % 5 == 4 ) cout << endl; else cout << 't'; cout << flush;
close( ls );
delay( 100 );
};
if( num % 5 != 0 ) cout << endl;
exit( EXIT_SUCCESS );
};

После запуска клиент анализирует ключи запуска. Предусмотрены значения: "-p" значение порта подключения (по умолчанию - последовательный сервер, порт 9000), и "-n" - число запросов к серверу в серии (по умолчанию - 10). Каждый запрос представляет собой случайное число, генерируемое клиентом, в символьной форме. Ретранслированный сервером ответ сверяется с запросом для дополнительного контроля. Клиент подключается к серверу по петлевому интерфейсу 127.0.0.1, что вполне достаточно для сравнительного анализа. Далее мы рассмотрим его работу с различными серверами.

3. Последовательный сервер ретранслятор. Такой сервер нас интересует только как эталон для сравнения: он имеет минимальное время реакции, т.к. не затрачивается время на порождение каких-либо механизмов параллелизма. С другой стороны, такой сервер, зачастую, просто неинтересен, т.к. не позволяет обслуживать других клиентов до завершения текущего обслуживания.

Все сервера имеют крайне простой код, потому что большая часть рутины снесена в файлы common (h & cpp). Вот код 1-го используемого нами - последовательного сервера (файл ech0.cpp):
Код:
#include "common.h"
int main( int argc, char *argv[] ) {
int ls = getsocket( SINGLE_PORT ), rs;
while( true ) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
retrans( rs );
close( rs );
cout << "*" << flush;
};
exit( EXIT_SUCCESS );
};

Вот результаты выполнения клиента с этим сервером (указано число машинных циклов ожидания, а в скобках - для справки - время в микросекундах для процессора Celeron 533Mhz):
Код:
/root/ForkThread # cli -p9000 -n20
TCP port = 9000, number of echoes = 20
time of reply - Cycles [usec.] :
868325[1624] 135364[253] 135287[253] 133438[249] 133057[248]
136061[254] 133554[249] 133887[250] 138776[259] 131237[245]
134748[252] 133823[250] 135650[253] 130583[244] 134562[251]
132601[248] 134622[251] 134516[251] 132055[246] 134139[250]

Отчётливо виден (1-й запрос) эффект, который мы отнесли к эффектам кэширования памяти программ - различие времени выполнения первого и последующих запросов. В каталоге проекта есть ещё один (тестовый) вариант последовательного сервера, код которого выглядит несколько иначе:
Код:
#include <sys/neutrino.h>
#include "common.h"
int main( int argc, char *argv[] ) {
int ls = getsocket( SINGLE_PORT ), rs, i = 0;
while( true ) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
uint64_t cycle = ClockCycles();
retrans( rs );
cycle = ClockCycles() - cycle;
close( rs );
cout << cycle;
if( i++ % 5 == 4 ) cout << endl; else cout << 't'; cout << flush;
};
exit( EXIT_SUCCESS );
};

Он отличается тем, что "хронометрирует" (для справки) оценочно число циклов на ретрансляцию (затрачиваемые внутри сервера). Все типы серверов используют общую процедуру retrans() и единые затраты "чистого времени". Приведём для справки эти оценки (только машинные циклы):
Код:
/root/ForkThread # ech0_
757808 60862 60085 60444 60197
61111 60565 60154 59121 59984

Видно, что это время составляет около 50% времени, наблюдаемого со стороны клиента, которое включает в себя время реакции на accept() (со стороны сервера), 2-кратные затраты write() + read() (со стороны как клиента, так и сервера), время передачи буферов по петлевому интерфейсу и т.п.

4. "Классический" параллельный сервер. Ниже приведен код такого "классического" сервера (в отличающейся части), в котором обслуживающий процесс порождается fork() после разблокирования на accept()(файл ech1.cpp):
Код:
#include "common.h"
int main( int argc, char *argv[] ) {
int ls = getsocket( FORK_PORT ), rs;
while( true ) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
pid_t pid = fork();
if( pid < 0 ) errx( "fork error" );
if( pid == 0 ) {
close( ls );
retrans( rs );
close( rs );
cout << "*" << flush;
exit( EXIT_SUCCESS );
}
else close( rs );
};
exit( EXIT_SUCCESS );
};

После выхода из accept() (получение запроса connect() от клиента) порождается отдельный обслуживающий процесс, который тут же закрывает свою копию прослушивающего сокета, производит ретрансляцию через соединённый сокет, завершает соединение и завершается сам. Родительский же процесс закрывает свою копию соединённого сокета и продолжает прослушивание канала. Вот результаты выполнения такого сервера:
Код:
/root/ForkThread # cli -p9001 -n20
TCP port = 9001, number of echoes = 20
time of reply - Cycles [usec.] :
2219652[4151] 1467470[2744] 1470056[2749] 1466860[2743] 1469294[2748]
1466875[2743] 1467612[2745] 1489083[2785] 1475620[2759] 1665398[3114]
1472091[2753] 1471635[2752] 1481768[2771] 1462214[2734] 1467229[2744]
1468731[2747] 1466483[2742] 1465499[2741] 1461780[2734] 1649821[3085]

Да . . . время реакции больше чем на порядок превышает простой последовательный сервер. Видно заметно менее (относительно) выраженный эффект кэширования - вновь создаваемое адресное пространство процесса повторно не используется, однако некоторое влияние кэширования сказывается (в программе на стороне клиента?). Добавим в код сервера 1 строчку - перед точкой main (файл ech10.cpp - и изменён порт):
Код:
static long MEM[ 2500000 ];

/root/ForkThread # cli -p9002
TCP port = 9002, number of echoes = 10
time of reply - Cycles [usec.] :
67061908[125432] 64674322[120966] 64126835[119942] 63071907[117969] 64185096[120051]
65478368[122470] 64495464[120632] 64533852[120703] 63831652[119390] 64407915[120468]

Строки вывода перенесены мною потому, что он уже не помещаются в формат страницы: время реакции увеличилось почти в 50 раз, превышает время реакции простейшего последовательного сервера уже почти на 3 порядка (500 раз, или 1000 раз по "чистому" времени обслуживания), и составляет уже 0.12 секунды на каждый запрос. Что произошло? При порождении нового процесса по fork() (можно считать, что здесь затраты не столь большие - из предыдущей таблицы: порядка 1.5 млн. циклов) - OS обязана перекопировать образ задачи (к которой мы добавили ~20Mb) из адресного пространства одного процесса, а пространство другого. И не посредством memcpy(), а запросами к ядру системы, потому как копирование идёт между различными защищёнными образами!

Какие предварительные итоги можно сделать из рассматриваемых результатов? Во-первых, то, что OS QNX определённо не использует технику "copy on write" (COW) для копирования образов порождаемых по fork копий процессов, а, во-вторых, ... меняет ли что-то принципиально применение COW в других OS, например в Linux? Думаю, что "скорее нет", т.к. радикальное снижение начального времени реакции (времени латентности) при использовании COW оборачивается только скрытием тех же затрат, но распределённых по интервалу обслуживанию. Т.е., использование COW эффективно только как "рекламный", "рыночный трюк", рассчитанный на гипнотическое воздействие на конечного потребителя некоторых "магических" тестовых цифр и уж категорически неприменимо для realtime OS, поведение которых во времени должно быть строго детерминировано.

5. Параллельный сервер с предварительным созданием копий. Так что же получается: для серверов, работающих на высоко интенсивных потоках запросов, с традиционным fork-методом всё так плохо? Отнюдь! Нужно только поменять fork & accept местами - создать заранее некоторый пул обслуживающих процессов, каждый из которых до прихода клиентского запроса будет заблокирован на accept (кстати - accept на одном и том же прослушиваемом сокете). А после отработки клиентского запроса заблаговременно создать новый обслуживающий процесс. Эта техника известна как "предварительный fork" или pre-fork. Меняем текст сервера (файл ech11.cpp):
Код:
#include "common.h"
const int NUMPROC = 3;
int main( int argc, char *argv[] ) {
int ls = getsocket( PREFORK_PORT ), rs;
for( int i = 0; i < NUMPROC; i++ ) {
if( fork() == 0 ) {
int rs;
while( true ) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
retrans( rs );
close( rs );
cout << i << flush;
delay( 250 );
};
};
};
for( int i = 0; i < NUMPROC; i++ ) waitpid( 0, NULL, 0 );
exit( EXIT_SUCCESS );
};

При написании этого текста я несколько "схитрил" и упростил, в сравнении с предложенной абзацем выше моделью. Здесь 3 обслуживающих процесса сделаны циклическими и не завершаются по окончанию обслуживания, а снова блокируются на accept, но для наблюдения эффектов этого вполне достаточно (последняя строка нужна вообще только для блокировки родительского процесса, и "сохранения" управляющего терминала - для возможности прекращения всей группы по ^C):
Код:
# pidin
...
6901868 1 ./ech11 10r REPLY 94228
6901869 1 ./ech11 10r REPLY 94228
6901870 1 ./ech11 10r REPLY 94228

/root/ForkThread # cli -p9003
TCP port = 9003, number of echoes = 10
time of reply - Cycles [usec.] :
854276[1597] 138356[258] 135665[253] 131656[246] 136653[255]
132532[247] 133583[249] 134639[251] 136363[255] 131482[245]

Время реакции практически равно последовательному серверу, чего мы и добивались. В этой программе добавлен вывод идентификатора (i) обрабатывающего процесса (предыдущие сервера выводили только символ "*" для идентификации факта обработки запроса). Для этого добавлена и задержка "пере-активизации" процесса delay(250) - больше 2-х периодов запросов клиентов, чтоб заставить обрабатывающие процессы чередоваться. Вот возможный вид протокола сервера:
Код:
/root/ForkThread # 2012012012201201201220120120122012012012

Хорошо видно нарушение периодичности последовательности идентификационных номеров процессов: после периода простоя всегда обслуживание осуществляется процессом с индексом 2 (максимальным) - при множественном блокировании на acept() первым разблокируется процесс, заблокировавшийся последним (!?).

В принципе, не так и сложно в такой схеме сделать и динамический пул процессов, как будет показано ниже для потоков - с той лишь некоторой сложностью, что здесь каждый процесс выполняется в своём закрытом адресном пространстве, и для их взаимной синхронизации придётся использовать что-то из механизмов IPC.

6. Прежде, чем переходить к потоковым (thread) реализациям, рассмотрим ещё один fork-вариант: использование суперсервера inetd. При этом весь сервис по запуску процессов-копий нашего приложения, и перенаправлению его стандартных потоков ввода-вывода в сокет - возьмёт на себя inetd. Вот полный текст ретранслирующего сервера для этого случая (файл ech3.cpp):
Код:
#include <stdio.h>
#include "common.h"
static char data[ MAXLINE ];
void main( void ) { write( STDOUT_FILENO, data, read( STDIN_FILENO, data, MAXLINE ) ); };

Просто? Мне кажется, что очень. Теперь настроим на наше приложение и запустим inetd

- Дописываем в конфигурационный файл /etc/services строку, определяющую порт, через который будет вызываться приложение:
Код:
ech3 9004/tcp

- В конфигурационный файл файл /etc/inetd.conf добавляем строку, которая определяет режим обслуживания и конкретные параметры вызываемого приложения:
Код:
ech3 stream tcp nowait root /root/ForkThread/ech3 ech3

- Запускаем inetd:
Код:
/root/ForkThread # inetd &

При заполнении строк концигурационных файлов нужна особая тщательность, если заголовки сервисов (ech3) в файлах не будут совпадать, то вы получите просто ошибку связи:
Код:
/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
connect failed: Connection refused

Проверить, что inetd настроен на прослушивание нашего порта можно так:
Код:
/etc # netstat -a
Active Internet connections (including servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
...
tcp 0 0 *.ech3 *.* LISTEN

Заставить inetd перечитать свои конфигурационные файлы после каждой правки /etc/services или /etc/inetd.conf вы можете, послав ему сигнал SGHUP, например:
Код:
/etc # pidin
...
10231922 1 usr/sbin/inetd 10r SIGWAITINFO
/etc # kill -SIGHUP 10231922

Если ошибка допущена в полном имени программы сервера (поля 6-7 строки inetd.conf), то мы тоже получим не сразу объяснимый результат:
Код:
/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
server closed connection

... и, наконец, если всё в настройке inetd правильно, то получим нечто похожее:
Код:
/root/ForkThread # cli -p9004
TCP port = 9004, number of echoes = 10
time of reply - Cycles [usec.] :
16442468[30753] 14169659[26502] 14354292[26848] 14160723[26486] 14187182[26535]
14145131[26457] 14411884[26955] 14761467[27609] 14207573[26573] 14491483[27104]

Отметим, что время реакции в несколько раз (до 10-ти) выше прямой реализации с fork (inetd ведь также "скрыто" делает fork), но зато какая простота и трудоёмкость! Характерно почти полное отсутствие эффектов кэширования. Для серверов, обслуживающих "неплотный" поток запросов - это, пожалуй, оптимальное решение (кстати, большинство "штатных" сетевых сервисов UNIX выполняется именно по такой схеме).

7. Сервер, использующий pthread_create по запросу обслуживания клиента (файл ech2.cpp):
Код:
#include
#include "common.h"
void* echo( void* ps ) {
int sc = *(int*)ps;
sched_yield();
retrans( sc );
close( sc );
cout << "*" << flush;
return NULL;
}
int main( int argc, char *argv[] ) {
int ls = getsocket( THREAD_PORT ), rs;
while( true ) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );
if( pthread_create( NULL, NULL, &echo, &rs ) != EOK ) errx( "thread create error" );
sched_yield();
};
exit( EXIT_SUCCESS );
};

Минимальные комментарии: 2 вызова sched_yield() (в вызывающем потоке, и, позже, в функции обслуживания созданного потока) - предназначены для гарантии копирования созданным потоком переданного дескриптора сокета до его повторного переопределения в цикле вызывающего потока. Результаты выполнения программы:
Код:
/root/ForkThread # cli -p9005
TCP port = 9005, number of echoes = 10
time of reply - Cycles [usec.] :
2493948[4664] 266123[497] 269490[504] 279049[521] 267775[500]
266880[499] 288175[539] 268589[502] 267990[501] 267003[499]

Это только в 2 раза (в 3, если оценивать по "чистому" времени) хуже простого последовательного сервера. Чрезвычайно сильно выражен эффект кэширования - вся обработка последовательности запросов производится на едином (многократно используемом) пространстве адресов.

8. Сервер с предварительным созданием потоков. Поступим по аналогии с pre-fork, и создадим фиксированный пул потоков предварительно (pre-thread, файл ech22.cpp):
Код:
#include
#include "common.h"
static int ntr = 3; /*число thread в пуле*/
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
void* echo( void* ps ) {
int sc = *(int*)ps, rs;
sched_yield();
if( ( rs = accept( sc, NULL, NULL ) ) < 0 ) errx( "accept error" );
retrans( rs );
close( rs );
pthread_mutex_lock( &mutex );
ntr++;
pthread_cond_signal( &condvar );
pthread_mutex_unlock( &mutex );
cout << pthread_self() << flush;
delay( 250 );
return NULL;
}
int main( int argc, char *argv[] ) {
int ls = getsocket( PRETHREAD_PORT ), rs;
while( true ) {
if( pthread_create( NULL, NULL, &echo, &ls ) != EOK ) errx( "thread create error" );
sched_yield();
pthread_mutex_lock( &mutex );
ntr--;
while( ntr <= 0 ) pthread_cond_wait( &condvar, &mutex );
pthread_mutex_unlock( &mutex );
};
exit( EXIT_SUCCESS );
};

Здесь accept (как и раньше в случае prefork) перенесен в обрабатывающий поток (все thread блокированы в accept на единственном прослушивающем сокете). Для синхронизации я использую условную переменную, но могут применятся любые из синхронизирующих примитивов. Испытываем полученную программу:
Код:
/root/ForkThread # cli -p9007
TCP port = 9007, number of echoes = 10
time of reply - Cycles [usec.] :
879988[1645] 134687[251] 137152[256] 136303[254] 693676[1297]
138605[259] 140320[262] 138937[259] 136886[256] 342027[639]

Время реакции очень близко к последовательному серверу (к минимально достижимому потенциально!). Потоки обработчики на сервере идентифицируют себя своим tid:
Код:
/root/ForkThread # ech22
4567891011121314151617181920212223

Хорошо видно последовательное порождение нового потока для обработки каждого запроса клиента. Так же сильно, как и в предыдущем случае, выражены эффекты кэширования.

9. Можно создать сколь угодно сложный диспетчер, поддерживающий оптимальное число потоков (или процессов) в сервере, но в OS QNX от уже предоставлен как стандартное средство системы: потоковый пул (thread_pool_*). Сервер с использованием динамического пула потоков (файл ech21.cpp):
Код:
#include
#include <sys/dispatch.h>
#include "common.h"
static int ls;
THREAD_POOL_PARAM_T *alloc( THREAD_POOL_HANDLE_T *h ) { return (THREAD_POOL_PARAM_T*)h; };
THREAD_POOL_PARAM_T *block( THREAD_POOL_PARAM_T *p ) {
int rs = accept( ls, NULL, NULL );
if( rs < 0 ) errx( "accept error" );
return (THREAD_POOL_PARAM_T*)rs;
};
int handler( THREAD_POOL_PARAM_T *p ) {
retrans( (int)p );
close( (int)p );
delay( 250 );
cout << pthread_self() << flush;
return 0;
};
int main( int argc, char *argv[] ) {
ls = getsocket( THREAD_POOL_PORT );
thread_pool_attr_t attr;
memset( &attr, 0, sizeof( thread_pool_attr_t ) );
attr.lo_water = 3; /* заполнение блока атрибутов пула */
attr.hi_water = 7;
attr.increment = 2;
attr.maximum = 9;
attr.handle = dispatch_create();
attr.context_alloc = alloc;
attr.block_func = block;
attr.handler_func = handler;
void *tpp = thread_pool_create( &attr, POOL_FLAG_USE_SELF ) ;
if( tpp == NULL ) errx( "create pool" );
thread_pool_start( tpp );
exit( EXIT_SUCCESS );
};

Всё, сервер готов - почти всё необходимое за нас сделала библиотека OS. Грубо, логика работы пула потоков QNX следующая:
- начально создаётся attr.lo_water (нижняя ватерлиния) потоков;
- для каждого потока при создании вызывается функция *attr.context_alloc;
- эта функция по завершению вызовет (сама) блокирующую функцию потока *attr.block_func;
- эта функция, после разблокирования (accept) вызовет функцию обработчика *attr.handler_func, которой в качестве параметра (в нашем тексте) передаст дескриптор присоединённого сокета;
- как только число заблокированных потоков станет ниже attr.lo_water - механизм пула создаст дополнительно attr.increment потоков;
- если число блокированных потоков в какой-то момент превысит attr.hi_water ("верхняя ватерлиния") - "лишние" потоки будут уничтожены;
- . . . и всё это так, чтобы общее число потоков (выполняющиеся + блокированные) не превышало attr.maximum.

Это - уникально мощный механизм, с очень широкой функциональностью, но за более детальной информацией я отсылаю всех заинтересованных к технической документации OS QNX. Смотрим это в действии:
Код:
/root/ForkThread # cli -p9006 -n20
TCP port = 9006, number of echoes = 20
time of reply - Cycles [usec.] :
828384[1549] 139615[261] 142050[265] 144799[270] 143895[269]
146760[274] 142760[267] 145951[272] 142816[267] 144384[270]
144657[270] 159474[298] 147504[275] 147113[275] 145257[271]
146866[274] 153215[286] 145461[272] 145013[271] 145311[271]

Результаты очень близкие к максимально возможным! Так же, как и в предыдущих случаях - очень ярко выражен эффект кэширования: вся обработка ведётся на одном и том же, многократно используемом, адресном пространстве.

Посмотрим "чередование" tid обрабатывающих потоков:
Код:
/root/ForkThread # ech21
15315315311731731731731731731776176176176176176176

Хорошо видно, что через некоторое время работы число потоков в пуле стабилизируется на уровне 7-ми ("верхней ватерлинии"). Через какое-то время выполнения состояние пула будет примерно таким:
Код:
/ # pidin
...
10059820 1 ./ech21 10r REPLY 94228
10059820 2 ./ech21 10r REPLY 94228
10059820 3 ./ech21 10r REPLY 94228
10059820 4 ./ech21 10r REPLY 94228
10059820 5 ./ech21 10r REPLY 94228
10059820 6 ./ech21 10r REPLY 94228
10059820 7 ./ech21 10r REPLY 94228
...

Как и предсказывает документация, мы имеем 7 блокированных на accept потоков - по "верхнюю ватерлинию".

Достаточно интересно посмотреть состояния ожидающих сокетов при запущенных всех (или почти всех) видах описанных выше серверов - вот возможное начало таблицы netstat:
Код:
/root/ForkThread # netstat -a
Active Internet connections (including servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 *.9005 *.* LISTEN
tcp 0 0 *.9003 *.* LISTEN
tcp 0 0 *.9006 *.* LISTEN
tcp 0 0 *.9002 *.* LISTEN
tcp 0 0 *.9001 *.* LISTEN
tcp 0 0 *.9000 *.* LISTEN
tcp 0 0 *.ech3 *.* LISTEN

10. Итоги. Выше рассмотрено 7 различных альтернативных технологий построения сервера TCP/IP. Сравним средние характеристики вариантов по критерию "время задержки реакции" (представляют интерес только порядки величин, сами значения могут радикально "гулять" в зависимости от конкретного вида серверной функции):
Тип сервера Среднее время обслуживания Время латентности
Последовательный - п.3 135 000 0
fork - п.4 >>1 470 000 >>1 000 000
pre-fork - п.5 133 000 0
inetd - п.6 14 100 000 14 000 000
thread - п.7 267 000 130 000
pre-thread - п.8 140 000 5 000 (~0)
thread pool - п.9 144 000 9 000 (~0)

Тем не менее, не следует категорически руководствоваться выбором той или иной технологии построения сервера только исходя из содержимого показанной выше таблицы. В каждом конкретном случае при выборе решения должно учитываться существенно больше факторов: трудоёмкость реализации, потребление ресурсов, в частности RAM (которое мы никак не затрагиваем в нашем рассмотрении), простота отладки и сопровождения etc.

P.S.

* Все упоминаемые в тексте элементы программного кода, или необходимые для их сборки элементы (Makefile) содержаться в составе прилагаемого проекта echsrv.tgz.
* Материал данного рассмотрения непосредственно произошёл от обсуждения подобных вопросов на форуме http://qnx.org.ru/forum тема fork или thread т.е., в первую очередь, автор приносит свои благодарности всем, принявшим участие в обсуждении. Во-вторых - все обсуждавшие данную тему в форуме, являются соавторами предлагаемого материала в той же мере, как и автор, указанный в титуле статьи, а их полное поимённое перечисление я опускаю только в силу их многочисленности.


Олег И. Цилюрик: olej@front.ru
Information
  • Posted on 31.01.2010 21:51
  • Просмотры: 332