From fd3e54dd2779a3123fa559c870ad7d7f06a4ce62 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 4 Sep 2024 16:21:23 +0000 Subject: [PATCH 1/2] add check --- disperser/common/blobstore/shared_storage.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 0f1cc0589..151760604 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -21,6 +21,8 @@ const ( var errProcessingToDispersing = errors.New("blob transit to dispersing from non processing") +var errProcessingInitialization = errors.New("blob status initialization") + // The shared blob store that the disperser is operating on. // The metadata store is backed by DynamoDB and the blob store is backed by S3. // @@ -90,6 +92,12 @@ func (s *SharedBlobStore) StoreBlob(ctx context.Context, blob *core.Blob, reques metadataKey.BlobHash = blobHash metadataKey.MetadataHash = metadataHash + refreshedMetadata, err := s.GetBlobMetadata(ctx, metadataKey) + if !errors.Is(err, disperser.ErrMetadataNotFound) { + s.logger.Error("error intentionally get blob status before creation", "err", err, "status", refreshedMetadata.BlobStatus, "metadataKey", refreshedMetadata.GetBlobKey().String()) + return metadataKey, errProcessingInitialization + } + err = s.s3Client.UploadObject(ctx, s.bucketName, blobObjectKey(blobHash), blob.Data) if err != nil { s.logger.Error("error uploading blob", "err", err) From f5dcc79a742db143ea18b61ebc73b7167c9d9d34 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 4 Sep 2024 17:07:45 +0000 Subject: [PATCH 2/2] break down case by case --- disperser/common/blobstore/shared_storage.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 151760604..2b7108bf7 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -21,7 +21,7 @@ const ( var errProcessingToDispersing = errors.New("blob transit to dispersing from non processing") -var errProcessingInitialization = errors.New("blob status initialization") +var errProcessingMeetingPrecondition = errors.New("blob not meeting precondition to processing") // The shared blob store that the disperser is operating on. // The metadata store is backed by DynamoDB and the blob store is backed by S3. @@ -93,9 +93,21 @@ func (s *SharedBlobStore) StoreBlob(ctx context.Context, blob *core.Blob, reques metadataKey.MetadataHash = metadataHash refreshedMetadata, err := s.GetBlobMetadata(ctx, metadataKey) - if !errors.Is(err, disperser.ErrMetadataNotFound) { - s.logger.Error("error intentionally get blob status before creation", "err", err, "status", refreshedMetadata.BlobStatus, "metadataKey", refreshedMetadata.GetBlobKey().String()) - return metadataKey, errProcessingInitialization + + // the only safe condition: + // err is disperser.ErrMetadataNotFound && refreshedMetadata is nil + if err == nil && refreshedMetadata != nil { + s.logger.Error("error blob not meeting precondition", "status", refreshedMetadata.BlobStatus, "metadataKey", refreshedMetadata.GetBlobKey().String()) + return metadataKey, errProcessingMeetingPrecondition + } else if err != nil && refreshedMetadata == nil { + if !errors.Is(err, disperser.ErrMetadataNotFound) { + s.logger.Error("error blob request or marshal error", "err", err) + return metadataKey, errProcessingMeetingPrecondition + } + // else it is expected + } else { + s.logger.Error("error get blob status abnormality ", "err", err) + return metadataKey, errProcessingMeetingPrecondition } err = s.s3Client.UploadObject(ctx, s.bucketName, blobObjectKey(blobHash), blob.Data)