天天看点

ProxySQL源码分析4-线程分析

ProxySQL源码分析4-线程分析
  1. Main thread

    初始化核心模块和线程。Main线程最后一部分用watchdog做mysql work线程和idle线程的heartbeat检查,如果超过20次则重启proxy,如果设置restart,则无限重启。

  2. Admin thread

    核心循环:admin_main_loop,是Admin模块最重要的循环

    创建并监听Admin端口(默认的6032),为每个admin连接创建一个新的线程。

    加载各种配置信息,以及管理配置变更(动态加载、持久化到sqlite)等。

    如果配置HTTP Server,AdminRestApiServer,还会负责启动这些Server并处理web请求。

这里有一个比较trick的事情,admin这个管理账号只能local连接,其他账号可以远程连接,其他这些账号都是一样的管理员权限。如果硬要解释,那就是coder的情怀了~
  1. MySQL workers threads & MySQL idle threads

    worker thread负责处理活跃的客户端mysql请求,是主要的工作线程。

    idle thread主要就是监听非活跃的Session是否有新的请求,然后移交给worker线程处理。

    当活跃连接远小于非活跃连接的时候,idle和worker搭配可以很好地提高性能。官网数据可处理连接能到1百万。

    idle thread和work thread是1:1创建的。其交互流程入下图所示。

    ProxySQL源码分析4-线程分析
  2. Monitor threads

    产生5类生产者线程:connection checks、ping checks、read-only checks、replication lag checks、group replication monitoring

    调度消费线程池:一方面负责check任务; 另一方面就是处理上面的check结果,比如ping失败之类的。处理主要是通过更新 MyHGM->p_update_mysql_error_counter状态来实现,如下代码。

void *monitor_ping_thread(void *arg) {
    MySQL_Monitor_State_Data *mmsd = (MySQL_Monitor_State_Data *) arg;
 
    if (mmsd->interr) { // ping failed
        MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname,
                                            mmsd->port, mysql_errno(mmsd->mysql));
    } else {
        if (crc == false) {
            GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
            mmsd->mysql = NULL;
        }
    }
    ...
    if (ping_success) {
        __sync_fetch_and_add(&GloMyMon->ping_check_OK, 1);
    } else {
        __sync_fetch_and_add(&GloMyMon->ping_check_ERR, 1);
    }
}
           
  1. Query Cache purge thread

    对query cache执行一个垃圾回收的功能

// ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock)
// char *query=(char *)"SELECT hostname, port, weight, comment FROM proxysql_servers ORDER BY hostname, port";
void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset, bool _lock) {
    for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
        ...
        if (ite == umap_proxy_nodes.end()) {
            ...
            // 每个Server都创建所有连接
            if (pthread_create(&a->thrid, &attr, ProxySQL_Cluster_Monitor_thread, (void *) a) != 0) {
                ...
            }
        } else {
            ...
        }
    }
}