《APUE》这本书算是个字典了,本文算是对 pthread 的一些总结,主要是对原书中几段代码的解释说明,在读代码中记录知识点,技术含量不高。

线程属性

  首先给出一个以分离状态创建线程的函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
int make_thread(void* (*fn)(void*), void* arg)
{
    int             err;
    pthread_t       tid;
    pthread_attr_t  attr;

    err = pthread_attr_init(&attr);
    if (err != 0)
        return err;
    err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (err == 0)
        err = pthread_create(&tid, &attr, fn, arg);
    pthread_attr_destroy(&attr);
    return err;
}

  pthread 接口允许给“线程对象pthread_t”或“线程的同步对象”(如互斥量pthread_mutex_t、条件变量、锁等)关联相应的“属性对象”。使用int pthread_attr_init(pthread_attr_t *attr)可以为 attr 所指向的属性对象初始化内存空间,相应地释放内存使用int pthread_attr_destroy(pthread_attr_t *attr)

  一个属性对象可以代表多个属性。对于每个属性,都有一个设置属性值的函数。例如本例中设置“分离状态 detach state 属性”是使用int pthread_attr_setdetachstate(pthread_attr_t *attr, int *detachstate)

  创建新线程通过调用int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void* (*start_rtn)(void*), void *restrict arg)

  • 函数成功返回时,所创建的线程的 ID 会赋值给 tidp 所指,Linux 中线程 ID 用 unsigned long (%lu / 0x%lx)表示,每个线程可以通过调用pthread_t pthread_self()获得自身的 ID。
  • attr 所指的是上述的线程属性对象,用来定制不同的属性。若设为 NULL,则表示默认属性。
  • start_rtn 是新线程开始运行的函数地址。
  • 该函数只有一个 void* 类型参数 arg,指向一个 struct,而这个 struct 中存储着 start_rtn 真正所需的参数,只需要将 arg 强制类型转换成 struct 指针即可。使用 void *arg 传递参数的方法大致如下,struct to_info 就存储着 timeout 函数(见后文)传递给 timeout_helper 函数的参数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
typedef struct to_info
{
    void            (*to_fn)(void*);
    void *          to_arg;
    struct timespec to_wait;
}to_info;

void* timeout_helper(void* arg)
{
    struct to_info* tip = (struct to_info*)arg;
    clock_nanosleep(CLOCK_REALTIME, 0, &tip->to_wait, NULL);
    (*tip->to_fn)(tip->to_arg);
    free(arg);
    return 0;
}

  一个线程一般使用void pthread_exit(void *rval_ptr)终止,也可以被同一进程的其他线程调用int pthread_cancel(pthread_t tid)取消。rval_ptr 相当于线程的返回值,如果不需要返回值可设为 NULL。但线程终止时,线程的底层存储资源并未立即被回收,可以通过调用int pthread_join(pthread_t tid, void **rval_ptr)阻塞调用线程,直到线程终止并获得其返回值,此时该终止的线程被置于“分离状态”。也可以使用int pthread_detach(pthread_t tid)分离线程。而本例的 make_thread 函数则通过修改传给 pthread_create 函数的线程属性对象,创建了一个开始就处于分离状态的线程,处于分离状态的线程会在退出时立即收回它所占用的资源。

  不仅线程有属性,线程的同步对象也有属性。看这段代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
pthread_mutexattr_t attr;
pthread_mutex_t     mutex;

void retry(void* arg)
{
    printf("require mutex for the 2nd time.\n");
    pthread_mutex_lock(&mutex);
    
    FILE* fp = fopen("./retry.dat", "w+");
    fputs("This is a retry.", fp);
    fclose(fp);
    
    pthread_mutex_unlock(&mutex);
}

int main()
{
    int             err, condition, arg;
    struct timespec when;

    if ((err = pthread_mutexattr_init(&attr)) != 0) {
        fprintf(stderr, "pthread_mutexattr_init failed");
        exit(0);
    }
    if ((err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)) != 0) {
        fprintf(stderr, "set recursive type failed");
        exit(0);
    }
    if ((err = pthread_mutex_init(&mutex, &attr)) != 0) {
        fprintf(stderr, "create recursive mutex failed");
        exit(0);
    }

    condition = 1;
    pthread_mutex_lock(&mutex);

    if (condition) {
        printf("retry when timeout (10s).\n");
        clock_gettime(CLOCK_REALTIME, &when);
        when.tv_sec += 10;
        arg = 0;
        timeout(&when, retry, (void*)((unsigned long)arg));
    }

    pthread_mutex_unlock(&mutex);
    sleep(15);

    return 0;
}

  使用互斥量前,必须进行初始化。可以将其初始化赋值为 PTHREAD_MUTEX_INITIALIZER(只适用于静态分配),也可以使用int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr)进行初始化。互斥量有 3 个常用属性:进程共享属性、健壮属性、类型属性。这里将互斥量设置为“递归类型”,允许同一线程在互斥量解锁前对它进行多次加锁,至于这样设置的原因,稍后分析。

  main 函数中,设置好互斥量后,检查被该互斥量保护的条件 condition(本例中的条件并未具体给明含义)。若条件满足,则调用 timeout 函数,并将 retry 函数及其参数(retry 函数不需要参数,因此直接传 0)传递给 timeout 函数。

  以下是 timeout “超时”函数,用于安排另一个函数 retry 在未来的某个时间运行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#define SEC2NSEC    1000000000
#define USEC2NSEC   1000

void timeout(const struct timespec* when, void (*func)(void*), void* arg)
{
    struct timespec now;
    struct to_info* tip;
    int             err;

    clock_gettime(CLOCK_REALTIME, &now);
    if (when->tv_sec > now.tv_sec ||
            (when->tv_sec == now.tv_sec && when->tv_nsec > now.tv_nsec)) {
        tip = (to_info*)malloc(sizeof(to_info));
        if (tip != NULL) {
            tip->to_fn = func;
            tip->to_arg = arg;
            tip->to_wait.tv_sec = when->tv_sec - now.tv_sec;
            if (when->tv_nsec >= now.tv_nsec) {
                tip->to_wait.tv_nsec = when->tv_nsec - now.tv_nsec;
            }
            else {
                tip->to_wait.tv_sec--;
                tip->to_wait.tv_nsec = SEC2NSEC - now.tv_nsec + when->tv_nsec;
            }
            err = make_thread(timeout_helper, (void*)tip);
            if (err == 0) {
                printf("make_thread success\n");
                return;
            }
            else 
                free(tip);
        }
    }
    (*func)(arg);
}

  由 main 函数中的语句 “when.tv_sec += 10;” 可知,timeout 函数中,when 时刻在 now 时刻大约 10s 之后。timeout 函数创建一个线程,并让该线程执行 timeout_helper 函数,timeout_helper 函数则在等待大约 10s 后开始执行 retry 函数。retry 函数的执行效果就是程序运行约 10s 之后,在当前目录创建了一个 “retry.dat” 文件,而对这个文件的操作受到同一个 mutex 的保护。

  现在解决之前的疑问,为什么要将互斥量 mutex 设置为递归类型?

  这里由于 main 函数中明确指明 when 在 now 的 10s 之后,因此处理顺序是:

  1. main 线程获取 mutex。
  2. main 线程创建一个线程,该线程等待大约 10s。
  3. main 线程释放 mutex,并等待 15s(为了不让进程直接退出)。
  4. 被创建的线程等待 10s 后,执行 retry 函数,获取 mutex,文件操作后释放 mutex。

  但假如以另一种情况进入 timeout 函数,这种情况中 when 时刻在 now 时刻之前,而我们希望在 when 时刻执行某个函数(这里是 retry),即安排函数运行的时间已经过去了,这时候应当直接调用该函数,而不是创建一个线程在那儿等待,即执行到 timeout 函数的最后一条语句。或者假如 timeout 函数中,malloc 分配 struct to_info 的内存失败,或者 make_thread 函数失败,即不能创建线程时,也会直接调用 retry 函数。这种情况的处理顺序是:

  1. main 线程获取 mutex。
  2. main 线程执行 timeout 函数,并接着执行 retry 函数。
  3. main 线程在 retry 函数中再次获取 mutex,文件操作后释放 mutex。
  4. main 线程释放 mutex,并等待 15s。

  可见,如果不将 mutex 设置为允许递归加锁,则在第 3 步时会发生死锁。

线程与信号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
int         quit_flag;
sigset_t    mask;

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  wait_loc = PTHREAD_COND_INITIALIZER;

void* thr_fn(void* arg)
{
    int err, signo;
    for (;;) {
        err = sigwait(&mask, &signo);
        switch (signo) {
        case SIGINT:
            printf("\nInterrupt\n");
            break;
        case SIGQUIT:
            pthread_mutex_lock(&lock);
            quit_flag = 1;
            pthread_mutex_unlock(&lock);
            pthread_cond_signal(&wait_loc);
            return 0;
        default:
            printf("Unexpected signal %d\n", signo);
            exit(1);
        }
    }
}

int main()
{
    int         err;
    sigset_t    old_mask;
    pthread_t   tid;

    sigemptyset(&mask);		// 初始化,清除信号集中中所有信号
    sigaddset(&mask, SIGINT);	// 在信号集中添加特定信号
    sigaddset(&mask, SIGQUIT);
    if ((err = pthread_sigmask(SIG_BLOCK, &mask, &old_mask)) != 0) {
        exit(0);
    }
    err = pthread_create(&tid, NULL, thr_fn, 0);
    if (err != 0) {
        exit(0);
    }
    pthread_mutex_lock(&lock);
    while (quit_flag == 0)
        pthread_cond_wait(&wait_loc, &lock);
    pthread_mutex_unlock(&lock);
    
    /* 信号处理线程捕捉到 SIGQUIT,唤醒 main 线程,但 main 线程此时阻塞 SIGQUIT */
    quit_flag = 0;

    /* 重置信号屏蔽字,不再屏蔽 SIGQUIT,main 进程退出 */
    sigprocmask(SIG_SETMASK, &old_mask, NULL);
    exit(0);
}

  信号是软件中断,提供一种处理异步事件的方法。

  产生信号的条件有:

  1. 用户按某些终端键时。
  2. 硬件异常,如除数为0、无效内存引用等。
  3. 进程调用 kill 函数或用户使用 kill 命令发送信号。
  4. 检测到软件条件,如进程所设定时器超时。

  处理信号的方式有:

  1. 忽略信号。SIGKILL 和 SIGSTOP 这两个信号无法被忽略。
  2. 捕捉信号。调用一个用户函数对事件进行处理。
  3. 执行默认动作,大多数信号的默认动作是终止该进程。

  信号名是正整数常量。本例中SIGINT为用户按中断键(Delete 或 Ctrl + C)时终端驱动程序所产生的信号,SIGQUIT为用户按退出键(Ctrl + \)时产生的信号。

  进程和线程都有其信号屏蔽字,规定了当前阻塞而不能递送给该进程 / 线程的的信号集,调用int sigprocmask / pthread_sigmask(int how, const sigset_t *restrict set, sigset_t *restrict oset)可以检测或更改进程 / 线程的信号屏蔽字。

  这段代码的作用是,在 main 线程中屏蔽 SIGINT 和 SIGQUIT 信号,再创建一个新的用于信号处理的线程,该线程继承了 main 线程中的信号屏蔽字,但 sigwait 函数会原子地取消信号集的阻塞状态,在返回之前恢复线程的信号屏蔽字。