store: Add with-store/non-blocking.

For some applications, it's important to establish a non-blocking connection
rather than just making the socket non-blocking after the connection is
established. This is because there is I/O on the socket that will block during
the handshake.

I've noticed this blocking during the handshake causing issues in the build
coordinator for example.

This commit adds a new with-store variant to avoid changing the behaviour of
with-store/open-connection to ensure that this change can't break anything
that depends on the blocking nature of the socket.

* guix/store.scm (open-unix-domain-socket, open-inet-socket): Take
 #:non-blocking? and use SOCK_NONBLOCK when calling socket if appropriate.
(connect-to-daemon, open-connection, call-with-store): Take #:non-blocking?
and pass it on.
(with-store/non-blocking): New syntax rule.
* .dir-locals.el (scheme-mode): Add entry for with-store/non-blocking.

Change-Id: I8225762b78448bc1f7b698c8de5d736e13f577bf
This commit is contained in:
Christopher Baines 2024-05-11 17:53:20 +01:00
parent 1632fd0f1b
commit 3db1a8341c
No known key found for this signature in database
GPG key ID: 5E28A33B0B84F577
2 changed files with 40 additions and 16 deletions

View file

@ -88,6 +88,7 @@
(eval . (put 'manifest-pattern 'scheme-indent-function 0)) (eval . (put 'manifest-pattern 'scheme-indent-function 0))
(eval . (put 'substitute-keyword-arguments 'scheme-indent-function 1)) (eval . (put 'substitute-keyword-arguments 'scheme-indent-function 1))
(eval . (put 'with-store 'scheme-indent-function 1)) (eval . (put 'with-store 'scheme-indent-function 1))
(eval . (put 'with-store/non-blocking 'scheme-indent-function 1))
(eval . (put 'with-external-store 'scheme-indent-function 1)) (eval . (put 'with-external-store 'scheme-indent-function 1))
(eval . (put 'with-error-handling 'scheme-indent-function 0)) (eval . (put 'with-error-handling 'scheme-indent-function 0))
(eval . (put 'with-mutex 'scheme-indent-function 1)) (eval . (put 'with-mutex 'scheme-indent-function 1))

View file

@ -106,6 +106,7 @@ (define-module (guix store)
port->connection port->connection
close-connection close-connection
with-store with-store
with-store/non-blocking
set-build-options set-build-options
set-build-options* set-build-options*
valid-path? valid-path?
@ -462,12 +463,17 @@ (define-syntax-rule (system-error-to-connection-error file exp ...)
(file file) (file file)
(errno errno)))))))) (errno errno))))))))
(define (open-unix-domain-socket file) (define* (open-unix-domain-socket file #:key non-blocking?)
"Connect to the Unix-domain socket at FILE and return it. Raise a "Connect to the Unix-domain socket at FILE and return it. Raise a
'&store-connection-error' upon error." '&store-connection-error' upon error. If NON-BLOCKING?, make the socket
non-blocking."
(let ((s (with-fluids ((%default-port-encoding #f)) (let ((s (with-fluids ((%default-port-encoding #f))
;; This trick allows use of the `scm_c_read' optimization. ;; This trick allows use of the `scm_c_read' optimization.
(socket PF_UNIX (logior SOCK_STREAM SOCK_CLOEXEC) 0))) (socket PF_UNIX
(if non-blocking?
(logior SOCK_STREAM SOCK_CLOEXEC SOCK_NONBLOCK)
(logior SOCK_STREAM SOCK_CLOEXEC))
0)))
(a (make-socket-address PF_UNIX file))) (a (make-socket-address PF_UNIX file)))
(system-error-to-connection-error file (system-error-to-connection-error file
@ -478,9 +484,10 @@ (define %default-guix-port
;; Default port when connecting to a daemon over TCP/IP. ;; Default port when connecting to a daemon over TCP/IP.
44146) 44146)
(define (open-inet-socket host port) (define* (open-inet-socket host port #:key non-blocking?)
"Connect to the Unix-domain socket at HOST:PORT and return it. Raise a "Connect to the Unix-domain socket at HOST:PORT and return it. Raise a
'&store-connection-error' upon error." '&store-connection-error' upon error. If NON-BLOCKING?, make the socket
non-blocking."
(define addresses (define addresses
(getaddrinfo host (getaddrinfo host
(if (number? port) (number->string port) port) (if (number? port) (number->string port) port)
@ -495,7 +502,10 @@ (define addresses
((ai rest ...) ((ai rest ...)
(let ((s (socket (addrinfo:fam ai) (let ((s (socket (addrinfo:fam ai)
;; TCP/IP only ;; TCP/IP only
(logior SOCK_STREAM SOCK_CLOEXEC) IPPROTO_IP))) (if non-blocking?
(logior SOCK_STREAM SOCK_CLOEXEC SOCK_NONBLOCK)
(logior SOCK_STREAM SOCK_CLOEXEC))
IPPROTO_IP)))
(catch 'system-error (catch 'system-error
(lambda () (lambda ()
@ -514,9 +524,10 @@ (define addresses
(errno (system-error-errno args))))) (errno (system-error-errno args)))))
(loop rest))))))))) (loop rest)))))))))
(define (connect-to-daemon uri) (define* (connect-to-daemon uri #:key non-blocking?)
"Connect to the daemon at URI, a string that may be an actual URI or a file "Connect to the daemon at URI, a string that may be an actual URI or a file
name, and return an input/output port. name, and return an input/output port. If NON-BLOCKING?, use a non-blocking
socket when using the file, unix or guix URI schemes.
This is a low-level procedure that does not perform the initial handshake with This is a low-level procedure that does not perform the initial handshake with
the daemon. Use 'open-connection' for that." the daemon. Use 'open-connection' for that."
@ -533,11 +544,13 @@ (define connect
(match (uri-scheme uri) (match (uri-scheme uri)
((or #f 'file 'unix) ((or #f 'file 'unix)
(lambda (_) (lambda (_)
(open-unix-domain-socket (uri-path uri)))) (open-unix-domain-socket (uri-path uri)
#:non-blocking? non-blocking?)))
('guix ('guix
(lambda (_) (lambda (_)
(open-inet-socket (uri-host uri) (open-inet-socket (uri-host uri)
(or (uri-port uri) %default-guix-port)))) (or (uri-port uri) %default-guix-port)
#:non-blocking? non-blocking?)))
((? symbol? scheme) ((? symbol? scheme)
;; Try to dynamically load a module for SCHEME. ;; Try to dynamically load a module for SCHEME.
;; XXX: Errors are swallowed. ;; XXX: Errors are swallowed.
@ -557,7 +570,8 @@ (define connect
(connect uri)) (connect uri))
(define* (open-connection #:optional (uri (%daemon-socket-uri)) (define* (open-connection #:optional (uri (%daemon-socket-uri))
#:key port (reserve-space? #t) cpu-affinity) #:key port (reserve-space? #t) cpu-affinity
non-blocking?)
"Connect to the daemon at URI (a string), or, if PORT is not #f, use it as "Connect to the daemon at URI (a string), or, if PORT is not #f, use it as
the I/O port over which to communicate to a build daemon. the I/O port over which to communicate to a build daemon.
@ -565,7 +579,9 @@ (define* (open-connection #:optional (uri (%daemon-socket-uri))
space on the file system so that the garbage collector can still operate, space on the file system so that the garbage collector can still operate,
should the disk become full. When CPU-AFFINITY is true, it must be an integer should the disk become full. When CPU-AFFINITY is true, it must be an integer
corresponding to an OS-level CPU number to which the daemon's worker process corresponding to an OS-level CPU number to which the daemon's worker process
for this connection will be pinned. Return a server object." for this connection will be pinned. If NON-BLOCKING?, use a non-blocking
socket when using the file, unix or guix URI schemes. Return a server
object."
(define (handshake-error) (define (handshake-error)
(raise (condition (raise (condition
(&store-connection-error (file (or port uri)) (&store-connection-error (file (or port uri))
@ -577,7 +593,8 @@ (define (handshake-error)
;; really a connection error. ;; really a connection error.
(handshake-error))) (handshake-error)))
(let*-values (((port) (let*-values (((port)
(or port (connect-to-daemon uri))) (or port (connect-to-daemon
uri #:non-blocking? non-blocking?)))
((output flush) ((output flush)
(buffering-output-port port (buffering-output-port port
(make-bytevector 8192)))) (make-bytevector 8192))))
@ -657,9 +674,10 @@ (define (close-connection server)
"Close the connection to SERVER." "Close the connection to SERVER."
(close (store-connection-socket server))) (close (store-connection-socket server)))
(define (call-with-store proc) (define* (call-with-store proc #:key non-blocking?)
"Call PROC with an open store connection." "Call PROC with an open store connection. Pass NON-BLOCKING? to
(let ((store (open-connection))) open-connection."
(let ((store (open-connection #:non-blocking? non-blocking?)))
(define (thunk) (define (thunk)
(parameterize ((current-store-protocol-version (parameterize ((current-store-protocol-version
(store-connection-version store))) (store-connection-version store)))
@ -678,6 +696,11 @@ (define-syntax-rule (with-store store exp ...)
automatically close the store when the dynamic extent of EXP is left." automatically close the store when the dynamic extent of EXP is left."
(call-with-store (lambda (store) exp ...))) (call-with-store (lambda (store) exp ...)))
(define-syntax-rule (with-store/non-blocking store exp ...)
"Bind STORE to an non-blocking open connection to the store and evaluate
EXPs; automatically close the store when the dynamic extent of EXP is left."
(call-with-store (lambda (store) exp ...) #:non-blocking? #t))
(define current-store-protocol-version (define current-store-protocol-version
;; Protocol version of the store currently used. XXX: This is a hack to ;; Protocol version of the store currently used. XXX: This is a hack to
;; communicate the protocol version to the build output port. It's a hack ;; communicate the protocol version to the build output port. It's a hack