LCOV - code coverage report
Current view: top level - lib/pthreadpool - pthreadpool_pipe.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 59 92 64.1 %
Date: 2024-04-21 15:09:00 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /*
       2             :  * Unix SMB/CIFS implementation.
       3             :  * threadpool implementation based on pthreads
       4             :  * Copyright (C) Volker Lendecke 2009,2011
       5             :  *
       6             :  * This program is free software; you can redistribute it and/or modify
       7             :  * it under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation; either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  *
      11             :  * This program is distributed in the hope that it will be useful,
      12             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  * GNU General Public License for more details.
      15             :  *
      16             :  * You should have received a copy of the GNU General Public License
      17             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             :  */
      19             : 
      20             : #include "replace.h"
      21             : #include "system/filesys.h"
      22             : #include "pthreadpool_pipe.h"
      23             : #include "pthreadpool.h"
      24             : 
      25             : struct pthreadpool_pipe {
      26             :         struct pthreadpool *pool;
      27             :         int num_jobs;
      28             :         pid_t pid;
      29             :         int pipe_fds[2];
      30             : };
      31             : 
      32             : static int pthreadpool_pipe_signal(int jobid,
      33             :                                    void (*job_fn)(void *private_data),
      34             :                                    void *job_private_data,
      35             :                                    void *private_data);
      36             : 
      37           6 : int pthreadpool_pipe_init(unsigned max_threads,
      38             :                           struct pthreadpool_pipe **presult)
      39             : {
      40           6 :         struct pthreadpool_pipe *pool;
      41           6 :         int ret;
      42             : 
      43           6 :         pool = calloc(1, sizeof(struct pthreadpool_pipe));
      44           6 :         if (pool == NULL) {
      45           0 :                 return ENOMEM;
      46             :         }
      47           6 :         pool->pid = getpid();
      48             : 
      49           6 :         ret = pipe(pool->pipe_fds);
      50           6 :         if (ret == -1) {
      51           0 :                 int err = errno;
      52           0 :                 free(pool);
      53           0 :                 return err;
      54             :         }
      55             : 
      56           6 :         ret = pthreadpool_init(max_threads, &pool->pool,
      57             :                                pthreadpool_pipe_signal, pool);
      58           6 :         if (ret != 0) {
      59           0 :                 close(pool->pipe_fds[0]);
      60           0 :                 close(pool->pipe_fds[1]);
      61           0 :                 free(pool);
      62           0 :                 return ret;
      63             :         }
      64             : 
      65           6 :         *presult = pool;
      66           6 :         return 0;
      67             : }
      68             : 
      69        9841 : static int pthreadpool_pipe_signal(int jobid,
      70             :                                    void (*job_fn)(void *private_data),
      71             :                                    void *job_private_data,
      72             :                                    void *private_data)
      73             : {
      74        9841 :         struct pthreadpool_pipe *pool = private_data;
      75        9841 :         ssize_t written;
      76             : 
      77        9841 :         do {
      78        9841 :                 written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
      79        9931 :         } while ((written == -1) && (errno == EINTR));
      80             : 
      81        9931 :         if (written != sizeof(jobid)) {
      82           0 :                 return errno;
      83             :         }
      84             : 
      85           0 :         return 0;
      86             : }
      87             : 
      88           6 : int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
      89             : {
      90           6 :         int ret;
      91             : 
      92           6 :         if (pool->num_jobs != 0) {
      93           0 :                 return EBUSY;
      94             :         }
      95             : 
      96           5 :         ret = pthreadpool_destroy(pool->pool);
      97           5 :         if (ret != 0) {
      98           0 :                 return ret;
      99             :         }
     100             : 
     101           5 :         close(pool->pipe_fds[0]);
     102           5 :         pool->pipe_fds[0] = -1;
     103             : 
     104           5 :         close(pool->pipe_fds[1]);
     105           5 :         pool->pipe_fds[1] = -1;
     106             : 
     107           5 :         free(pool);
     108           5 :         return 0;
     109             : }
     110             : 
     111       10004 : static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
     112             : {
     113       10004 :         pid_t pid = getpid();
     114       10004 :         int signal_fd;
     115       10004 :         int ret;
     116             : 
     117       10004 :         if (pid == pool->pid) {
     118           0 :                 return 0;
     119             :         }
     120             : 
     121           0 :         signal_fd = pool->pipe_fds[0];
     122             : 
     123           0 :         close(pool->pipe_fds[0]);
     124           0 :         pool->pipe_fds[0] = -1;
     125             : 
     126           0 :         close(pool->pipe_fds[1]);
     127           0 :         pool->pipe_fds[1] = -1;
     128             : 
     129           0 :         ret = pipe(pool->pipe_fds);
     130           0 :         if (ret != 0) {
     131           0 :                 return errno;
     132             :         }
     133             : 
     134           0 :         ret = dup2(pool->pipe_fds[0], signal_fd);
     135           0 :         if (ret != 0) {
     136           0 :                 return errno;
     137             :         }
     138             : 
     139           0 :         pool->pipe_fds[0] = signal_fd;
     140           0 :         pool->num_jobs = 0;
     141             : 
     142           0 :         return 0;
     143             : }
     144             : 
     145       10004 : int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
     146             :                              void (*fn)(void *private_data),
     147             :                              void *private_data)
     148             : {
     149       10004 :         int ret;
     150             : 
     151       10004 :         ret = pthreadpool_pipe_reinit(pool);
     152       10004 :         if (ret != 0) {
     153           0 :                 return ret;
     154             :         }
     155             : 
     156       10004 :         ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
     157       10004 :         if (ret != 0) {
     158           0 :                 return ret;
     159             :         }
     160             : 
     161       10004 :         pool->num_jobs += 1;
     162             : 
     163       10004 :         return 0;
     164             : }
     165             : 
     166           2 : int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
     167             : {
     168           2 :         return pool->pipe_fds[0];
     169             : }
     170             : 
     171       10003 : int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
     172             :                                    unsigned num_jobids)
     173             : {
     174       10003 :         ssize_t to_read, nread, num_jobs;
     175       10003 :         pid_t pid = getpid();
     176             : 
     177       10003 :         if (pool->pid != pid) {
     178           0 :                 return EINVAL;
     179             :         }
     180             : 
     181       10003 :         to_read = sizeof(int) * num_jobids;
     182             : 
     183       10003 :         do {
     184       10003 :                 nread = read(pool->pipe_fds[0], jobids, to_read);
     185       10003 :         } while ((nread == -1) && (errno == EINTR));
     186             : 
     187       10003 :         if (nread == -1) {
     188           0 :                 return -errno;
     189             :         }
     190       10003 :         if ((nread % sizeof(int)) != 0) {
     191           0 :                 return -EINVAL;
     192             :         }
     193             : 
     194       10003 :         num_jobs = nread / sizeof(int);
     195             : 
     196       10003 :         if (num_jobs > pool->num_jobs) {
     197           0 :                 return -EINVAL;
     198             :         }
     199       10003 :         pool->num_jobs -= num_jobs;
     200             : 
     201       10003 :         return num_jobs;
     202             : }

Generated by: LCOV version 1.14