cmd/cigocacher: remove Windows-specific disk code moved upstream (#18697)

Updates tailscale/corp#10808
Updates bradfitz/go-tool-cache#27

Change-Id: I27a2af63d882d916998933521f17e410692255ca
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
main
Brad Fitzpatrick 2 months ago committed by GitHub
parent 3f3af841af
commit bfc15cb57c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 134
      cmd/cigocacher/cigocacher.go
  2. 88
      cmd/cigocacher/disk.go
  3. 44
      cmd/cigocacher/disk_notwindows.go
  4. 102
      cmd/cigocacher/disk_windows.go
  5. 109
      cmd/cigocacher/http.go
  6. 2
      flake.nix
  7. 4
      go.mod
  8. 2
      go.mod.sri
  9. 8
      go.sum
  10. 2
      shell.nix

@ -12,10 +12,8 @@
package main
import (
"bytes"
"context"
jsonv1 "encoding/json"
"errors"
"flag"
"fmt"
"io"
@ -103,13 +101,7 @@ func main() {
if tk == "" {
log.Fatal("--token is empty; cannot fetch stats")
}
c := &gocachedClient{
baseURL: *srvURL,
cl: httpClient(srvHost, *srvHostDial),
accessToken: tk,
verbose: *verbose,
}
stats, err := c.fetchStats()
stats, err := fetchStats(httpClient(srvHost, *srvHostDial), *srvURL, tk)
if err != nil {
log.Fatalf("error fetching gocached stats: %v", err)
}
@ -140,11 +132,13 @@ func main() {
if *verbose {
log.Printf("Using cigocached at %s", *srvURL)
}
c.gocached = &gocachedClient{
baseURL: *srvURL,
cl: httpClient(srvHost, *srvHostDial),
accessToken: *token,
verbose: *verbose,
c.remote = &cachers.HTTPClient{
BaseURL: *srvURL,
Disk: c.disk,
HTTPClient: httpClient(srvHost, *srvHostDial),
AccessToken: *token,
Verbose: *verbose,
BestEffortHTTP: true,
}
}
var p *cacheproc.Process
@ -186,9 +180,9 @@ func httpClient(srvHost, srvHostDial string) *http.Client {
}
type cigocacher struct {
disk *cachers.DiskCache
gocached *gocachedClient
verbose bool
disk *cachers.DiskCache
remote *cachers.HTTPClient // nil if no remote server
verbose bool
getNanos atomic.Int64 // total nanoseconds spent in gets
putNanos atomic.Int64 // total nanoseconds spent in puts
@ -209,39 +203,33 @@ func (c *cigocacher) get(ctx context.Context, actionID string) (outputID, diskPa
defer func() {
c.getNanos.Add(time.Since(t0).Nanoseconds())
}()
if c.gocached == nil {
return c.disk.Get(ctx, actionID)
}
outputID, diskPath, err = c.disk.Get(ctx, actionID)
if err == nil && outputID != "" {
return outputID, diskPath, nil
if c.remote == nil || (err == nil && outputID != "") {
return outputID, diskPath, err
}
// Disk miss; try remote. HTTPClient.Get handles the HTTP fetch
// (including lz4 decompression) and writes to disk for us.
c.getHTTP.Add(1)
t0HTTP := time.Now()
defer func() {
c.getHTTPNanos.Add(time.Since(t0HTTP).Nanoseconds())
}()
outputID, res, err := c.gocached.get(ctx, actionID)
outputID, diskPath, err = c.remote.Get(ctx, actionID)
if err != nil {
c.getHTTPErrors.Add(1)
return "", "", nil
}
if outputID == "" || res == nil {
if outputID == "" {
c.getHTTPMisses.Add(1)
return "", "", nil
}
defer res.Body.Close()
diskPath, err = put(c.disk, actionID, outputID, res.ContentLength, res.Body)
if err != nil {
return "", "", fmt.Errorf("error filling disk cache from HTTP: %w", err)
}
c.getHTTPHits.Add(1)
c.getHTTPBytes.Add(res.ContentLength)
if fi, err := os.Stat(diskPath); err == nil {
c.getHTTPBytes.Add(fi.Size())
}
return outputID, diskPath, nil
}
@ -250,56 +238,25 @@ func (c *cigocacher) put(ctx context.Context, actionID, outputID string, size in
defer func() {
c.putNanos.Add(time.Since(t0).Nanoseconds())
}()
if c.gocached == nil {
return put(c.disk, actionID, outputID, size, r)
}
c.putHTTP.Add(1)
var diskReader, httpReader io.Reader
tee := &bestEffortTeeReader{r: r}
if size == 0 {
// Special case the empty file so NewRequest sets "Content-Length: 0",
// as opposed to thinking we didn't set it and not being able to sniff its size
// from the type.
diskReader, httpReader = bytes.NewReader(nil), bytes.NewReader(nil)
} else {
pr, pw := io.Pipe()
defer pw.Close()
// The diskReader is in the driving seat. We will try to forward data
// to httpReader as well, but only best-effort.
diskReader = tee
tee.w = pw
httpReader = pr
if c.remote == nil {
return c.disk.Put(ctx, actionID, outputID, size, r)
}
httpErrCh := make(chan error)
go func() {
t0HTTP := time.Now()
defer func() {
c.putHTTPNanos.Add(time.Since(t0HTTP).Nanoseconds())
}()
httpErrCh <- c.gocached.put(ctx, actionID, outputID, size, httpReader)
}()
diskPath, err = put(c.disk, actionID, outputID, size, diskReader)
c.putHTTP.Add(1)
diskPath, err = c.remote.Put(ctx, actionID, outputID, size, r)
c.putHTTPNanos.Add(time.Since(t0).Nanoseconds())
if err != nil {
return "", fmt.Errorf("error writing to disk cache: %w", errors.Join(err, tee.err))
}
select {
case err := <-httpErrCh:
if err != nil {
c.putHTTPErrors.Add(1)
} else {
c.putHTTPBytes.Add(size)
}
case <-ctx.Done():
c.putHTTPErrors.Add(1)
} else {
c.putHTTPBytes.Add(size)
}
return diskPath, nil
return diskPath, err
}
func (c *cigocacher) close() error {
if !c.verbose || c.gocached == nil {
if !c.verbose || c.remote == nil {
return nil
}
@ -307,7 +264,7 @@ func (c *cigocacher) close() error {
c.getHTTP.Load(), float64(c.getHTTPBytes.Load())/float64(1<<20), float64(c.getHTTPNanos.Load())/float64(time.Second), c.getHTTPHits.Load(), c.getHTTPMisses.Load(), c.getHTTPErrors.Load(),
c.putHTTP.Load(), float64(c.putHTTPBytes.Load())/float64(1<<20), float64(c.putHTTPNanos.Load())/float64(time.Second), c.putHTTPErrors.Load())
stats, err := c.gocached.fetchStats()
stats, err := fetchStats(c.remote.HTTPClient, c.remote.BaseURL, c.remote.AccessToken)
if err != nil {
log.Printf("error fetching gocached stats: %v", err)
} else {
@ -354,19 +311,20 @@ func fetchAccessToken(cl *http.Client, idTokenURL, idTokenRequestToken, gocached
return accessToken.AccessToken, nil
}
type bestEffortTeeReader struct {
r io.Reader
w io.WriteCloser
err error
}
func (t *bestEffortTeeReader) Read(p []byte) (int, error) {
n, err := t.r.Read(p)
if n > 0 && t.w != nil {
if _, err := t.w.Write(p[:n]); err != nil {
t.err = errors.Join(err, t.w.Close())
t.w = nil
}
func fetchStats(cl *http.Client, baseURL, accessToken string) (string, error) {
req, _ := http.NewRequest("GET", baseURL+"/session/stats", nil)
req.Header.Set("Authorization", "Bearer "+accessToken)
resp, err := cl.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("fetching stats: %s", resp.Status)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return n, err
return string(b), nil
}

@ -1,88 +0,0 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
"github.com/bradfitz/go-tool-cache/cachers"
)
// indexEntry is the metadata that DiskCache stores on disk for an ActionID.
type indexEntry struct {
Version int `json:"v"`
OutputID string `json:"o"`
Size int64 `json:"n"`
TimeNanos int64 `json:"t"`
}
func validHex(x string) bool {
if len(x) < 4 || len(x) > 100 {
return false
}
for _, b := range x {
if b >= '0' && b <= '9' || b >= 'a' && b <= 'f' {
continue
}
return false
}
return true
}
// put is like dc.Put but refactored to support safe concurrent writes on Windows.
// TODO(tomhjp): upstream these changes to go-tool-cache once they look stable.
func put(dc *cachers.DiskCache, actionID, outputID string, size int64, body io.Reader) (diskPath string, _ error) {
if len(actionID) < 4 || len(outputID) < 4 {
return "", fmt.Errorf("actionID and outputID must be at least 4 characters long")
}
if !validHex(actionID) {
log.Printf("diskcache: got invalid actionID %q", actionID)
return "", errors.New("actionID must be hex")
}
if !validHex(outputID) {
log.Printf("diskcache: got invalid outputID %q", outputID)
return "", errors.New("outputID must be hex")
}
actionFile := dc.ActionFilename(actionID)
outputFile := dc.OutputFilename(outputID)
actionDir := filepath.Dir(actionFile)
outputDir := filepath.Dir(outputFile)
if err := os.MkdirAll(actionDir, 0755); err != nil {
return "", fmt.Errorf("failed to create action directory: %w", err)
}
if err := os.MkdirAll(outputDir, 0755); err != nil {
return "", fmt.Errorf("failed to create output directory: %w", err)
}
wrote, err := writeOutputFile(outputFile, body, size, outputID)
if err != nil {
return "", err
}
if wrote != size {
return "", fmt.Errorf("wrote %d bytes, expected %d", wrote, size)
}
ij, err := json.Marshal(indexEntry{
Version: 1,
OutputID: outputID,
Size: size,
TimeNanos: time.Now().UnixNano(),
})
if err != nil {
return "", err
}
if err := writeActionFile(dc.ActionFilename(actionID), ij); err != nil {
return "", fmt.Errorf("atomic write failed: %w", err)
}
return outputFile, nil
}

@ -1,44 +0,0 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
//go:build !windows
package main
import (
"bytes"
"io"
"os"
"path/filepath"
)
func writeActionFile(dest string, b []byte) error {
_, err := writeAtomic(dest, bytes.NewReader(b))
return err
}
func writeOutputFile(dest string, r io.Reader, _ int64, _ string) (int64, error) {
return writeAtomic(dest, r)
}
func writeAtomic(dest string, r io.Reader) (int64, error) {
tf, err := os.CreateTemp(filepath.Dir(dest), filepath.Base(dest)+".*")
if err != nil {
return 0, err
}
size, err := io.Copy(tf, r)
if err != nil {
tf.Close()
os.Remove(tf.Name())
return 0, err
}
if err := tf.Close(); err != nil {
os.Remove(tf.Name())
return 0, err
}
if err := os.Rename(tf.Name(), dest); err != nil {
os.Remove(tf.Name())
return 0, err
}
return size, nil
}

@ -1,102 +0,0 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package main
import (
"crypto/sha256"
"errors"
"fmt"
"io"
"os"
)
// The functions in this file are based on go's own cache in
// cmd/go/internal/cache/cache.go, particularly putIndexEntry and copyFile.
// writeActionFile writes the indexEntry metadata for an ActionID to disk. It
// may be called for the same actionID concurrently from multiple processes,
// and the outputID for a specific actionID may change from time to time due
// to non-deterministic builds. It makes a best-effort to delete the file if
// anything goes wrong.
func writeActionFile(dest string, b []byte) (retErr error) {
f, err := os.OpenFile(dest, os.O_WRONLY|os.O_CREATE, 0o666)
if err != nil {
return err
}
defer func() {
cerr := f.Close()
if retErr != nil || cerr != nil {
retErr = errors.Join(retErr, cerr, os.Remove(dest))
}
}()
_, err = f.Write(b)
if err != nil {
return err
}
// Truncate the file only *after* writing it.
// (This should be a no-op, but truncate just in case of previous corruption.)
//
// This differs from os.WriteFile, which truncates to 0 *before* writing
// via os.O_TRUNC. Truncating only after writing ensures that a second write
// of the same content to the same file is idempotent, and does not - even
// temporarily! - undo the effect of the first write.
return f.Truncate(int64(len(b)))
}
// writeOutputFile writes content to be cached to disk. The outputID is the
// sha256 hash of the content, and each file should only be written ~once,
// assuming no sha256 hash collisions. It may be written multiple times if
// concurrent processes are both populating the same output. The file is opened
// with FILE_SHARE_READ|FILE_SHARE_WRITE, which means both processes can write
// the same contents concurrently without conflict.
//
// It makes a best effort to clean up if anything goes wrong, but the file may
// be left in an inconsistent state in the event of disk-related errors such as
// another process taking file locks, or power loss etc.
func writeOutputFile(dest string, r io.Reader, size int64, outputID string) (_ int64, retErr error) {
info, err := os.Stat(dest)
if err == nil && info.Size() == size {
// Already exists, check the hash.
if f, err := os.Open(dest); err == nil {
h := sha256.New()
io.Copy(h, f)
f.Close()
if fmt.Sprintf("%x", h.Sum(nil)) == outputID {
// Still drain the reader to ensure associated resources are released.
return io.Copy(io.Discard, r)
}
}
}
// Didn't successfully find the pre-existing file, write it.
mode := os.O_WRONLY | os.O_CREATE
if err == nil && info.Size() > size {
mode |= os.O_TRUNC // Should never happen, but self-heal.
}
f, err := os.OpenFile(dest, mode, 0644)
if err != nil {
return 0, fmt.Errorf("failed to open output file %q: %w", dest, err)
}
defer func() {
cerr := f.Close()
if retErr != nil || cerr != nil {
retErr = errors.Join(retErr, cerr, os.Remove(dest))
}
}()
// Copy file to f, but also into h to double-check hash.
h := sha256.New()
w := io.MultiWriter(f, h)
n, err := io.Copy(w, r)
if err != nil {
return 0, err
}
if fmt.Sprintf("%x", h.Sum(nil)) != outputID {
return 0, errors.New("file content changed underfoot")
}
return n, nil
}

@ -1,109 +0,0 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
)
type gocachedClient struct {
baseURL string // base URL of the cacher server, like "http://localhost:31364".
cl *http.Client // http.Client to use.
accessToken string // Bearer token to use in the Authorization header.
verbose bool
}
// drainAndClose reads and throws away a small bounded amount of data. This is a
// best-effort attempt to allow connection reuse; Go's HTTP/1 Transport won't
// reuse a TCP connection unless you fully consume HTTP responses.
func drainAndClose(body io.ReadCloser) {
io.CopyN(io.Discard, body, 4<<10)
body.Close()
}
func tryReadErrorMessage(res *http.Response) []byte {
msg, _ := io.ReadAll(io.LimitReader(res.Body, 4<<10))
return msg
}
func (c *gocachedClient) get(ctx context.Context, actionID string) (outputID string, resp *http.Response, err error) {
req, _ := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/action/"+actionID, nil)
req.Header.Set("Want-Object", "1") // opt in to single roundtrip protocol
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
res, err := c.cl.Do(req)
if err != nil {
return "", nil, err
}
defer func() {
if resp == nil {
drainAndClose(res.Body)
}
}()
if res.StatusCode == http.StatusNotFound {
return "", nil, nil
}
if res.StatusCode != http.StatusOK {
msg := tryReadErrorMessage(res)
if c.verbose {
log.Printf("error GET /action/%s: %v, %s", actionID, res.Status, msg)
}
return "", nil, fmt.Errorf("unexpected GET /action/%s status %v", actionID, res.Status)
}
outputID = res.Header.Get("Go-Output-Id")
if outputID == "" {
return "", nil, fmt.Errorf("missing Go-Output-Id header in response")
}
if res.ContentLength == -1 {
return "", nil, fmt.Errorf("no Content-Length from server")
}
return outputID, res, nil
}
func (c *gocachedClient) put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) error {
req, _ := http.NewRequestWithContext(ctx, "PUT", c.baseURL+"/"+actionID+"/"+outputID, body)
req.ContentLength = size
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
res, err := c.cl.Do(req)
if err != nil {
if c.verbose {
log.Printf("error PUT /%s/%s: %v", actionID, outputID, err)
}
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
msg := tryReadErrorMessage(res)
if c.verbose {
log.Printf("error PUT /%s/%s: %v, %s", actionID, outputID, res.Status, msg)
}
return fmt.Errorf("unexpected PUT /%s/%s status %v", actionID, outputID, res.Status)
}
return nil
}
func (c *gocachedClient) fetchStats() (string, error) {
req, _ := http.NewRequest("GET", c.baseURL+"/session/stats", nil)
req.Header.Set("Authorization", "Bearer "+c.accessToken)
resp, err := c.cl.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(b), nil
}

@ -151,4 +151,4 @@
});
};
}
# nix-direnv cache busting line: sha256-e5fAO7gye8B5FGBTxLNVTKq6dp8By9iDEw72M1/y4ZE=
# nix-direnv cache busting line: sha256-JD1PZPZT5clhRWIAQO8skBRN59QPiyfTc7nPYTvGbd8=

@ -17,7 +17,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.75.3
github.com/aws/aws-sdk-go-v2/service/ssm v1.44.7
github.com/axiomhq/hyperloglog v0.0.0-20240319100328-84253e514e02
github.com/bradfitz/go-tool-cache v0.0.0-20251113223507-0124e698e0bd
github.com/bradfitz/go-tool-cache v0.0.0-20260216153636-9e5201344fe5
github.com/bradfitz/monogok v0.0.0-20260208031948-2219c393d032
github.com/bramvdbogaerde/go-scp v1.4.0
github.com/cilium/ebpf v0.16.0
@ -419,7 +419,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect

@ -1 +1 @@
sha256-e5fAO7gye8B5FGBTxLNVTKq6dp8By9iDEw72M1/y4ZE=
sha256-JD1PZPZT5clhRWIAQO8skBRN59QPiyfTc7nPYTvGbd8=

@ -202,8 +202,8 @@ github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bombsimon/wsl/v4 v4.2.1 h1:Cxg6u+XDWff75SIFFmNsqnIOgob+Q9hG6y/ioKbRFiM=
github.com/bombsimon/wsl/v4 v4.2.1/go.mod h1:Xu/kDxGZTofQcDGCtQe9KCzhHphIe0fDuyWTxER9Feo=
github.com/bradfitz/go-tool-cache v0.0.0-20251113223507-0124e698e0bd h1:1Df3FBmfyUCIQ4eKzAPXIWTfewY89L0fWPWO56zWCyI=
github.com/bradfitz/go-tool-cache v0.0.0-20251113223507-0124e698e0bd/go.mod h1:2+xptBAd0m2kZ1wLO4AYZhldLEFPy+KeGwmnlXLvy+w=
github.com/bradfitz/go-tool-cache v0.0.0-20260216153636-9e5201344fe5 h1:0sG3c7afYdBNlc3QyhckvZ4bV9iqlfqCQM1i+mWm0eE=
github.com/bradfitz/go-tool-cache v0.0.0-20260216153636-9e5201344fe5/go.mod h1:78ZLITnBUCDJeU01+wYYJKaPYYgsDzJPRfxeI8qFh5g=
github.com/bradfitz/monogok v0.0.0-20260208031948-2219c393d032 h1:xDomVqO85ss/98Ky5zxM/g86bXDNBLebM2I9G/fu6uA=
github.com/bradfitz/monogok v0.0.0-20260208031948-2219c393d032/go.mod h1:TG1HbU9fRVDnNgXncVkKz9GdvjIvqquXjH6QZSEVmY4=
github.com/bramvdbogaerde/go-scp v1.4.0 h1:jKMwpwCbcX1KyvDbm/PDJuXcMuNVlLGi0Q0reuzjyKY=
@ -935,8 +935,8 @@ github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkM
github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyXE/eKY9RztQ=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0=
github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pires/go-proxyproto v0.8.1 h1:9KEixbdJfhrbtjpz/ZwCdWDD2Xem0NZ38qMYaASJgp0=
github.com/pires/go-proxyproto v0.8.1/go.mod h1:ZKAAyp3cgy5Y5Mo4n9AlScrkCZwUy0g3Jf+slqQVcuU=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=

@ -16,4 +16,4 @@
) {
src = ./.;
}).shellNix
# nix-direnv cache busting line: sha256-e5fAO7gye8B5FGBTxLNVTKq6dp8By9iDEw72M1/y4ZE=
# nix-direnv cache busting line: sha256-JD1PZPZT5clhRWIAQO8skBRN59QPiyfTc7nPYTvGbd8=

Loading…
Cancel
Save