现在的位置: 首页 > 综合 > 正文

fastdfs源码分析3-tracker主流程

2013年12月22日 ⁄ 综合 ⁄ 共 7348字 ⁄ 字号 评论关闭
主流程
1,概述:

tracker服务和storage的处理方式相似。
(1) 在每个tracker服务线程的work_thread_entrance函数中,使用全局管道,把管道的读端描述符pipe_fds[0]添加到事件的读监控队列中,并设置消息处理函数recv_notify_read。
(2) 而管道的写端pipe_fds[1]是由主监控循环tracker_accept_loop来写的:当tcp连接完成后,连接监控主线程会把连接成功的描述符通过pipe_fds[1]写入管道,tracker的work_thread_entrance线程的recv_notify_read函数从读端pipe_fds[0]接收到连接好的socket描述符,再把它加入到libevent事件的读和写队列中。
(3) 而最终来处理任务消息的其实是client_sock_read和client_sock_write函数,分别处理连接端的读和写请求。


2, 基本数据结构

/* Connection information for one tracker server. */
typedef struct
{
int sock;  //socket descriptor
int port;  //port number
char ip_addr[IP_ADDRESS_SIZE]; //IP address, stored as a character buffer
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];  //group name of the tracker; the +1 is presumably room for the terminating NUL - TODO confirm
} TrackerServerInfo;


/*
 * Running status of one tracker server, paired with a pointer to its
 * connection information. The weight remarks on the fields suggest this is
 * used to rank trackers (presumably for leader selection - TODO confirm).
 */
typedef struct {
TrackerServerInfo *pTrackerServer; //the tracker server this status describes
int running_time;     //running seconds, more means higher weight
int restart_interval; //restart interval, less mean higher weight
bool if_leader;       //if leader
} TrackerRunningStatus;


3, 主函数

int main(int argc, char *argv[])
{
char *conf_filename;
int result;
int sock;
pthread_t schedule_tid;
struct sigaction act;
//默认3个tracker调度进程
ScheduleEntry scheduleEntries[SCHEDULE_ENTRIES_COUNT];
ScheduleArray scheduleArray;

//配置文件
if (argc < 2)
{
printf("Usage: %s <config_file>\n", argv[0]);
return 1;
}

//记录启动时间
g_up_time = time(NULL);
srand(g_up_time);

log_init();

#if defined(DEBUG_FLAG) && defined(OS_LINUX)
if (getExeAbsoluteFilename(argv[0], g_exe_name, \
sizeof(g_exe_name)) == NULL)
{
log_destroy();
return errno != 0 ? errno : ENOENT;
}
#endif

conf_filename = argv[1];
memset(bind_addr, 0, sizeof(bind_addr));
//读取配置文件选项
if ((result=tracker_load_from_conf_file(conf_filename, \
bind_addr, sizeof(bind_addr))) != 0)
{
log_destroy();
return result;
}

if ((result=tracker_load_status_from_file(&g_tracker_last_status)) != 0)
{
log_destroy();
return result;
}

base64_init_ex(&g_base64_context, 0, '-', '_', '.');
if ((result=set_rand_seed()) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"set_rand_seed fail, program exit!", __LINE__);
return result;
}

if ((result=tracker_mem_init()) != 0)
{
log_destroy();
return result;
}

//创建tracker的server socket
sock = socketServer(bind_addr, g_server_port, &result);
if (sock < 0)
{
log_destroy();
return result;
}

//设置tracker socket选项
if ((result=tcpsetserveropt(sock, g_fdfs_network_timeout)) != 0)
{
log_destroy();
return result;
}
//后台执行
daemon_init(true);
umask(0);
if (dup2(g_log_context.log_fd, STDOUT_FILENO) < 0 || \
dup2(g_log_context.log_fd, STDERR_FILENO) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call dup2 fail, errno: %d, error info: %s, " \
"program exit!", __LINE__, errno, STRERROR(errno));
g_continue_flag = false;
return errno;
}
//初始化tracker服务进程
if ((result=tracker_service_init()) != 0)
{
log_destroy();
return result;
}
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
        //设置信号处理函数
act.sa_handler = sigUsrHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

act.sa_handler = sigHupHandler;
if(sigaction(SIGHUP, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
act.sa_handler = SIG_IGN;
if(sigaction(SIGPIPE, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

act.sa_handler = sigQuitHandler;
if(sigaction(SIGINT, &act, NULL) < 0 || \
sigaction(SIGTERM, &act, NULL) < 0 || \
sigaction(SIGQUIT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}

#if defined(DEBUG_FLAG)
/*
#if defined(OS_LINUX)
memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
        act.sa_sigaction = sigSegvHandler;
        act.sa_flags = SA_SIGINFO;
        if (sigaction(SIGSEGV, &act, NULL) < 0 || \
        
sigaction(SIGABRT, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
#endif
*/

memset(&act, 0, sizeof(act));
sigemptyset(&act.sa_mask);
act.sa_handler = sigDumpHandler;
if(sigaction(SIGUSR1, &act, NULL) < 0 || \
sigaction(SIGUSR2, &act, NULL) < 0)
{
logCrit("file: "__FILE__", line: %d, " \
"call sigaction fail, errno: %d, error info: %s", \
__LINE__, errno, STRERROR(errno));
return errno;
}
#endif

#ifdef WITH_HTTPD
if (!g_http_params.disabled)
{
if ((result=tracker_httpd_start(bind_addr)) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"tracker_httpd_start fail, program exit!", \
__LINE__);
return result;
}

}

if ((result=tracker_http_check_start()) != 0)
{
logCrit("file: "__FILE__", line: %d, " \
"tracker_http_check_start fail, " \
"program exit!", __LINE__);
return result;
}
#endif
//设置运行权限
if ((result=set_run_by(g_run_by_group, g_run_by_user)) != 0)
{
log_destroy();
return result;
}

scheduleArray.entries = scheduleEntries;
scheduleArray.count = SCHEDULE_ENTRIES_COUNT;
//设置调度实体处理函数
memset(scheduleEntries, 0, sizeof(scheduleEntries));
scheduleEntries[0].id = 1;
scheduleEntries[0].time_base.hour = TIME_NONE;
scheduleEntries[0].time_base.minute = TIME_NONE;
scheduleEntries[0].interval = g_sync_log_buff_interval;
scheduleEntries[0].task_func = log_sync_func;
scheduleEntries[0].func_args = &g_log_context;

scheduleEntries[1].id = 2;
scheduleEntries[1].time_base.hour = TIME_NONE;
scheduleEntries[1].time_base.minute = TIME_NONE;
scheduleEntries[1].interval = g_check_active_interval;
scheduleEntries[1].task_func = tracker_mem_check_alive;
scheduleEntries[1].func_args = NULL;

scheduleEntries[2].id = 3;
scheduleEntries[2].time_base.hour = 0;
scheduleEntries[2].time_base.minute = 0;
scheduleEntries[2].interval = TRACKER_SYNC_STATUS_FILE_INTERVAL;
scheduleEntries[2].task_func = tracker_write_status_to_file;
scheduleEntries[2].func_args = NULL;
if ((result=sched_start(&scheduleArray, &schedule_tid, \
g_thread_stack_size, &g_continue_flag)) != 0)
{
log_destroy();
return result;
}
//选主
if ((result=tracker_relationship_init()) != 0)
{
log_destroy();
return result;
}

log_set_cache(true);

bTerminateFlag = false;
bAcceptEndFlag = false;

//接受连接主函数
tracker_accept_loop(sock);
bAcceptEndFlag = true;
if (g_schedule_flag)
{
pthread_kill(schedule_tid, SIGINT);
}
//终止线程
tracker_terminate_threads();

#ifdef WITH_HTTPD
if (g_http_check_flag)
{
tracker_http_check_stop();
}

while (g_http_check_flag)
{
usleep(50000);
}
#endif

while ((g_tracker_thread_count != 0) || g_schedule_flag)
{

/*
#if defined(DEBUG_FLAG) && defined(OS_LINUX)
if (bSegmentFault)
{
sleep(5);
break;
}
#endif
*/

usleep(50000);
}
tracker_mem_destroy();
tracker_service_destroy();
tracker_relationship_destroy();
logInfo("exit nomally.\n");
log_destroy();
return 0;
}

//tracker服务初始化
//tracker服务初始化在tracker_service_init函数中完成,其实就是启动工作服务线程(默认是4个)。
int tracker_service_init()
{
int result;
struct tracker_thread_data *pThreadData;
struct tracker_thread_data *pDataEnd;
pthread_t tid;
pthread_attr_t thread_attr;
    
        //初始化线程锁
if ((result=init_pthread_lock(&tracker_thread_lock)) != 0)
{
return result;
}
        //初始化
if ((result=init_pthread_lock(&lb_thread_lock)) != 0)
{
return result;
}
        //设置线程属性
if ((result=init_pthread_attr(&thread_attr, g_thread_stack_size)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"init_pthread_attr fail, program exit!", __LINE__);
return result;
}
        //队列初始化
if ((result=free_queue_init(g_max_connections, TRACKER_MAX_PACKAGE_SIZE,\
                TRACKER_MAX_PACKAGE_SIZE, sizeof(TrackerClientInfo))) != 0)
{
return result;
}
        //为每个线程分配一个线程结构
g_thread_data = (struct tracker_thread_data *)malloc(sizeof( \
struct tracker_thread_data) * g_work_threads);
if (g_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, error info: %s", \
__LINE__, (int)sizeof(struct tracker_thread_data) * \
g_work_threads, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
    
        //遍历N个工作线程结构,创建管道
g_tracker_thread_count = 0;
pDataEnd = g_thread_data + g_work_threads;
for (pThreadData=g_thread_data; pThreadData<pDataEnd; pThreadData++)
{
pThreadData->ev_base = event_base_new();
if (pThreadData->ev_base

抱歉!评论已关闭.