Merge pull request #690 from BrianBland/storagedriver-process-management
Adds logic for tracking ipc storage driver process statusmaster
						commit
						378256de47
					
				|  | @ -3,6 +3,7 @@ package ipc | ||||||
| import ( | import ( | ||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net" | 	"net" | ||||||
|  | @ -15,23 +16,29 @@ import ( | ||||||
| 	"github.com/docker/libchan/spdy" | 	"github.com/docker/libchan/spdy" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // StorageDriverExecutablePrefix is the prefix which the IPC storage driver loader expects driver
 | // StorageDriverExecutablePrefix is the prefix which the IPC storage driver
 | ||||||
| // executables to begin with. For example, the s3 driver should be named "registry-storage-s3".
 | // loader expects driver executables to begin with. For example, the s3 driver
 | ||||||
|  | // should be named "registry-storagedriver-s3".
 | ||||||
| const StorageDriverExecutablePrefix = "registry-storagedriver-" | const StorageDriverExecutablePrefix = "registry-storagedriver-" | ||||||
| 
 | 
 | ||||||
| // StorageDriverClient is a storagedriver.StorageDriver implementation using a managed child process
 | // StorageDriverClient is a storagedriver.StorageDriver implementation using a
 | ||||||
| // communicating over IPC using libchan with a unix domain socket
 | // managed child process communicating over IPC using libchan with a unix domain
 | ||||||
|  | // socket
 | ||||||
| type StorageDriverClient struct { | type StorageDriverClient struct { | ||||||
| 	subprocess *exec.Cmd | 	subprocess *exec.Cmd | ||||||
|  | 	exitChan   chan error | ||||||
|  | 	exitErr    error | ||||||
|  | 	stopChan   chan struct{} | ||||||
| 	socket     *os.File | 	socket     *os.File | ||||||
| 	transport  *spdy.Transport | 	transport  *spdy.Transport | ||||||
| 	sender     libchan.Sender | 	sender     libchan.Sender | ||||||
| 	version    storagedriver.Version | 	version    storagedriver.Version | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewDriverClient constructs a new out-of-process storage driver using the driver name and
 | // NewDriverClient constructs a new out-of-process storage driver using the
 | ||||||
| // configuration parameters
 | // driver name and configuration parameters
 | ||||||
| // A user must call Start on this driver client before remote method calls can be made
 | // A user must call Start on this driver client before remote method calls can
 | ||||||
|  | // be made
 | ||||||
| //
 | //
 | ||||||
| // Looks for drivers in the following locations in order:
 | // Looks for drivers in the following locations in order:
 | ||||||
| // - Storage drivers directory (to be determined, yet not implemented)
 | // - Storage drivers directory (to be determined, yet not implemented)
 | ||||||
|  | @ -56,9 +63,13 @@ func NewDriverClient(name string, parameters map[string]string) (*StorageDriverC | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Start starts the designated child process storage driver and binds a socket to this process for
 | // Start starts the designated child process storage driver and binds a socket
 | ||||||
| // IPC method calls
 | // to this process for IPC method calls
 | ||||||
| func (driver *StorageDriverClient) Start() error { | func (driver *StorageDriverClient) Start() error { | ||||||
|  | 	driver.exitErr = nil | ||||||
|  | 	driver.exitChan = make(chan error) | ||||||
|  | 	driver.stopChan = make(chan struct{}) | ||||||
|  | 
 | ||||||
| 	fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) | 	fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
|  | @ -76,6 +87,8 @@ func (driver *StorageDriverClient) Start() error { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	go driver.handleSubprocessExit() | ||||||
|  | 
 | ||||||
| 	if err = childSocket.Close(); err != nil { | 	if err = childSocket.Close(); err != nil { | ||||||
| 		driver.Stop() | 		driver.Stop() | ||||||
| 		return err | 		return err | ||||||
|  | @ -142,6 +155,10 @@ func (driver *StorageDriverClient) Stop() error { | ||||||
| 	if driver.subprocess != nil { | 	if driver.subprocess != nil { | ||||||
| 		killErr = driver.subprocess.Process.Kill() | 		killErr = driver.subprocess.Process.Kill() | ||||||
| 	} | 	} | ||||||
|  | 	if driver.stopChan != nil { | ||||||
|  | 		driver.stopChan <- struct{}{} | ||||||
|  | 		close(driver.stopChan) | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	if closeSenderErr != nil { | 	if closeSenderErr != nil { | ||||||
| 		return closeSenderErr | 		return closeSenderErr | ||||||
|  | @ -150,12 +167,17 @@ func (driver *StorageDriverClient) Stop() error { | ||||||
| 	} else if closeSocketErr != nil { | 	} else if closeSocketErr != nil { | ||||||
| 		return closeSocketErr | 		return closeSocketErr | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return killErr | 	return killErr | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Implement the storagedriver.StorageDriver interface over IPC
 | // Implement the storagedriver.StorageDriver interface over IPC
 | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { | func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { | ||||||
|  | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 
 | 
 | ||||||
| 	params := map[string]interface{}{"Path": path} | 	params := map[string]interface{}{"Path": path} | ||||||
|  | @ -164,8 +186,8 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response ReadStreamResponse | 	response := new(ReadStreamResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | @ -183,6 +205,10 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { | func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { | ||||||
|  | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 
 | 
 | ||||||
| 	params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} | 	params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} | ||||||
|  | @ -191,8 +217,8 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response WriteStreamResponse | 	response := new(WriteStreamResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | @ -205,16 +231,19 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { | func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"Path": path, "Offset": offset} | 	params := map[string]interface{}{"Path": path, "Offset": offset} | ||||||
| 	err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response ReadStreamResponse | 	response := new(ReadStreamResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | @ -227,16 +256,19 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { | func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)} | 	params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)} | ||||||
| 	err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response WriteStreamResponse | 	response := new(WriteStreamResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | @ -249,16 +281,19 @@ func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { | func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"Path": path} | 	params := map[string]interface{}{"Path": path} | ||||||
| 	err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, err | 		return 0, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response CurrentSizeResponse | 	response := new(CurrentSizeResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, err | 		return 0, err | ||||||
| 	} | 	} | ||||||
|  | @ -271,16 +306,19 @@ func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) List(path string) ([]string, error) { | func (driver *StorageDriverClient) List(path string) ([]string, error) { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"Path": path} | 	params := map[string]interface{}{"Path": path} | ||||||
| 	err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response ListResponse | 	response := new(ListResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | @ -293,16 +331,19 @@ func (driver *StorageDriverClient) List(path string) ([]string, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { | func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} | 	params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} | ||||||
| 	err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response MoveResponse | 	response := new(MoveResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | @ -315,16 +356,19 @@ func (driver *StorageDriverClient) Move(sourcePath string, destPath string) erro | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (driver *StorageDriverClient) Delete(path string) error { | func (driver *StorageDriverClient) Delete(path string) error { | ||||||
| 	receiver, remoteSender := libchan.Pipe() | 	if err := driver.exited(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	receiver, remoteSender := libchan.Pipe() | ||||||
| 	params := map[string]interface{}{"Path": path} | 	params := map[string]interface{}{"Path": path} | ||||||
| 	err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) | 	err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var response DeleteResponse | 	response := new(DeleteResponse) | ||||||
| 	err = receiver.Receive(&response) | 	err = driver.receiveResponse(receiver, response) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | @ -335,3 +379,66 @@ func (driver *StorageDriverClient) Delete(path string) error { | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // handleSubprocessExit populates the exit channel until we have explicitly
 | ||||||
|  | // stopped the storage driver subprocess
 | ||||||
|  | // Requests can select on driver.exitChan and response receiving and not hang if
 | ||||||
|  | // the process exits
 | ||||||
|  | func (driver *StorageDriverClient) handleSubprocessExit() { | ||||||
|  | 	exitErr := driver.subprocess.Wait() | ||||||
|  | 	if exitErr == nil { | ||||||
|  | 		exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly") | ||||||
|  | 	} else { | ||||||
|  | 		exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	driver.exitErr = exitErr | ||||||
|  | 
 | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case driver.exitChan <- exitErr: | ||||||
|  | 		case <-driver.stopChan: | ||||||
|  | 			close(driver.exitChan) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // receiveResponse populates the response value with the next result from the
 | ||||||
|  | // given receiver, or returns an error if receiving failed or the driver has
 | ||||||
|  | // stopped
 | ||||||
|  | func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error { | ||||||
|  | 	receiveChan := make(chan error, 1) | ||||||
|  | 	go func(receiveChan chan<- error) { | ||||||
|  | 		defer close(receiveChan) | ||||||
|  | 		receiveChan <- receiver.Receive(response) | ||||||
|  | 	}(receiveChan) | ||||||
|  | 
 | ||||||
|  | 	var err error | ||||||
|  | 	var ok bool | ||||||
|  | 	select { | ||||||
|  | 	case err = <-receiveChan: | ||||||
|  | 	case err, ok = <-driver.exitChan: | ||||||
|  | 		go func(receiveChan <-chan error) { | ||||||
|  | 			<-receiveChan | ||||||
|  | 		}(receiveChan) | ||||||
|  | 		if !ok { | ||||||
|  | 			err = driver.exitErr | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // exited returns an exit error if the driver has exited or nil otherwise
 | ||||||
|  | func (driver *StorageDriverClient) exited() error { | ||||||
|  | 	select { | ||||||
|  | 	case err, ok := <-driver.exitChan: | ||||||
|  | 		if !ok { | ||||||
|  | 			return driver.exitErr | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	default: | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue