1// Copyright 2017 Google Inc. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package finder 16 17import ( 18 "bufio" 19 "bytes" 20 "encoding/json" 21 "errors" 22 "fmt" 23 "io" 24 "os" 25 "path/filepath" 26 "runtime" 27 "sort" 28 "strings" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "android/soong/finder/fs" 34) 35 36// This file provides a Finder struct that can quickly search for files satisfying 37// certain criteria. 38// This Finder gets its speed partially from parallelism and partially from caching. 39// If a Stat call returns the same result as last time, then it means Finder 40// can skip the ReadDir call for that dir. 41 42// The primary data structure used by the finder is the field Finder.nodes , 43// which is a tree of nodes of type *pathMap . 44// Each node represents a directory on disk, along with its stats, subdirectories, 45// and contained files. 46 47// The common use case for the Finder is that the caller creates a Finder and gives 48// it the same query that was given to it in the previous execution. 49// In this situation, the major events that take place are: 50// 1. The Finder begins to load its db 51// 2. The Finder begins to stat the directories mentioned in its db (using multiple threads) 52// Calling Stat on each of these directories is generally a large fraction of the total time 53// 3. The Finder begins to construct a separate tree of nodes in each of its threads 54// 4. 
The Finder merges the individual node trees into the main node tree 55// 5. The Finder may call ReadDir a few times if there are a few directories that are out-of-date 56// These ReadDir calls might prompt additional Stat calls, etc 57// 6. The Finder waits for all loading to complete 58// 7. The Finder searches the cache for files matching the user's query (using multiple threads) 59 60// These are the invariants regarding concurrency: 61// 1. The public methods of Finder are threadsafe. 62// The public methods are only performance-optimized for one caller at a time, however. 63// For the moment, multiple concurrent callers shouldn't expect any better performance than 64// multiple serial callers. 65// 2. While building the node tree, only one thread may ever access the <children> collection of a 66// *pathMap at once. 67// a) The thread that accesses the <children> collection is the thread that discovers the 68// children (by reading them from the cache or by having received a response to ReadDir). 69// 1) Consequently, the thread that discovers the children also spawns requests to stat 70// subdirs. 71// b) Consequently, while building the node tree, no thread may do a lookup of its 72// *pathMap via filepath because another thread may be adding children to the 73// <children> collection of an ancestor node. Additionally, in rare cases, another thread 74// may be removing children from an ancestor node if the children were only discovered to 75// be irrelevant after calling ReadDir (which happens if a prune-file was just added). 76// 3. No query will begin to be serviced until all loading (both reading the db 77// and scanning the filesystem) is complete. 78// Tests indicate that it only takes about 10% as long to search the in-memory cache as to 79// generate it, making this not a huge loss in performance. 80// 4. 
The parsing of the db and the initial setup of the pathMap tree must complete before 81// beginning to call listDirSync (because listDirSync can create new entries in the pathMap) 82 83// see cmd/finder.go or finder_test.go for usage examples 84 85// Update versionString whenever making a backwards-incompatible change to the cache file format 86const versionString = "Android finder version 1" 87 88// a CacheParams specifies which files and directories the user wishes be scanned and 89// potentially added to the cache 90type CacheParams struct { 91 // WorkingDirectory is used as a base for any relative file paths given to the Finder 92 WorkingDirectory string 93 94 // RootDirs are the root directories used to initiate the search 95 RootDirs []string 96 97 // ExcludeDirs are directory names that if encountered are removed from the search 98 ExcludeDirs []string 99 100 // PruneFiles are file names that if encountered prune their entire directory 101 // (including siblings) 102 PruneFiles []string 103 104 // IncludeFiles are file names to include as matches 105 IncludeFiles []string 106} 107 108// a cacheConfig stores the inputs that determine what should be included in the cache 109type cacheConfig struct { 110 CacheParams 111 112 // FilesystemView is a unique identifier telling which parts of which file systems 113 // are readable by the Finder. In practice its value is essentially username@hostname. 114 // FilesystemView is set to ensure that a cache file copied to another host or 115 // found by another user doesn't inadvertently get reused. 
116 FilesystemView string 117} 118 119func (p *cacheConfig) Dump() ([]byte, error) { 120 bytes, err := json.Marshal(p) 121 return bytes, err 122} 123 124// a cacheMetadata stores version information about the cache 125type cacheMetadata struct { 126 // The Version enables the Finder to determine whether it can even parse the file 127 // If the version changes, the entire cache file must be regenerated 128 Version string 129 130 // The CacheParams enables the Finder to determine whether the parameters match 131 // If the CacheParams change, the Finder can choose how much of the cache file to reuse 132 // (although in practice, the Finder will probably choose to ignore the entire file anyway) 133 Config cacheConfig 134} 135 136type Logger interface { 137 Output(calldepth int, s string) error 138} 139 140// the Finder is the main struct that callers will want to use 141type Finder struct { 142 // configuration 143 DbPath string 144 numDbLoadingThreads int 145 numSearchingThreads int 146 cacheMetadata cacheMetadata 147 logger Logger 148 filesystem fs.FileSystem 149 150 // temporary state 151 threadPool *threadPool 152 mutex sync.Mutex 153 fsErrs []fsErr 154 errlock sync.Mutex 155 shutdownWaitgroup sync.WaitGroup 156 157 // non-temporary state 158 modifiedFlag int32 159 nodes pathMap 160} 161 162var defaultNumThreads = runtime.NumCPU() * 2 163 164// New creates a new Finder for use 165func New(cacheParams CacheParams, filesystem fs.FileSystem, 166 logger Logger, dbPath string) (f *Finder, err error) { 167 return newImpl(cacheParams, filesystem, logger, dbPath, defaultNumThreads) 168} 169 170// newImpl is like New but accepts more params 171func newImpl(cacheParams CacheParams, filesystem fs.FileSystem, 172 logger Logger, dbPath string, numThreads int) (f *Finder, err error) { 173 numDbLoadingThreads := numThreads 174 numSearchingThreads := numThreads 175 176 metadata := cacheMetadata{ 177 Version: versionString, 178 Config: cacheConfig{ 179 CacheParams: cacheParams, 180 
FilesystemView: filesystem.ViewId(), 181 }, 182 } 183 184 f = &Finder{ 185 numDbLoadingThreads: numDbLoadingThreads, 186 numSearchingThreads: numSearchingThreads, 187 cacheMetadata: metadata, 188 logger: logger, 189 filesystem: filesystem, 190 191 nodes: *newPathMap("/"), 192 DbPath: dbPath, 193 194 shutdownWaitgroup: sync.WaitGroup{}, 195 } 196 197 f.loadFromFilesystem() 198 199 // check for any filesystem errors 200 err = f.getErr() 201 if err != nil { 202 return nil, err 203 } 204 205 // confirm that every path mentioned in the CacheConfig exists 206 for _, path := range cacheParams.RootDirs { 207 if !filepath.IsAbs(path) { 208 path = filepath.Join(f.cacheMetadata.Config.WorkingDirectory, path) 209 } 210 node := f.nodes.GetNode(filepath.Clean(path), false) 211 if node == nil || node.ModTime == 0 { 212 return nil, fmt.Errorf("path %v was specified to be included in the cache but does not exist\n", path) 213 } 214 } 215 216 return f, nil 217} 218 219// FindNamed searches for every cached file 220func (f *Finder) FindAll() []string { 221 return f.FindAt("/") 222} 223 224// FindNamed searches for every cached file under <rootDir> 225func (f *Finder) FindAt(rootDir string) []string { 226 filter := func(entries DirEntries) (dirNames []string, fileNames []string) { 227 return entries.DirNames, entries.FileNames 228 } 229 return f.FindMatching(rootDir, filter) 230} 231 232// FindNamed searches for every cached file named <fileName> 233func (f *Finder) FindNamed(fileName string) []string { 234 return f.FindNamedAt("/", fileName) 235} 236 237// FindNamedAt searches under <rootPath> for every file named <fileName> 238// The reason a caller might use FindNamedAt instead of FindNamed is if they want 239// to limit their search to a subset of the cache 240func (f *Finder) FindNamedAt(rootPath string, fileName string) []string { 241 filter := func(entries DirEntries) (dirNames []string, fileNames []string) { 242 matches := []string{} 243 for _, foundName := range 
entries.FileNames { 244 if foundName == fileName { 245 matches = append(matches, foundName) 246 } 247 } 248 return entries.DirNames, matches 249 250 } 251 return f.FindMatching(rootPath, filter) 252} 253 254// FindFirstNamed searches for every file named <fileName> 255// Whenever it finds a match, it stops search subdirectories 256func (f *Finder) FindFirstNamed(fileName string) []string { 257 return f.FindFirstNamedAt("/", fileName) 258} 259 260// FindFirstNamedAt searches for every file named <fileName> 261// Whenever it finds a match, it stops search subdirectories 262func (f *Finder) FindFirstNamedAt(rootPath string, fileName string) []string { 263 filter := func(entries DirEntries) (dirNames []string, fileNames []string) { 264 matches := []string{} 265 for _, foundName := range entries.FileNames { 266 if foundName == fileName { 267 matches = append(matches, foundName) 268 } 269 } 270 271 if len(matches) > 0 { 272 return []string{}, matches 273 } 274 return entries.DirNames, matches 275 } 276 return f.FindMatching(rootPath, filter) 277} 278 279// FindMatching is the most general exported function for searching for files in the cache 280// The WalkFunc will be invoked repeatedly and is expected to modify the provided DirEntries 281// in place, removing file paths and directories as desired. 282// WalkFunc will be invoked potentially many times in parallel, and must be threadsafe. 
283func (f *Finder) FindMatching(rootPath string, filter WalkFunc) []string { 284 // set up some parameters 285 scanStart := time.Now() 286 var isRel bool 287 workingDir := f.cacheMetadata.Config.WorkingDirectory 288 289 isRel = !filepath.IsAbs(rootPath) 290 if isRel { 291 rootPath = filepath.Join(workingDir, rootPath) 292 } 293 294 rootPath = filepath.Clean(rootPath) 295 296 // ensure nothing else is using the Finder 297 f.verbosef("FindMatching waiting for finder to be idle\n") 298 f.lock() 299 defer f.unlock() 300 301 node := f.nodes.GetNode(rootPath, false) 302 if node == nil { 303 f.verbosef("No data for path %v ; apparently not included in cache params: %v\n", 304 rootPath, f.cacheMetadata.Config.CacheParams) 305 // path is not found; don't do a search 306 return []string{} 307 } 308 309 // search for matching files 310 f.verbosef("Finder finding %v using cache\n", rootPath) 311 results := f.findInCacheMultithreaded(node, filter, f.numSearchingThreads) 312 313 // format and return results 314 if isRel { 315 for i := 0; i < len(results); i++ { 316 results[i] = strings.Replace(results[i], workingDir+"/", "", 1) 317 } 318 } 319 sort.Strings(results) 320 f.verbosef("Found %v files under %v in %v using cache\n", 321 len(results), rootPath, time.Since(scanStart)) 322 return results 323} 324 325// Shutdown declares that the finder is no longer needed and waits for its cleanup to complete 326// Currently, that only entails waiting for the database dump to complete. 327func (f *Finder) Shutdown() { 328 f.WaitForDbDump() 329} 330 331// WaitForDbDump returns once the database has been written to f.DbPath. 
332func (f *Finder) WaitForDbDump() { 333 f.shutdownWaitgroup.Wait() 334} 335 336// End of public api 337 338func (f *Finder) goDumpDb() { 339 if f.wasModified() { 340 f.shutdownWaitgroup.Add(1) 341 go func() { 342 err := f.dumpDb() 343 if err != nil { 344 f.verbosef("%v\n", err) 345 } 346 f.shutdownWaitgroup.Done() 347 }() 348 } else { 349 f.verbosef("Skipping dumping unmodified db\n") 350 } 351} 352 353// joinCleanPaths is like filepath.Join but is faster because 354// joinCleanPaths doesn't have to support paths ending in "/" or containing ".." 355func joinCleanPaths(base string, leaf string) string { 356 if base == "" { 357 return leaf 358 } 359 if base == "/" { 360 return base + leaf 361 } 362 if leaf == "" { 363 return base 364 } 365 return base + "/" + leaf 366} 367 368func (f *Finder) verbosef(format string, args ...interface{}) { 369 f.logger.Output(2, fmt.Sprintf(format, args...)) 370} 371 372// loadFromFilesystem populates the in-memory cache based on the contents of the filesystem 373func (f *Finder) loadFromFilesystem() { 374 f.threadPool = newThreadPool(f.numDbLoadingThreads) 375 376 err := f.startFromExternalCache() 377 if err != nil { 378 f.startWithoutExternalCache() 379 } 380 381 f.goDumpDb() 382 383 f.threadPool = nil 384} 385 386func (f *Finder) startFind(path string) { 387 if !filepath.IsAbs(path) { 388 path = filepath.Join(f.cacheMetadata.Config.WorkingDirectory, path) 389 } 390 node := f.nodes.GetNode(path, true) 391 f.statDirAsync(node) 392} 393 394func (f *Finder) lock() { 395 f.mutex.Lock() 396} 397 398func (f *Finder) unlock() { 399 f.mutex.Unlock() 400} 401 402// a statResponse is the relevant portion of the response from the filesystem to a Stat call 403type statResponse struct { 404 ModTime int64 405 Inode uint64 406 Device uint64 407} 408 409// a pathAndStats stores a path and its stats 410type pathAndStats struct { 411 statResponse 412 413 Path string 414} 415 416// a dirFullInfo stores all of the relevant information we know about a 
directory 417type dirFullInfo struct { 418 pathAndStats 419 420 FileNames []string 421} 422 423// a PersistedDirInfo is the information about a dir that we save to our cache on disk 424type PersistedDirInfo struct { 425 // These field names are short because they are repeated many times in the output json file 426 P string // path 427 T int64 // modification time 428 I uint64 // inode number 429 F []string // relevant filenames contained 430} 431 432// a PersistedDirs is the information that we persist for a group of dirs 433type PersistedDirs struct { 434 // the device on which each directory is stored 435 Device uint64 436 // the common root path to which all contained dirs are relative 437 Root string 438 // the directories themselves 439 Dirs []PersistedDirInfo 440} 441 442// a CacheEntry is the smallest unit that can be read and parsed from the cache (on disk) at a time 443type CacheEntry []PersistedDirs 444 445// a DirEntries lists the files and directories contained directly within a specific directory 446type DirEntries struct { 447 Path string 448 449 // elements of DirNames are just the dir names; they don't include any '/' character 450 DirNames []string 451 // elements of FileNames are just the file names; they don't include '/' character 452 FileNames []string 453} 454 455// a WalkFunc is the type that is passed into various Find functions for determining which 456// directories the caller wishes be walked. The WalkFunc is expected to decide which 457// directories to walk and which files to consider as matches to the original query. 
458type WalkFunc func(DirEntries) (dirs []string, files []string) 459 460// a mapNode stores the relevant stats about a directory to be stored in a pathMap 461type mapNode struct { 462 statResponse 463 FileNames []string 464} 465 466// a pathMap implements the directory tree structure of nodes 467type pathMap struct { 468 mapNode 469 470 path string 471 472 children map[string]*pathMap 473 474 // number of descendent nodes, including self 475 approximateNumDescendents int 476} 477 478func newPathMap(path string) *pathMap { 479 result := &pathMap{path: path, children: make(map[string]*pathMap, 4), 480 approximateNumDescendents: 1} 481 return result 482} 483 484// GetNode returns the node at <path> 485func (m *pathMap) GetNode(path string, createIfNotFound bool) *pathMap { 486 if len(path) > 0 && path[0] == '/' { 487 path = path[1:] 488 } 489 490 node := m 491 for { 492 if path == "" { 493 return node 494 } 495 496 index := strings.Index(path, "/") 497 var firstComponent string 498 if index >= 0 { 499 firstComponent = path[:index] 500 path = path[index+1:] 501 } else { 502 firstComponent = path 503 path = "" 504 } 505 506 child, found := node.children[firstComponent] 507 508 if !found { 509 if createIfNotFound { 510 child = node.newChild(firstComponent) 511 } else { 512 return nil 513 } 514 } 515 516 node = child 517 } 518} 519 520func (m *pathMap) newChild(name string) (child *pathMap) { 521 path := joinCleanPaths(m.path, name) 522 newChild := newPathMap(path) 523 m.children[name] = newChild 524 525 return m.children[name] 526} 527 528func (m *pathMap) UpdateNumDescendents() int { 529 count := 1 530 for _, child := range m.children { 531 count += child.approximateNumDescendents 532 } 533 m.approximateNumDescendents = count 534 return count 535} 536 537func (m *pathMap) UpdateNumDescendentsRecursive() { 538 for _, child := range m.children { 539 child.UpdateNumDescendentsRecursive() 540 } 541 m.UpdateNumDescendents() 542} 543 544func (m *pathMap) MergeIn(other 
*pathMap) { 545 for key, theirs := range other.children { 546 ours, found := m.children[key] 547 if found { 548 ours.MergeIn(theirs) 549 } else { 550 m.children[key] = theirs 551 } 552 } 553 if other.ModTime != 0 { 554 m.mapNode = other.mapNode 555 } 556 m.UpdateNumDescendents() 557} 558 559func (m *pathMap) DumpAll() []dirFullInfo { 560 results := []dirFullInfo{} 561 m.dumpInto("", &results) 562 return results 563} 564 565func (m *pathMap) dumpInto(path string, results *[]dirFullInfo) { 566 *results = append(*results, 567 dirFullInfo{ 568 pathAndStats{statResponse: m.statResponse, Path: path}, 569 m.FileNames}, 570 ) 571 for key, child := range m.children { 572 childPath := joinCleanPaths(path, key) 573 if len(childPath) == 0 || childPath[0] != '/' { 574 childPath = "/" + childPath 575 } 576 child.dumpInto(childPath, results) 577 } 578} 579 580// a semaphore can be locked by up to <capacity> callers at once 581type semaphore struct { 582 pool chan bool 583} 584 585func newSemaphore(capacity int) *semaphore { 586 return &semaphore{pool: make(chan bool, capacity)} 587} 588 589func (l *semaphore) Lock() { 590 l.pool <- true 591} 592 593func (l *semaphore) Unlock() { 594 <-l.pool 595} 596 597// A threadPool runs goroutines and supports throttling and waiting. 598// Without throttling, Go may exhaust the maximum number of various resources, such as 599// threads or file descriptors, and crash the program. 
600type threadPool struct { 601 receivedRequests sync.WaitGroup 602 activeRequests semaphore 603} 604 605func newThreadPool(maxNumConcurrentThreads int) *threadPool { 606 return &threadPool{ 607 receivedRequests: sync.WaitGroup{}, 608 activeRequests: *newSemaphore(maxNumConcurrentThreads), 609 } 610} 611 612// Run requests to run the given function in its own goroutine 613func (p *threadPool) Run(function func()) { 614 p.receivedRequests.Add(1) 615 // If Run() was called from within a goroutine spawned by this threadPool, 616 // then we may need to return from Run() before having capacity to actually 617 // run <function>. 618 // 619 // It's possible that the body of <function> contains a statement (such as a syscall) 620 // that will cause Go to pin it to a thread, or will contain a statement that uses 621 // another resource that is in short supply (such as a file descriptor), so we can't 622 // actually run <function> until we have capacity. 623 // 624 // However, the semaphore used for synchronization is implemented via a channel and 625 // shouldn't require a new thread for each access. 
626 go func() { 627 p.activeRequests.Lock() 628 function() 629 p.activeRequests.Unlock() 630 p.receivedRequests.Done() 631 }() 632} 633 634// Wait waits until all goroutines are done, just like sync.WaitGroup's Wait 635func (p *threadPool) Wait() { 636 p.receivedRequests.Wait() 637} 638 639type fsErr struct { 640 path string 641 err error 642} 643 644func (e fsErr) String() string { 645 return e.path + ": " + e.err.Error() 646} 647 648func (f *Finder) serializeCacheEntry(dirInfos []dirFullInfo) ([]byte, error) { 649 // group each dirFullInfo by its Device, to avoid having to repeat it in the output 650 dirsByDevice := map[uint64][]PersistedDirInfo{} 651 for _, entry := range dirInfos { 652 _, found := dirsByDevice[entry.Device] 653 if !found { 654 dirsByDevice[entry.Device] = []PersistedDirInfo{} 655 } 656 dirsByDevice[entry.Device] = append(dirsByDevice[entry.Device], 657 PersistedDirInfo{P: entry.Path, T: entry.ModTime, I: entry.Inode, F: entry.FileNames}) 658 } 659 660 cacheEntry := CacheEntry{} 661 662 for device, infos := range dirsByDevice { 663 // find common prefix 664 prefix := "" 665 if len(infos) > 0 { 666 prefix = infos[0].P 667 } 668 for _, info := range infos { 669 for !strings.HasPrefix(info.P+"/", prefix+"/") { 670 prefix = filepath.Dir(prefix) 671 if prefix == "/" { 672 break 673 } 674 } 675 } 676 // remove common prefix 677 for i := range infos { 678 suffix := strings.Replace(infos[i].P, prefix, "", 1) 679 if len(suffix) > 0 && suffix[0] == '/' { 680 suffix = suffix[1:] 681 } 682 infos[i].P = suffix 683 } 684 685 // turn the map (keyed by device) into a list of structs with labeled fields 686 // this is to improve readability of the output 687 cacheEntry = append(cacheEntry, PersistedDirs{Device: device, Root: prefix, Dirs: infos}) 688 } 689 690 // convert to json. 
691 // it would save some space to use a different format than json for the db file, 692 // but the space and time savings are small, and json is easy for humans to read 693 bytes, err := json.Marshal(cacheEntry) 694 return bytes, err 695} 696 697func (f *Finder) parseCacheEntry(bytes []byte) ([]dirFullInfo, error) { 698 var cacheEntry CacheEntry 699 err := json.Unmarshal(bytes, &cacheEntry) 700 if err != nil { 701 return nil, err 702 } 703 704 // convert from a CacheEntry to a []dirFullInfo (by copying a few fields) 705 capacity := 0 706 for _, element := range cacheEntry { 707 capacity += len(element.Dirs) 708 } 709 nodes := make([]dirFullInfo, capacity) 710 count := 0 711 for _, element := range cacheEntry { 712 for _, dir := range element.Dirs { 713 path := joinCleanPaths(element.Root, dir.P) 714 715 nodes[count] = dirFullInfo{ 716 pathAndStats: pathAndStats{ 717 statResponse: statResponse{ 718 ModTime: dir.T, Inode: dir.I, Device: element.Device, 719 }, 720 Path: path}, 721 FileNames: dir.F} 722 count++ 723 } 724 } 725 return nodes, nil 726} 727 728// We use the following separator byte to distinguish individually parseable blocks of json 729// because we know this separator won't appear in the json that we're parsing. 730// 731// The newline byte can only appear in a UTF-8 stream if the newline character appears, because: 732// - The newline character is encoded as "0000 1010" in binary ("0a" in hex) 733// - UTF-8 dictates that bytes beginning with a "0" bit are never emitted as part of a multibyte 734// character. 735// 736// We know that the newline character will never appear in our json string, because: 737// - If a newline character appears as part of a data string, then json encoding will 738// emit two characters instead: '\' and 'n'. 739// - The json encoder that we use doesn't emit the optional newlines between any of its 740// other outputs. 
741const lineSeparator = byte('\n') 742 743func (f *Finder) readLine(reader *bufio.Reader) ([]byte, error) { 744 return reader.ReadBytes(lineSeparator) 745} 746 747// validateCacheHeader reads the cache header from cacheReader and tells whether the cache is compatible with this Finder 748func (f *Finder) validateCacheHeader(cacheReader *bufio.Reader) bool { 749 cacheVersionBytes, err := f.readLine(cacheReader) 750 if err != nil { 751 f.verbosef("Failed to read database header; database is invalid\n") 752 return false 753 } 754 if len(cacheVersionBytes) > 0 && cacheVersionBytes[len(cacheVersionBytes)-1] == lineSeparator { 755 cacheVersionBytes = cacheVersionBytes[:len(cacheVersionBytes)-1] 756 } 757 cacheVersionString := string(cacheVersionBytes) 758 currentVersion := f.cacheMetadata.Version 759 if cacheVersionString != currentVersion { 760 f.verbosef("Version changed from %q to %q, database is not applicable\n", cacheVersionString, currentVersion) 761 return false 762 } 763 764 cacheParamBytes, err := f.readLine(cacheReader) 765 if err != nil { 766 f.verbosef("Failed to read database search params; database is invalid\n") 767 return false 768 } 769 770 if len(cacheParamBytes) > 0 && cacheParamBytes[len(cacheParamBytes)-1] == lineSeparator { 771 cacheParamBytes = cacheParamBytes[:len(cacheParamBytes)-1] 772 } 773 774 currentParamBytes, err := f.cacheMetadata.Config.Dump() 775 if err != nil { 776 panic("Finder failed to serialize its parameters") 777 } 778 cacheParamString := string(cacheParamBytes) 779 currentParamString := string(currentParamBytes) 780 if cacheParamString != currentParamString { 781 f.verbosef("Params changed from %q to %q, database is not applicable\n", cacheParamString, currentParamString) 782 return false 783 } 784 return true 785} 786 787// loadBytes compares the cache info in <data> to the state of the filesystem 788// loadBytes returns a map representing <data> and also a slice of dirs that need to be re-walked 789func (f *Finder) 
loadBytes(id int, data []byte) (m *pathMap, dirsToWalk []string, err error) { 790 791 helperStartTime := time.Now() 792 793 cachedNodes, err := f.parseCacheEntry(data) 794 if err != nil { 795 return nil, nil, fmt.Errorf("Failed to parse block %v: %v\n", id, err.Error()) 796 } 797 798 unmarshalDate := time.Now() 799 f.verbosef("Unmarshaled %v objects for %v in %v\n", 800 len(cachedNodes), id, unmarshalDate.Sub(helperStartTime)) 801 802 tempMap := newPathMap("/") 803 stats := make([]statResponse, len(cachedNodes)) 804 805 for i, node := range cachedNodes { 806 // check the file system for an updated timestamp 807 stats[i] = f.statDirSync(node.Path) 808 } 809 810 dirsToWalk = []string{} 811 for i, cachedNode := range cachedNodes { 812 updated := stats[i] 813 // save the cached value 814 container := tempMap.GetNode(cachedNode.Path, true) 815 container.mapNode = mapNode{statResponse: updated} 816 817 // if the metadata changed and the directory still exists, then 818 // make a note to walk it later 819 if !f.isInfoUpToDate(cachedNode.statResponse, updated) && updated.ModTime != 0 { 820 f.setModified() 821 // make a note that the directory needs to be walked 822 dirsToWalk = append(dirsToWalk, cachedNode.Path) 823 } else { 824 container.mapNode.FileNames = cachedNode.FileNames 825 } 826 } 827 // count the number of nodes to improve our understanding of the shape of the tree, 828 // thereby improving parallelism of subsequent searches 829 tempMap.UpdateNumDescendentsRecursive() 830 831 f.verbosef("Statted inodes of block %v in %v\n", id, time.Now().Sub(unmarshalDate)) 832 return tempMap, dirsToWalk, nil 833} 834 835// startFromExternalCache loads the cache database from disk 836// startFromExternalCache waits to return until the load of the cache db is complete, but 837// startFromExternalCache does not wait for all every listDir() or statDir() request to complete 838func (f *Finder) startFromExternalCache() (err error) { 839 startTime := time.Now() 840 dbPath := 
f.DbPath 841 842 // open cache file and validate its header 843 reader, err := f.filesystem.Open(dbPath) 844 if err != nil { 845 return errors.New("No data to load from database\n") 846 } 847 bufferedReader := bufio.NewReader(reader) 848 if !f.validateCacheHeader(bufferedReader) { 849 return errors.New("Cache header does not match") 850 } 851 f.verbosef("Database header matches, will attempt to use database %v\n", f.DbPath) 852 853 // read the file and spawn threads to process it 854 nodesToWalk := [][]*pathMap{} 855 mainTree := newPathMap("/") 856 857 // read the blocks and stream them into <blockChannel> 858 type dataBlock struct { 859 id int 860 err error 861 data []byte 862 } 863 blockChannel := make(chan dataBlock, f.numDbLoadingThreads) 864 readBlocks := func() { 865 index := 0 866 for { 867 // It takes some time to unmarshal the input from json, so we want 868 // to unmarshal it in parallel. In order to find valid places to 869 // break the input, we scan for the line separators that we inserted 870 // (for this purpose) when we dumped the database. 871 data, err := f.readLine(bufferedReader) 872 var response dataBlock 873 done := false 874 if err != nil && err != io.EOF { 875 response = dataBlock{id: index, err: err, data: nil} 876 done = true 877 } else { 878 done = (err == io.EOF) 879 response = dataBlock{id: index, err: nil, data: data} 880 } 881 blockChannel <- response 882 index++ 883 duration := time.Since(startTime) 884 f.verbosef("Read block %v after %v\n", index, duration) 885 if done { 886 f.verbosef("Read %v blocks in %v\n", index, duration) 887 close(blockChannel) 888 return 889 } 890 } 891 } 892 go readBlocks() 893 894 // Read from <blockChannel> and stream the responses into <resultChannel>. 
	// workResponse carries the result of parsing one cache-db block back to
	// the combining goroutine.
	type workResponse struct {
		id          int      // index of the block that was parsed
		err         error    // non-nil if parsing the block failed
		tree        *pathMap // subtree parsed from this block
		updatedDirs []string // dirs whose cached stats no longer match disk
	}
	resultChannel := make(chan workResponse)
	// processBlocks drains blockChannel, parsing each block on the db-loading
	// thread pool and emitting one workResponse per block.
	processBlocks := func() {
		numProcessed := 0
		threadPool := newThreadPool(f.numDbLoadingThreads)
		for {
			// get a block to process
			block, received := <-blockChannel
			if !received {
				break
			}

			if block.err != nil {
				resultChannel <- workResponse{err: block.err}
				break
			}
			numProcessed++
			// wait until there is CPU available to process it
			threadPool.Run(
				func() {
					processStartTime := time.Now()
					f.verbosef("Starting to process block %v after %v\n",
						block.id, processStartTime.Sub(startTime))
					tempMap, updatedDirs, err := f.loadBytes(block.id, block.data)
					var response workResponse
					if err != nil {
						f.verbosef(
							"Block %v failed to parse with error %v\n",
							block.id, err)
						response = workResponse{err: err}
					} else {
						response = workResponse{
							id:          block.id,
							err:         nil,
							tree:        tempMap,
							updatedDirs: updatedDirs,
						}
					}
					f.verbosef("Processed block %v in %v\n",
						block.id, time.Since(processStartTime),
					)
					resultChannel <- response
				},
			)
		}
		threadPool.Wait()
		f.verbosef("Finished processing %v blocks in %v\n",
			numProcessed, time.Since(startTime))
		close(resultChannel)
	}
	go processBlocks()

	// Read from <resultChannel> and use the results
	combineResults := func() (err error) {
		for {
			result, received := <-resultChannel
			if !received {
				break
			}
			if err != nil {
				// In case of an error, wait for work to complete before
				// returning the error. This ensures that any subsequent
				// work doesn't need to compete for resources (and possibly
				// fail due to, for example, a filesystem limit on the number of
				// concurrently open files) with past work.
				continue
			}
			if result.err != nil {
				err = result.err
				continue
			}
			// update main tree
			mainTree.MergeIn(result.tree)
			// record any new directories that we will need to Stat()
			updatedNodes := make([]*pathMap, len(result.updatedDirs))
			for j, dir := range result.updatedDirs {
				node := mainTree.GetNode(dir, false)
				updatedNodes[j] = node
			}
			nodesToWalk = append(nodesToWalk, updatedNodes)
		}
		return err
	}
	err = combineResults()
	if err != nil {
		return err
	}

	f.nodes = *mainTree

	// after having loaded the entire db and therefore created entries for
	// the directories we know of, now it's safe to start calling ReadDir on
	// any updated directories
	for i := range nodesToWalk {
		f.listDirsAsync(nodesToWalk[i])
	}
	f.verbosef("Loaded db and statted known dirs in %v\n", time.Since(startTime))
	f.threadPool.Wait()
	f.verbosef("Loaded db and statted all dirs in %v\n", time.Now().Sub(startTime))

	return err
}

// startWithoutExternalCache starts scanning the filesystem according to the cache config
// startWithoutExternalCache should be called if startFromExternalCache is not applicable
func (f *Finder) startWithoutExternalCache() {
	startTime := time.Now()
	configDirs := f.cacheMetadata.Config.RootDirs

	// clean paths
	candidates := make([]string, len(configDirs))
	for i, dir := range configDirs {
		candidates[i] = filepath.Clean(dir)
	}
	// remove duplicates: skip any candidate that lives under a dir already
	// accepted into dirsToScan.
	// NOTE(review): this only compares against dirs accepted so far, so a
	// parent listed after its child will not subsume the child — presumably
	// acceptable for typical configs, but worth confirming.
	dirsToScan := make([]string, 0, len(configDirs))
	for _, candidate := range candidates {
		include := true
		for _, included := range dirsToScan {
			if included == "/" || strings.HasPrefix(candidate+"/", included+"/") {
				include = false
				break
			}
		}
		if include {
			dirsToScan = append(dirsToScan, candidate)
		}
	}

	// start searching finally
	for _, path := range dirsToScan {
		f.verbosef("Starting find of %v\n", path)
		f.startFind(path)
	}

	f.threadPool.Wait()

	f.verbosef("Scanned filesystem (not using cache) in %v\n", time.Now().Sub(startTime))
}

// isInfoUpToDate tells whether <new> can confirm that results computed at <old> are still valid
func (f *Finder) isInfoUpToDate(old statResponse, new statResponse) (equal bool) {
	if old.Inode != new.Inode {
		return false
	}
	if old.ModTime != new.ModTime {
		return false
	}
	if old.Device != new.Device {
		return false
	}
	return true
}

// wasModified reports whether any directory was found to differ from the
// cached state during this run (see setModified).
func (f *Finder) wasModified() bool {
	return atomic.LoadInt32(&f.modifiedFlag) > 0
}

// setModified records (atomically, since stat workers run concurrently) that
// at least one directory changed since the cache was written.
func (f *Finder) setModified() {
	var newVal int32
	newVal = 1
	atomic.StoreInt32(&f.modifiedFlag, newVal)
}

// sortedDirEntries exports directory entries to facilitate dumping them to the external cache
// Entries with ModTime == 0 are omitted: statDirSync uses a zero ModTime to
// mark directories that were missing or could not be statted.
func (f *Finder) sortedDirEntries() []dirFullInfo {
	startTime := time.Now()
	nodes := make([]dirFullInfo, 0)
	for _, node := range f.nodes.DumpAll() {
		if node.ModTime != 0 {
			nodes = append(nodes, node)
		}
	}
	discoveryDate := time.Now()
	f.verbosef("Generated %v cache entries in %v\n", len(nodes), discoveryDate.Sub(startTime))
	less := func(i int, j int) bool {
		return nodes[i].Path < nodes[j].Path
	}
	sort.Slice(nodes, less)
	sortDate := time.Now()
	f.verbosef("Sorted %v cache entries in %v\n", len(nodes), sortDate.Sub(discoveryDate))

	return nodes
}

// serializeDb converts the cache database into a form to save to disk
func (f *Finder) serializeDb() ([]byte, error) {
	// sort dir entries
	var entryList = f.sortedDirEntries()

	// Generate an output file that can be conveniently loaded using the same number of threads
	// as were used in this execution (because presumably that will be the number of threads
	// used in the next execution too)

	// generate header
	header := []byte{}
	header = append(header, []byte(f.cacheMetadata.Version)...)
	header = append(header, lineSeparator)
	configDump, err := f.cacheMetadata.Config.Dump()
	if err != nil {
		return nil, err
	}
	header = append(header, configDump...)

	// serialize individual blocks in parallel
	numBlocks := f.numDbLoadingThreads
	if numBlocks > len(entryList) {
		numBlocks = len(entryList)
	}
	blocks := make([][]byte, 1+numBlocks)
	blocks[0] = header
	blockMin := 0
	wg := sync.WaitGroup{}
	// errLock guards <err>, which the worker goroutines below may all write;
	// it is read again only after wg.Wait(), so no lock is needed there.
	var errLock sync.Mutex

	for i := 1; i <= numBlocks; i++ {
		// identify next block
		blockMax := len(entryList) * i / numBlocks
		block := entryList[blockMin:blockMax]

		// process block
		wg.Add(1)
		go func(index int, block []dirFullInfo) {
			byteBlock, subErr := f.serializeCacheEntry(block)
			f.verbosef("Serialized block %v into %v bytes\n", index, len(byteBlock))
			if subErr != nil {
				f.verbosef("%v\n", subErr.Error())
				errLock.Lock()
				err = subErr
				errLock.Unlock()
			} else {
				blocks[index] = byteBlock
			}
			wg.Done()
		}(i, block)

		blockMin = blockMax
	}

	wg.Wait()

	if err != nil {
		return nil, err
	}

	content := bytes.Join(blocks, []byte{lineSeparator})

	return content, nil
}

// dumpDb saves the cache database to disk
func (f *Finder) dumpDb() error {
	startTime := time.Now()
	f.verbosef("Dumping db\n")

	tempPath := f.DbPath + ".tmp"

	// NOTE(review): this local shadows the imported "bytes" package within the
	// rest of this function; consider renaming (e.g. to "data") in a follow-up.
	bytes, err := f.serializeDb()
	if err != nil {
		return err
	}
	serializeDate := time.Now()
	f.verbosef("Serialized db in %v\n", serializeDate.Sub(startTime))
	// dump file and atomically move
	err = f.filesystem.WriteFile(tempPath, bytes, 0777)
	if err != nil {
		return err
	}
	err = f.filesystem.Rename(tempPath, f.DbPath)
	if err != nil {
		return err
	}

	f.verbosef("Wrote db in %v\n", time.Now().Sub(serializeDate))
	return nil

}

// canIgnoreFsErr checks for certain classes of filesystem errors that are safe to ignore
func (f *Finder) canIgnoreFsErr(err error) bool {
	pathErr, isPathErr := err.(*os.PathError)
	if !isPathErr {
		// Don't recognize this error
		return false
	}
	if os.IsPermission(pathErr) {
		// Permission errors are ignored:
		// https://issuetracker.google.com/37553659
		// https://github.com/google/kati/pull/116
		return true
	}
	// NOTE(review): PathErrors from the OS typically wrap syscall.ENOENT, which
	// is not identical to os.ErrNotExist, so this comparison may never be true;
	// os.IsNotExist(pathErr) would be the broader check — confirm intent.
	if pathErr.Err == os.ErrNotExist {
		// If a directory doesn't exist, that generally means the cache is out-of-date
		return true
	}
	// Don't recognize this error
	return false
}

// onFsError should be called whenever a potentially fatal error is returned from a filesystem call
func (f *Finder) onFsError(path string, err error) {
	if !f.canIgnoreFsErr(err) {
		// We could send the errors through a channel instead, although that would cause this call
		// to block unless we preallocated a sufficient buffer or spawned a reader thread.
		// Although it wouldn't be too complicated to spawn a reader thread, it's still slightly
		// more convenient to use a lock. Only in an unusual situation should this code be
		// invoked anyway.
		f.errlock.Lock()
		f.fsErrs = append(f.fsErrs, fsErr{path: path, err: err})
		f.errlock.Unlock()
	}
}

// discardErrsForPrunedPaths removes any errors for paths that are no longer included in the cache
func (f *Finder) discardErrsForPrunedPaths() {
	// This function could be somewhat inefficient due to being single-threaded,
	// but the length of f.fsErrs should be approximately 0, so it shouldn't take long anyway.
	relevantErrs := make([]fsErr, 0, len(f.fsErrs))
	for _, fsErr := range f.fsErrs {
		path := fsErr.path
		node := f.nodes.GetNode(path, false)
		if node != nil {
			// The path in question wasn't pruned due to a failure to process a parent directory.
			// So, the failure to process this path is important
			relevantErrs = append(relevantErrs, fsErr)
		}
	}
	f.fsErrs = relevantErrs
}

// getErr returns an error based on previous calls to onFsErr, if any
func (f *Finder) getErr() error {
	f.discardErrsForPrunedPaths()

	numErrs := len(f.fsErrs)
	if numErrs < 1 {
		return nil
	}

	// cap the error message at the first few errors to keep it readable
	maxNumErrsToInclude := 10
	message := ""
	if numErrs > maxNumErrsToInclude {
		message = fmt.Sprintf("finder encountered %v errors: %v...", numErrs, f.fsErrs[:maxNumErrsToInclude])
	} else {
		message = fmt.Sprintf("finder encountered %v errors: %v", numErrs, f.fsErrs)
	}

	return errors.New(message)
}

// statDirAsync schedules a re-stat of <dir> on the thread pool. If the stat
// result differs from the cached one, the node's contents are reset, the
// modified flag is raised, and (if the dir still exists) a re-list is queued.
func (f *Finder) statDirAsync(dir *pathMap) {
	node := dir
	path := dir.path
	f.threadPool.Run(
		func() {
			updatedStats := f.statDirSync(path)

			if !f.isInfoUpToDate(node.statResponse, updatedStats) {
				node.mapNode = mapNode{
					statResponse: updatedStats,
					FileNames:    []string{},
				}
				f.setModified()
				if node.statResponse.ModTime != 0 {
					// modification time was updated, so re-scan for
					// child directories
					f.listDirAsync(dir)
				}
			}
		},
	)
}

// statDirSync stats <path> and packages the identity/freshness fields
// (inode, device, latest of mtime/ctime) into a statResponse.
func (f *Finder) statDirSync(path string) statResponse {

	fileInfo, err := f.filesystem.Lstat(path)

	var stats statResponse
	if err != nil {
		// possibly record this error
		f.onFsError(path, err)
		// in case of a failure to stat the directory, treat the directory as missing (modTime = 0)
		return stats
	}
	modTime := fileInfo.ModTime()
	stats = statResponse{}
	inode, err := f.filesystem.InodeNumber(fileInfo)
	if err != nil {
		panic(fmt.Sprintf("Could not get inode number of %v: %v\n", path, err.Error()))
	}
	stats.Inode = inode
	device, err := f.filesystem.DeviceNumber(fileInfo)
	if err != nil {
		panic(fmt.Sprintf("Could not get device number of %v: %v\n", path, err.Error()))
	}
	stats.Device = device
	permissionsChangeTime, err := f.filesystem.PermTime(fileInfo)

	if err != nil {
		panic(fmt.Sprintf("Could not get permissions modification time (CTime) of %v: %v\n", path, err.Error()))
	}
	// We're only interested in knowing whether anything about the directory
	// has changed since last check, so we use the latest of the two
	// modification times (content modification (mtime) and
	// permission modification (ctime))
	if permissionsChangeTime.After(modTime) {
		modTime = permissionsChangeTime
	}
	stats.ModTime = modTime.UnixNano()

	return stats
}

// pruneCacheCandidates removes the items that we don't want to include in our persistent cache
func (f *Finder) pruneCacheCandidates(items *DirEntries) {

	// if a prune-file is present, discard the entire directory's contents
	for _, fileName := range items.FileNames {
		for _, abortedName := range f.cacheMetadata.Config.PruneFiles {
			if fileName == abortedName {
				items.FileNames = []string{}
				items.DirNames = []string{}
				return
			}
		}
	}

	// remove any files that aren't the ones we want to include
	// (in-place compaction: writeIndex trails the read position)
	writeIndex := 0
	for _, fileName := range items.FileNames {
		// include only these files
		for _, includedName := range f.cacheMetadata.Config.IncludeFiles {
			if fileName == includedName {
				items.FileNames[writeIndex] = fileName
				writeIndex++
				break
			}
		}
	}
	// resize
	items.FileNames = items.FileNames[:writeIndex]

	writeIndex = 0
	for _, dirName := range items.DirNames {
		items.DirNames[writeIndex] = dirName
		// ignore other dirs that are known to not be inputs to the build process
		include := true
		for _, excludedName := range f.cacheMetadata.Config.ExcludeDirs {
			if dirName == excludedName {
				// don't include
				include = false
				break
			}
		}
		if include {
			writeIndex++
		}
	}
	// resize
	items.DirNames = items.DirNames[:writeIndex]
}

// listDirsAsync schedules a synchronous listing of all of <nodes> as one
// thread-pool task (the nodes are processed sequentially within the task).
func (f *Finder) listDirsAsync(nodes []*pathMap) {
	f.threadPool.Run(
		func() {
			for i := range nodes {
				f.listDirSync(nodes[i])
			}
		},
	)
}

// listDirAsync schedules a synchronous listing of a single node on the thread pool.
func (f *Finder) listDirAsync(node *pathMap) {
	f.threadPool.Run(
		func() {
			f.listDirSync(node)
		},
	)
}

// listDirSync reads the contents of <dir> from disk, prunes them per the cache
// config, and installs the surviving files and child nodes on <dir>. Newly
// discovered subdirectories get a stat request queued.
func (f *Finder) listDirSync(dir *pathMap) {
	path := dir.path
	children, err := f.filesystem.ReadDir(path)

	if err != nil {
		// possibly record this error
		f.onFsError(path, err)
		// if listing the contents of the directory fails (presumably due to
		// permission denied), then treat the directory as empty
		children = nil
	}

	var subdirs []string
	var subfiles []string

	for _, child := range children {
		linkBits := child.Mode() & os.ModeSymlink
		isLink := linkBits != 0
		if isLink {
			childPath := filepath.Join(path, child.Name())
			childStat, err := f.filesystem.Stat(childPath)
			if err != nil {
				// If stat fails this is probably a broken or dangling symlink, treat it as a file.
				subfiles = append(subfiles, child.Name())
			} else if childStat.IsDir() {
				// Skip symlink dirs.
				// We don't have to support symlink dirs because
				// that would cause duplicates.
			} else {
				// We do have to support symlink files because the link name might be
				// different than the target name
				// (for example, Android.bp -> build/soong/root.bp)
				subfiles = append(subfiles, child.Name())
			}
		} else if child.IsDir() {
			subdirs = append(subdirs, child.Name())
		} else {
			subfiles = append(subfiles, child.Name())
		}

	}
	parentNode := dir

	entry := &DirEntries{Path: path, DirNames: subdirs, FileNames: subfiles}
	f.pruneCacheCandidates(entry)

	// create a pathMap node for each relevant subdirectory
	relevantChildren := map[string]*pathMap{}
	for _, subdirName := range entry.DirNames {
		childNode, found := parentNode.children[subdirName]
		// if we already knew of this directory, then we already have a request pending to Stat it
		// if we didn't already know of this directory, then we must Stat it now
		if !found {
			childNode = parentNode.newChild(subdirName)
			f.statDirAsync(childNode)
		}
		relevantChildren[subdirName] = childNode
	}
	// Note that in rare cases, it's possible that we're reducing the set of
	// children via this statement, if these are all true:
	// 1. we previously had a cache that knew about subdirectories of parentNode
	// 2. the user created a prune-file (described in pruneCacheCandidates)
	//    inside <parentNode>, which specifies that the contents of parentNode
	//    are to be ignored.
	// The fact that it's possible to remove children here means that *pathMap structs
	// must not be looked up from f.nodes by filepath (and instead must be accessed by
	// direct pointer) until after every listDirSync completes
	parentNode.FileNames = entry.FileNames
	parentNode.children = relevantChildren

}

// listMatches takes a node and a function that specifies which subdirectories and
// files to include, and listMatches returns the matches
func (f *Finder) listMatches(node *pathMap,
	filter WalkFunc) (subDirs []*pathMap, filePaths []string) {
	entries := DirEntries{
		FileNames: node.FileNames,
	}
	entries.DirNames = make([]string, 0, len(node.children))
	for childName := range node.children {
		entries.DirNames = append(entries.DirNames, childName)
	}

	dirNames, fileNames := filter(entries)

	// NOTE(review): this assignment is dead — subDirs is reassigned via make()
	// a few lines below before ever being read.
	subDirs = []*pathMap{}
	filePaths = make([]string, 0, len(fileNames))
	for _, fileName := range fileNames {
		filePaths = append(filePaths, joinCleanPaths(node.path, fileName))
	}
	subDirs = make([]*pathMap, 0, len(dirNames))
	for _, childName := range dirNames {
		child, ok := node.children[childName]
		if ok {
			subDirs = append(subDirs, child)
		}
	}

	return subDirs, filePaths
}

// findInCacheMultithreaded spawns potentially multiple goroutines with which to search the cache.
1482func (f *Finder) findInCacheMultithreaded(node *pathMap, filter WalkFunc, 1483 approxNumThreads int) []string { 1484 1485 if approxNumThreads < 2 { 1486 // Done spawning threads; process remaining directories 1487 return f.findInCacheSinglethreaded(node, filter) 1488 } 1489 1490 totalWork := 0 1491 for _, child := range node.children { 1492 totalWork += child.approximateNumDescendents 1493 } 1494 childrenResults := make(chan []string, len(node.children)) 1495 1496 subDirs, filePaths := f.listMatches(node, filter) 1497 1498 // process child directories 1499 for _, child := range subDirs { 1500 numChildThreads := approxNumThreads * child.approximateNumDescendents / totalWork 1501 childProcessor := func(child *pathMap) { 1502 childResults := f.findInCacheMultithreaded(child, filter, numChildThreads) 1503 childrenResults <- childResults 1504 } 1505 // If we're allowed to use more than 1 thread to process this directory, 1506 // then instead we use 1 thread for each subdirectory. 1507 // It would be strange to spawn threads for only some subdirectories. 1508 go childProcessor(child) 1509 } 1510 1511 // collect results 1512 for i := 0; i < len(subDirs); i++ { 1513 childResults := <-childrenResults 1514 filePaths = append(filePaths, childResults...) 1515 } 1516 close(childrenResults) 1517 1518 return filePaths 1519} 1520 1521// findInCacheSinglethreaded synchronously searches the cache for all matching file paths 1522// note findInCacheSinglethreaded runs 2X to 4X as fast by being iterative rather than recursive 1523func (f *Finder) findInCacheSinglethreaded(node *pathMap, filter WalkFunc) []string { 1524 if node == nil { 1525 return []string{} 1526 } 1527 1528 nodes := []*pathMap{node} 1529 matches := []string{} 1530 1531 for len(nodes) > 0 { 1532 currentNode := nodes[0] 1533 nodes = nodes[1:] 1534 1535 subDirs, filePaths := f.listMatches(currentNode, filter) 1536 1537 nodes = append(nodes, subDirs...) 1538 1539 matches = append(matches, filePaths...) 
1540 } 1541 return matches 1542} 1543