From d3a652037ef879f9279bc056c43d15ba7afcbb25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ludovic=20Court=C3=A8s?= Date: Mon, 23 Mar 2015 22:25:04 +0100 Subject: [PATCH] substitute-binary: Pipeline HTTP requests instead of using threads. * guix/scripts/substitute-binary.scm (fetch-narinfo, %lookup-threads, n-par-map*): Remove. (narinfo-cache-file, cached-narinfo, cache-narinfo!, narinfo-request, http-multiple-get, read-to-eof, fetch-narinfos, lookup-narinfos, narinfo-from-file): New procedures. (lookup-narinfo): Rewrite in terms of 'lookup-narinfos'. (guix-substitute-binary): Use 'lookup-narinfos' instead of 'lookup-narinfo'. --- guix/scripts/substitute-binary.scm | 270 ++++++++++++++++++++--------- 1 file changed, 192 insertions(+), 78 deletions(-) diff --git a/guix/scripts/substitute-binary.scm b/guix/scripts/substitute-binary.scm index 85c2c74520..c21c50fe9f 100755 --- a/guix/scripts/substitute-binary.scm +++ b/guix/scripts/substitute-binary.scm @@ -28,7 +28,7 @@ (define-module (guix scripts substitute-binary) #:use-module (guix base64) #:use-module (guix pk-crypto) #:use-module (guix pki) - #:use-module ((guix build utils) #:select (mkdir-p)) + #:use-module ((guix build utils) #:select (mkdir-p dump-port)) #:use-module ((guix build download) #:select (progress-proc uri-abbreviation)) #:use-module (ice-9 rdelim) @@ -48,6 +48,8 @@ (define-module (guix scripts substitute-binary) #:use-module (srfi srfi-34) #:use-module (srfi srfi-35) #:use-module (web uri) + #:use-module (web request) + #:use-module (web response) #:use-module (guix http-client) #:export (narinfo-signature->canonical-sexp read-narinfo @@ -218,7 +220,7 @@ (define-syntax-rule (open-cache* url) gonna have to wait." (delay (begin (format (current-error-port) - (_ "updating list of substitutes from '~a'...~%") + (_ "updating list of substitutes from '~a'...\r") url) (open-cache url)))) @@ -380,40 +382,56 @@ (define (string->narinfo str cache-uri) the cache STR originates form." (call-with-input-string str (cut read-narinfo <> cache-uri))) -(define (fetch-narinfo cache path) - "Return the record for PATH, or #f if CACHE does not hold PATH." - (define (download url) - ;; Download the .narinfo from URL, and return its contents as a list of - ;; key/value pairs. Don't emit an error message upon 404. - (false-if-exception (fetch (string->uri url) - #:quiet-404? #t))) - - (and (string=? (cache-store-directory cache) (%store-prefix)) - (and=> (download (string-append (cache-url cache) "/" - (store-path-hash-part path) - ".narinfo")) - (cute read-narinfo <> (cache-url cache))))) - (define (obsolete? date now ttl) "Return #t if DATE is obsolete compared to NOW + TTL seconds." (time>? (subtract-duration now (make-time time-duration 0 ttl)) (make-time time-monotonic 0 date))) -(define %lookup-threads - ;; Number of threads spawned to perform lookup operations. This means we - ;; can have this many simultaneous HTTP GET requests to the server, which - ;; limits the impact of connection latency. - 20) -(define (lookup-narinfo cache path) - "Check locally if we have valid info about PATH, otherwise go to CACHE and -check what it has." +(define (narinfo-cache-file path) + "Return the name of the local file that contains an entry for PATH." + (string-append %narinfo-cache-directory "/" + (store-path-hash-part path))) + +(define (cached-narinfo path) + "Check locally if we have valid info about PATH. Return two values: a +Boolean indicating whether we have valid cached info, and that info, which may +be either #f (when PATH is unavailable) or the narinfo for PATH." (define now (current-time time-monotonic)) (define cache-file - (string-append %narinfo-cache-directory "/" - (store-path-hash-part path))) + (narinfo-cache-file path)) + + (catch 'system-error + (lambda () + (call-with-input-file cache-file + (lambda (p) + (match (read p) + (('narinfo ('version 1) + ('cache-uri cache-uri) + ('date date) ('value #f)) + ;; A cached negative lookup. + (if (obsolete? date now %narinfo-negative-ttl) + (values #f #f) + (values #t #f))) + (('narinfo ('version 1) + ('cache-uri cache-uri) + ('date date) ('value value)) + ;; A cached positive lookup + (if (obsolete? date now %narinfo-ttl) + (values #f #f) + (values #t (string->narinfo value cache-uri)))) + (('narinfo ('version v) _ ...) + (values #f #f)))))) + (lambda _ + (values #f #f)))) + +(define (cache-narinfo! cache path narinfo) + "Cache locally NARNIFO for PATH, which originates from CACHE. NARINFO may +be #f, in which case it indicates that PATH is unavailable at CACHE." + (define now + (current-time time-monotonic)) (define (cache-entry cache-uri narinfo) `(narinfo (version 1) @@ -421,43 +439,153 @@ (define (cache-entry cache-uri narinfo) (date ,(time-second now)) (value ,(and=> narinfo narinfo->string)))) - (let*-values (((valid? cached) - (catch 'system-error - (lambda () - (call-with-input-file cache-file - (lambda (p) - (match (read p) - (('narinfo ('version 1) - ('cache-uri cache-uri) - ('date date) ('value #f)) - ;; A cached negative lookup. - (if (obsolete? date now %narinfo-negative-ttl) - (values #f #f) - (values #t #f))) - (('narinfo ('version 1) - ('cache-uri cache-uri) - ('date date) ('value value)) - ;; A cached positive lookup - (if (obsolete? date now %narinfo-ttl) - (values #f #f) - (values #t (string->narinfo value - cache-uri)))) - (('narinfo ('version v) _ ...) - (values #f #f)))))) - (lambda _ - (values #f #f))))) - (if valid? - cached ; including negative caches + (with-atomic-file-output (narinfo-cache-file path) + (lambda (out) + (write (cache-entry (cache-url cache) narinfo) out))) + narinfo) + +(define (narinfo-request cache-url path) + "Return an HTTP request for the narinfo of PATH at CACHE-URL." + (let ((url (string-append cache-url "/" (store-path-hash-part path) + ".narinfo"))) + (build-request (string->uri url) #:method 'GET))) + +(define (http-multiple-get base-url requests proc) + "Send all of REQUESTS to the server at BASE-URL. Call PROC for each +response, passing it the request object, the response, and a port from which +to read the response body. Return the list of results." + (let connect ((requests requests) + (result '())) + ;; (format (current-error-port) "connecting (~a requests left)..." + ;; (length requests)) + (let ((p (open-socket-for-uri base-url))) + ;; Send all of REQUESTS in a row. + (setvbuf p _IOFBF (expt 2 16)) + (for-each (cut write-request <> p) requests) + (force-output p) + + ;; Now start processing responses. + (let loop ((requests requests) + (result result)) + (match requests + (() + (reverse result)) + ((head tail ...) + (let* ((resp (read-response p)) + (body (response-body-port resp))) + ;; The server can choose to stop responding at any time, in which + ;; case we have to try again. Check whether that is the case. + (match (assq 'connection (response-headers resp)) + (('connection 'close) + (connect requests result)) ;try again + (_ + (loop tail ;keep going + (cons (proc head resp body) result))))))))))) + +(define (read-to-eof port) + "Read from PORT until EOF is reached. The data are discarded." + (dump-port port (%make-void-port "w"))) + +(define (narinfo-from-file file url) + "Attempt to read a narinfo from FILE, using URL as the cache URL. Return #f +if file doesn't exist, and the narinfo otherwise." + (catch 'system-error + (lambda () + (call-with-input-file file + (cut read-narinfo <> url))) + (lambda args + (if (= ENOENT (system-error-errno args)) + #f + (apply throw args))))) + +(define (fetch-narinfos cache paths) + "Retrieve all the narinfos for PATHS from CACHE and return them." + (define url + (cache-url cache)) + + (define update-progress! + (let ((done 0)) + (lambda () + (display #\cr (current-error-port)) + (force-output (current-error-port)) + (format (current-error-port) + (_ "updating list of substitutes from '~a'... ~5,1f%") + url (* 100. (/ done (length paths)))) + (set! done (+ 1 done))))) + + (define (handle-narinfo-response request response port) + (let ((len (response-content-length response))) + ;; Make sure to read no more than LEN bytes since subsequent bytes may + ;; belong to the next response. + (case (response-code response) + ((200) ; hit + (let ((narinfo (read-narinfo port url #:size len))) + (cache-narinfo! cache (narinfo-path narinfo) narinfo) + (update-progress!) + narinfo)) + ((404) ; failure + (let* ((path (uri-path (request-uri request))) + (hash-part (string-drop-right path 8))) ; drop ".narinfo" + (if len + (get-bytevector-n port len) + (read-to-eof port)) + (cache-narinfo! cache + (find (cut string-contains <> hash-part) paths) + #f) + (update-progress!)) + #f) + (else ; transient failure + (if len + (get-bytevector-n port len) + (read-to-eof port)) + #f)))) + + (and (string=? (cache-store-directory cache) (%store-prefix)) + (let ((uri (string->uri url))) + (case (and=> uri uri-scheme) + ((http) + (let ((requests (map (cut narinfo-request url <>) paths))) + (update-progress!) + (let ((result (http-multiple-get url requests + handle-narinfo-response))) + (newline (current-error-port)) + result))) + ((file #f) + (let* ((base (string-append (uri-path uri) "/")) + (files (map (compose (cut string-append base <> ".narinfo") + store-path-hash-part) + paths))) + (filter-map (cut narinfo-from-file <> url) files))) + (else + (leave (_ "~s: unsupported server URI scheme~%") + (if uri (uri-scheme uri) url))))))) + +(define (lookup-narinfos cache paths) + "Return the narinfos for PATHS, invoking the server at CACHE when no +information is available locally." + (let-values (((cached missing) + (fold2 (lambda (path cached missing) + (let-values (((valid? value) + (cached-narinfo path))) + (if valid? + (values (cons value cached) missing) + (values cached (cons path missing))))) + '() + '() + paths))) + (if (null? missing) + cached (let* ((cache (force cache)) - (narinfo (and cache (fetch-narinfo cache path)))) - ;; Cache NARINFO only when CACHE was actually accessible. This - ;; avoids caching negative hits when in fact we just lacked network - ;; access. - (when cache - (with-atomic-file-output cache-file - (lambda (out) - (write (cache-entry (cache-url cache) narinfo) out)))) - narinfo)))) + (missing (if cache + (fetch-narinfos cache missing) + '()))) + (append cached missing))))) + +(define (lookup-narinfo cache path) + "Return the narinfo for PATH in CACHE, or #f when no substitute for PATH was +found." + (match (lookup-narinfos cache (list path)) + ((answer) answer))) (define (remove-expired-cached-narinfos) "Remove expired narinfo entries from the cache. The sole purpose of this @@ -580,16 +708,6 @@ (define (show-help) ;;; Entry point. ;;; -(define n-par-map* - ;; We want the ability to run many threads in parallel, regardless of the - ;; number of cores. However, Guile 2.0.5 has a bug whereby 'n-par-map' ends - ;; up consuming a lot of memory, possibly leading to death. Thus, resort to - ;; 'par-map' on 2.0.5. - (if (guile-version>? "2.0.5") - n-par-map - (lambda (n proc lst) - (par-map proc lst)))) - (define (check-acl-initialized) "Warn if the ACL is uninitialized." (define (singleton? acl) @@ -698,9 +816,7 @@ (define (valid? obj) ;; Return the subset of PATHS available in CACHE. (let ((substitutable (if cache - (n-par-map* %lookup-threads - (cut lookup-narinfo cache <>) - paths) + (lookup-narinfos cache paths) '()))) (for-each (lambda (narinfo) (format #t "~a~%" (narinfo-path narinfo))) @@ -710,9 +826,7 @@ (define (valid? obj) ;; Reply info about PATHS if it's in CACHE. (let ((substitutable (if cache - (n-par-map* %lookup-threads - (cut lookup-narinfo cache <>) - paths) + (lookup-narinfos cache paths) '()))) (for-each (lambda (narinfo) (format #t "~a\n~a\n~a\n"