1 /*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fec_private.h"
18
19 struct process_info {
20 int id;
21 fec_handle *f;
22 uint8_t *buf;
23 size_t count;
24 uint64_t offset;
25 read_func func;
26 ssize_t rc;
27 size_t errors;
28 };
29
30 /* thread function */
__process(void * cookie)31 static void * __process(void *cookie)
32 {
33 process_info *p = static_cast<process_info *>(cookie);
34
35 debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
36 p->offset + p->count);
37
38 p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
39 return p;
40 }
41
42 /* launches a maximum number of threads to process a read */
process(fec_handle * f,uint8_t * buf,size_t count,uint64_t offset,read_func func)43 ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
44 read_func func)
45 {
46 check(f);
47 check(buf)
48 check(func);
49
50 if (count == 0) {
51 return 0;
52 }
53
54 int threads = sysconf(_SC_NPROCESSORS_ONLN);
55
56 if (threads < WORK_MIN_THREADS) {
57 threads = WORK_MIN_THREADS;
58 } else if (threads > WORK_MAX_THREADS) {
59 threads = WORK_MAX_THREADS;
60 }
61
62 uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
63 size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);
64
65 size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
66 size_t max_threads = fec_div_round_up(count, count_per_thread);
67
68 if ((size_t)threads > max_threads) {
69 threads = (int)max_threads;
70 }
71
72 size_t left = count;
73 uint64_t pos = offset;
74 uint64_t end = start + count_per_thread;
75
76 debug("%d threads, %zu bytes per thread (total %zu)", threads,
77 count_per_thread, count);
78
79 std::vector<pthread_t> handles;
80 process_info info[threads];
81 ssize_t rc = 0;
82
83 /* start threads to process queue */
84 for (int i = 0; i < threads; ++i) {
85 check(left > 0);
86
87 info[i].id = i;
88 info[i].f = f;
89 info[i].buf = &buf[pos - offset];
90 info[i].count = (size_t)(end - pos);
91 info[i].offset = pos;
92 info[i].func = func;
93 info[i].rc = -1;
94 info[i].errors = 0;
95
96 if (info[i].count > left) {
97 info[i].count = left;
98 }
99
100 pthread_t thread;
101
102 if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
103 error("failed to create thread: %s", strerror(errno));
104 rc = -1;
105 } else {
106 handles.push_back(thread);
107 }
108
109 pos = end;
110 end += count_per_thread;
111 left -= info[i].count;
112 }
113
114 check(left == 0);
115
116 ssize_t nread = 0;
117
118 /* wait for all threads to complete */
119 for (auto thread : handles) {
120 process_info *p = NULL;
121
122 if (pthread_join(thread, (void **)&p) != 0) {
123 error("failed to join thread: %s", strerror(errno));
124 rc = -1;
125 } else if (!p || p->rc == -1) {
126 rc = -1;
127 } else {
128 nread += p->rc;
129 f->errors += p->errors;
130 }
131 }
132
133 if (rc == -1) {
134 errno = EIO;
135 return -1;
136 }
137
138 return nread;
139 }
140