Skip to content

Commit

Permalink
feat: support v2 backing image download file from sync service
Browse files Browse the repository at this point in the history
ref: longhorn/longhorn 6341

Signed-off-by: Jack Lin <[email protected]>
  • Loading branch information
ChanYiLin committed Nov 29, 2024
1 parent 411cf2f commit 8b65294
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 41 deletions.
12 changes: 8 additions & 4 deletions pkg/client/sync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (client *SyncClient) Fetch(srcFilePath, dstFilePath, uuid, diskUUID, expect
return nil
}

func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, expectedChecksum string) error {
func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID, expectedChecksum, dataEngine string) error {
httpClient := &http.Client{Timeout: 0}

requestURL := fmt.Sprintf("http://%s/v1/files", client.Remote)
Expand All @@ -201,6 +201,7 @@ func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID,
q.Add("uuid", uuid)
q.Add("disk-uuid", diskUUID)
q.Add("expected-checksum", expectedChecksum)
q.Add("data-engine", dataEngine)
req.URL.RawQuery = q.Encode()

resp, err := httpClient.Do(req)
Expand All @@ -220,7 +221,7 @@ func (client *SyncClient) DownloadFromURL(downloadURL, filePath, uuid, diskUUID,
return nil
}

func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string) error {
func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string, dataEngine string) error {
httpClient := &http.Client{Timeout: 0}
encodedCredential, err := json.Marshal(credential)
if err != nil {
Expand All @@ -242,6 +243,7 @@ func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackin
q.Add("uuid", uuid)
q.Add("disk-uuid", diskUUID)
q.Add("expected-checksum", expectedChecksum)
q.Add("data-engine", dataEngine)

req.URL.RawQuery = q.Encode()

Expand All @@ -262,7 +264,7 @@ func (client *SyncClient) CloneFromBackingImage(sourceBackingImage, sourceBackin
return nil
}

func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string) error {
func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, filePath, uuid, diskUUID, expectedChecksum string, credential map[string]string, dataEngine string) error {
httpClient := &http.Client{Timeout: 0}
encodedCredential, err := json.Marshal(credential)
if err != nil {
Expand All @@ -283,6 +285,7 @@ func (client *SyncClient) RestoreFromBackupURL(backupURL, concurrentLimit, fileP
q.Add("disk-uuid", diskUUID)
q.Add("expected-checksum", expectedChecksum)
q.Add("concurrent-limit", concurrentLimit)
q.Add("data-engine", dataEngine)

req.URL.RawQuery = q.Encode()

Expand Down Expand Up @@ -363,7 +366,7 @@ func (client *SyncClient) Upload(src, dst, uuid, diskUUID, expectedChecksum stri
return nil
}

func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fileType string, receiverPort int, size int64) error {
func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fileType string, receiverPort int, size int64, dataEngine string) error {
httpClient := &http.Client{Timeout: 0}

requestURL := fmt.Sprintf("http://%s/v1/files", client.Remote)
Expand All @@ -380,6 +383,7 @@ func (client *SyncClient) Receive(filePath, uuid, diskUUID, expectedChecksum, fi
q.Add("file-type", fileType)
q.Add("port", strconv.Itoa(receiverPort))
q.Add("size", strconv.FormatInt(size, 10))
q.Add("data-engine", dataEngine)
req.URL.RawQuery = q.Encode()

resp, err := httpClient.Do(req)
Expand Down
34 changes: 30 additions & 4 deletions pkg/datasource/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (s *Service) cloneFromBackingImage() (err error) {
return fmt.Errorf("%v is not specified", types.DataSourceTypeCloneParameterBackingImageUUID)
}

dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine]
if dataEngine == "" {
dataEngine = types.DataEnginev1
}

encryption := s.parameters[types.DataSourceTypeCloneParameterEncryption]
if types.EncryptionType(encryption) != types.EncryptionTypeEncrypt &&
types.EncryptionType(encryption) != types.EncryptionTypeDecrypt &&
Expand All @@ -214,7 +219,7 @@ func (s *Service) cloneFromBackingImage() (err error) {
}
}

return s.syncClient.CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential)
return s.syncClient.CloneFromBackingImage(sourceBackingImage, sourceBackingImageUUID, encryption, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential, dataEngine)
}

func (s *Service) restoreFromBackupURL() (err error) {
Expand All @@ -227,16 +232,25 @@ func (s *Service) restoreFromBackupURL() (err error) {
return fmt.Errorf("no %v for restore", types.DataSourceTypeRestoreParameterConcurrentLimit)
}

return s.syncClient.RestoreFromBackupURL(backupURL, concurrentLimit, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential)
dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine]
if dataEngine == "" {
dataEngine = types.DataEnginev1
}

return s.syncClient.RestoreFromBackupURL(backupURL, concurrentLimit, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, s.credential, dataEngine)
}

func (s *Service) downloadFromURL(parameters map[string]string) (err error) {
url := parameters[types.DataSourceTypeDownloadParameterURL]
if url == "" {
return fmt.Errorf("no URL for file downloading")
}
dataEngine := parameters[types.DataSourceTypeParameterDataEngine]
if dataEngine == "" {
dataEngine = types.DataEnginev1
}

return s.syncClient.DownloadFromURL(url, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum)
return s.syncClient.DownloadFromURL(url, s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, dataEngine)
}

func (s *Service) prepareForUpload() (err error) {
Expand All @@ -262,6 +276,11 @@ func (s *Service) exportFromVolume(parameters map[string]string) error {
}
fileType := parameters[types.DataSourceTypeFileType]

dataEngine := parameters[types.DataSourceTypeParameterDataEngine]
if dataEngine == "" {
dataEngine = types.DataEnginev1
}

var size int64
var err error
if size, err = strconv.ParseInt(parameters[types.DataSourceTypeExportFromVolumeParameterVolumeSize], 10, 64); err != nil {
Expand All @@ -283,7 +302,7 @@ func (s *Service) exportFromVolume(parameters map[string]string) error {
}
s.log.Infof("DataSource Service: export volume via %v", storageIP)

if err := s.syncClient.Receive(s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, fileType, types.DefaultVolumeExportReceiverPort, size); err != nil {
if err := s.syncClient.Receive(s.filePath, s.uuid, s.diskUUID, s.expectedChecksum, fileType, types.DefaultVolumeExportReceiverPort, size, dataEngine); err != nil {
return err
}

Expand Down Expand Up @@ -323,6 +342,13 @@ func (s *Service) Upload(writer http.ResponseWriter, request *http.Request) {
q.Add("file-path", s.filePath)
q.Add("uuid", s.uuid)
q.Add("expected-checksum", s.expectedChecksum)

dataEngine := s.parameters[types.DataSourceTypeParameterDataEngine]
if dataEngine == "" {
dataEngine = types.DataEnginev1
}
q.Add("data-engine", dataEngine)

request.URL.RawQuery = q.Encode()
s.log.Debugf("DataSource Service: forwarding upload request to sync server %v", request.URL.String())

Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (m *Manager) Sync(ctx context.Context, req *rpc.SyncRequest) (resp *rpc.Bac
}()

biFilePath := types.GetBackingImageFilePath(m.diskPath, req.Spec.Name, req.Spec.Uuid)
if err := m.syncClient.Receive(biFilePath, req.Spec.Uuid, m.diskUUID, req.Spec.Checksum, "", int(port), req.Spec.Size); err != nil {
if err := m.syncClient.Receive(biFilePath, req.Spec.Uuid, m.diskUUID, req.Spec.Checksum, "", int(port), req.Spec.Size, types.DataEnginev1); err != nil {
portReleaseChannel <- nil
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sync/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewRouter(service *Service) *mux.Router {
router.HandleFunc("/v1/files/{id}", service.Delete).Methods("DELETE")
router.HandleFunc("/v1/files/{id}", service.Forget).Methods("POST").Queries("action", "forget")
router.HandleFunc("/v1/files/{id}", service.SendToPeer).Methods("POST").Queries("action", "sendToPeer")
router.HandleFunc("/v1/files/{id}/download", service.DownloadToDst).Methods("GET")
router.HandleFunc("/v1/files/{id}/download", service.DownloadToDst).Methods("GET", "HEAD")

// Launch a new file
router.HandleFunc("/v1/files", service.Fetch).Methods("POST").Queries("action", "fetch")
Expand Down
18 changes: 9 additions & 9 deletions pkg/sync/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *SyncTestSuite) BenchmarkMultipleDownload(c *C) {
Remote: s.addr,
}

err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, curUUID, TestDiskUUID, "")
err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, curUUID, TestDiskUUID, "", types.DataEnginev1)
c.Assert(err, IsNil)

_, err = getAndWaitFileState(cli, curPath, string(types.StateReady), 30)
Expand Down Expand Up @@ -218,7 +218,7 @@ func (s *SyncTestSuite) BenchmarkOneReceiveAndMultiSendWithSendingLimit(c *C) {
curUUID := TestSyncingFileUUID + "-dst-" + strconv.Itoa(i)
curReceiverPort := TestSyncServiceReceivePort + i
curReceiverAddress := fmt.Sprintf("localhost:%d", curReceiverPort)
err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB))
err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB), types.DataEnginev1)
c.Assert(err, IsNil)

err = cli.Send(originalFilePath, curReceiverAddress)
Expand Down Expand Up @@ -314,7 +314,7 @@ func (s *SyncTestSuite) BenchmarkMultiReceiveAndMultiSend(c *C) {
Remote: s.addr,
}

err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB))
err := cli.Receive(dstFilePath, curUUID, TestDiskUUID, checksum, types.SyncingFileTypeQcow2, curReceiverPort, int64(sizeInMB*MB), types.DataEnginev1)
c.Assert(err, IsNil)
err = cli.Send(srcFilePath, curReceiverAddress)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s *SyncTestSuite) TestTimeoutReceiveFromPeers(c *C) {
}

go func() {
err := cli.Receive(curPath, TestSyncingFileUUID, TestDiskUUID, "", types.SyncingFileTypeQcow2, TestSyncServiceReceivePort, MockFileSize)
err := cli.Receive(curPath, TestSyncingFileUUID, TestDiskUUID, "", types.SyncingFileTypeQcow2, TestSyncServiceReceivePort, MockFileSize, types.DataEnginev1)
c.Assert(err, IsNil)
}()

Expand Down Expand Up @@ -522,21 +522,21 @@ func (s *SyncTestSuite) TestDuplicateCalls(c *C) {
Remote: s.addr,
}

err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "")
err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1)
c.Assert(err, IsNil)

_, err = getAndWaitFileState(cli, curPath, string(types.StateReady), 30)
c.Assert(err, IsNil)

// Duplicate file launching calls should error out:
// "resp.StatusCode(500) != http.StatusOK(200), response body content: file /root/test-dir/sync-tests/sync-download-file-for-dup-calls already exists\n"
err = cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "")
err = cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1)
c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`)
err = cli.Upload(curPath, curPath, TestSyncingFileUUID, TestDiskUUID, "")
c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`)
err = cli.Receive(curPath, TestDiskUUID, TestSyncingFileUUID, "", "", types.DefaultVolumeExportReceiverPort, MockFileSize)
err = cli.Receive(curPath, TestDiskUUID, TestSyncingFileUUID, "", "", types.DefaultVolumeExportReceiverPort, MockFileSize, types.DataEnginev1)
c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`)
err = cli.DownloadFromURL("http://test-download-from-url.io", curPath+"-non-existing", TestSyncingFileUUID, TestDiskUUID, "")
err = cli.DownloadFromURL("http://test-download-from-url.io", curPath+"-non-existing", TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1)
c.Assert(err, ErrorMatches, `.*already exists[\s\S]*`)

// Duplicate delete or forget calls won't error out
Expand Down Expand Up @@ -608,7 +608,7 @@ func (s *SyncTestSuite) TestReadyFileValidation(c *C) {
Remote: s.addr,
}

err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "")
err := cli.DownloadFromURL("http://test-download-from-url.io", curPath, TestSyncingFileUUID, TestDiskUUID, "", types.DataEnginev1)
c.Assert(err, IsNil)

fInfo, err := getAndWaitFileState(cli, curPath, string(types.StateReady), 30)
Expand Down
Loading

0 comments on commit 8b65294

Please sign in to comment.