Skip to content

Commit

Permalink
Merge pull request #438 from CortexFoundation/remote-cvm-fix
Browse files Browse the repository at this point in the history
remote cvm fix
  • Loading branch information
ucwong authored Feb 20, 2024
2 parents cdb7861 + 32d4db5 commit 254cf30
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
20 changes: 11 additions & 9 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
//"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/torrentfs/backend"
"github.com/CortexFoundation/torrentfs/backend/caffe"
Expand All @@ -45,21 +45,21 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi
}(ctx, infohash)

if params.IsGood(infohash) {
start := mclock.Now()
log.Info("Downloading ... ...", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "current", fs.monitor.CurrentNumber())
//start := mclock.Now()
//log.Info("Downloading ... ...", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "current", fs.monitor.CurrentNumber())

if mux != nil {
sub := mux.Subscribe(caffe.TorrentEvent{})
defer sub.Unsubscribe()

select {
case ev := <-sub.Chan():
log.Info("Seeding notify !!! !!!", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "current", fs.monitor.CurrentNumber(), "ev", ev.Data)
case <-sub.Chan():
//log.Info("Seeding notify !!! !!!", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "current", fs.monitor.CurrentNumber(), "ev", ev.Data)
if ret, _, err := fs.storage().GetFile(ctx, infohash, subpath); err != nil {
log.Debug("File downloading ... ...", "ih", infohash, "size", common.StorageSize(rawSize), "path", subpath, "err", err)
} else {
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
log.Info("Downloaded", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "elapsed", common.PrettyDuration(elapsed), "current", fs.monitor.CurrentNumber())
//elapsed := time.Duration(mclock.Now()) - time.Duration(start)
//log.Info("Downloaded", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "elapsed", common.PrettyDuration(elapsed), "current", fs.monitor.CurrentNumber())
if uint64(len(ret)) > rawSize {
return nil, backend.ErrInvalidRawSize
}
Expand All @@ -71,6 +71,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi
log.Warn("Timeout", "ih", infohash, "size", common.StorageSize(rawSize), "err", ctx.Err(), "retry", fs.retry.Load(), "complete", common.StorageSize(co), "timeout", to, "exist", ex)
return nil, ctx.Err()
case <-fs.closeAll:
log.Info("File out")
return nil, nil
}
} else {
Expand All @@ -83,8 +84,8 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi
log.Debug("File downloading ... ...", "ih", infohash, "size", common.StorageSize(rawSize), "path", subpath, "err", err)
t.Reset(1000 * time.Millisecond)
} else {
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
log.Info("Downloaded", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "elapsed", common.PrettyDuration(elapsed), "current", fs.monitor.CurrentNumber())
//elapsed := time.Duration(mclock.Now()) - time.Duration(start)
//log.Info("Downloaded", "ih", infohash, "size", common.StorageSize(rawSize), "neighbors", fs.Neighbors(), "elapsed", common.PrettyDuration(elapsed), "current", fs.monitor.CurrentNumber())
if uint64(len(ret)) > rawSize {
return nil, backend.ErrInvalidRawSize
}
Expand All @@ -96,6 +97,7 @@ func (fs *TorrentFS) GetFileWithSize(ctx context.Context, infohash string, rawSi
log.Warn("Timeout", "ih", infohash, "size", common.StorageSize(rawSize), "err", ctx.Err(), "retry", fs.retry.Load(), "complete", common.StorageSize(co), "timeout", to, "exist", ex)
return nil, ctx.Err()
case <-fs.closeAll:
log.Info("File out")
return nil, nil
}
}
Expand Down
19 changes: 13 additions & 6 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ func create(config *params.Config, cache, compress, listen bool) (*TorrentFS, er
"number": inst.monitor.CurrentNumber(),
"maxMessageSize": inst.MaxMessageSize(),
// "listen": monitor.listen,
"metrics": inst.NasCounter(),
"neighbours": inst.Neighbors(),
"received": inst.received.Load(),
"sent": inst.sent.Load(),
"metrics": inst.NasCounter(),
//"neighbours": inst.Neighbors(),
"received": inst.received.Load(),
"sent": inst.sent.Load(),
},
//"score": inst.scoreTable,
"worm": inst.worm,
Expand Down Expand Up @@ -396,6 +396,7 @@ func (fs *TorrentFS) bitsflow(ctx context.Context, ih string, size uint64) error
case <-ctx.Done():
return ctx.Err()
case <-fs.closeAll:
log.Info("bitsflow out")
return nil
}

Expand Down Expand Up @@ -429,8 +430,14 @@ func (fs *TorrentFS) Stop() error {
close(fs.closeAll)
fs.wg.Wait()

for _, p := range fs.peers {
p.stop()
if len(fs.peers) > 0 {
for _, p := range fs.peers {
p.stop()
}
}

if fs.net != nil {
fs.net.Stop()
}

if fs.tunnel != nil {
Expand Down
4 changes: 2 additions & 2 deletions infohash.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/torrentfs/params"
//"github.com/CortexFoundation/torrentfs/params"
"github.com/ucwong/go-ttlmap"
)

Expand Down Expand Up @@ -59,7 +59,7 @@ func (fs *TorrentFS) download(ctx context.Context, ih string, request uint64) er
defer fs.wg.Done()
s := fs.broadcast(ih, p)
if s {
log.Debug("Nas "+params.ProtocolVersionStr+" tunnel", "ih", ih, "request", common.StorageSize(float64(p)), "tunnel", fs.tunnel.Len(), "peers", fs.Neighbors())
//log.Debug("Nas "+params.ProtocolVersionStr+" tunnel", "ih", ih, "request", common.StorageSize(float64(p)), "tunnel", fs.tunnel.Len(), "peers", fs.Neighbors())
}
}(ih, p)
}
Expand Down

0 comments on commit 254cf30

Please sign in to comment.