Skip to content

Commit

Permalink
prov/lnx: Convert peer table to use buffer pools
Browse files Browse the repository at this point in the history
Convert peer table to use buffer pools in order to utilize the built-in
capabilities of expanding the table as more peers are added dynamically.

The peer table is protected by the domain's genlock.

Signed-off-by: Amir Shehata <[email protected]>
  • Loading branch information
amirshehataornl authored and aingerson committed Dec 6, 2024
1 parent 622a773 commit 3b85472
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 133 deletions.
19 changes: 6 additions & 13 deletions prov/lnx/include/lnx.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#ifndef LNX_H
#define LNX_H

#define LNX_DEF_AV_SIZE 1024
#define LNX_MAX_LOCAL_EPS 16
#define LNX_IOV_LIMIT 4

Expand Down Expand Up @@ -180,6 +179,7 @@ struct lnx_peer_prov {
struct lnx_peer {
/* true if peer can be reached over shared memory, false otherwise */
bool lp_local;
fi_addr_t lp_fi_addr;

/* Each provider that we can reach the peer on will have an entry
* below. Each entry will contain all the local provider endpoints we
Expand All @@ -200,10 +200,9 @@ struct lnx_peer {
struct lnx_peer_table {
struct util_av lpt_av;
int lpt_max_count;
int lpt_count;
struct lnx_domain *lpt_domain;
/* an array of peer entries */
struct lnx_peer **lpt_entries;
/* an array of peer entries of type struct lnx_peer */
struct ofi_bufpool *lpt_entries;
};

struct lnx_ctx {
Expand Down Expand Up @@ -293,6 +292,9 @@ int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info,
int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr,
struct fid_av **av, void *context);

struct lnx_peer *
lnx_av_lookup_addr(struct lnx_peer_table *peer_tbl, fi_addr_t addr);

int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq, void *context);

Expand All @@ -314,15 +316,6 @@ void lnx_free_entry(struct fi_peer_rx_entry *entry);
void lnx_foreach_unspec_addr(struct fid_peer_srx *srx,
fi_addr_t (*get_addr)(struct fi_peer_rx_entry *));

static inline struct lnx_peer *
lnx_get_peer(struct lnx_peer **peers, fi_addr_t addr)
{
if (!peers || addr == FI_ADDR_UNSPEC)
return NULL;

return peers[addr];
}

static inline
void lnx_get_core_desc(struct lnx_mem_desc *desc, void **mem_desc)
{
Expand Down
142 changes: 50 additions & 92 deletions prov/lnx/src/lnx_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,76 +58,25 @@
#include "rdma/fi_ext.h"
#include "lnx.h"

static void lnx_free_peer(struct lnx_peer *lp)
struct lnx_peer *
lnx_av_lookup_addr(struct lnx_peer_table *peer_tbl, fi_addr_t addr)
{
struct lnx_peer_prov *lpp;
struct dlist_entry *tmp, *tmp2;
struct lnx_local2peer_map *lpm;
struct lnx_peer *entry;

dlist_foreach_container_safe(&lp->lp_provs,
struct lnx_peer_prov, lpp, entry, tmp) {
dlist_foreach_container_safe(&lpp->lpp_map,
struct lnx_local2peer_map, lpm, entry, tmp2) {
dlist_remove(&lpm->entry);
free(lpm);
}
dlist_remove(&lpp->entry);
free(lpp);
}
if (addr == FI_ADDR_UNSPEC)
return NULL;

free(lp);
}

#if ENABLE_DEBUG
static void lnx_print_peer(int idx, struct lnx_peer *lp)
{
int k;
struct lnx_peer_prov *lpp;
struct lnx_local2peer_map *lpm;
ofi_genlock_lock(&peer_tbl->lpt_domain->ld_domain.lock);

FI_DBG(&lnx_prov, FI_LOG_CORE,
"%d: lnx_peer[%d] is %s\n", getpid(), idx,
(lp->lp_local) ? "local" : "remote");
dlist_foreach_container(&lp->lp_provs,
struct lnx_peer_prov, lpp, entry) {
FI_DBG(&lnx_prov, FI_LOG_CORE,
"%d: peer[%p] provider %s\n", getpid(), lpp,
lpp->lpp_prov_name);
dlist_foreach_container(&lpp->lpp_map,
struct lnx_local2peer_map, lpm, entry) {
FI_DBG(&lnx_prov, FI_LOG_CORE,
" %d: peer has %d mapped addrs\n",
getpid(), lpm->addr_count);
for (k = 0; k < lpm->addr_count; k++)
FI_DBG(&lnx_prov, FI_LOG_CORE,
" %d: addr = %lu\n",
getpid(), lpm->peer_addrs[k]);
}
}
}
#endif /* ENABLE_DEBUG */
entry = ofi_bufpool_get_ibuf(peer_tbl->lpt_entries, addr);

static int lnx_peer_insert(struct lnx_peer_table *tbl,
struct lnx_peer *lp)
{
int i;
ofi_genlock_unlock(&peer_tbl->lpt_domain->ld_domain.lock);

if (tbl->lpt_max_count == 0 ||
tbl->lpt_count >= tbl->lpt_max_count)
return -FI_ENOENT;

for (i = 0; i < tbl->lpt_max_count; i++) {
if (!tbl->lpt_entries[i]) {
tbl->lpt_entries[i] = lp;
#if ENABLE_DEBUG
lnx_print_peer(i, lp);
#endif
tbl->lpt_count++;
return i;
}
}
if (!entry)
FI_WARN(&lnx_prov, FI_LOG_CORE,
"Invalid fi_addr %#lx\n", addr);

return -FI_ENOENT;
return entry;
}

static int lnx_peer_av_remove(struct lnx_peer *lp)
Expand Down Expand Up @@ -160,19 +109,22 @@ static int lnx_peer_av_remove(struct lnx_peer *lp)
return frc;
}

static int lnx_peer_remove(struct lnx_peer_table *tbl, int idx)
static int lnx_peer_remove(struct lnx_peer_table *tbl, fi_addr_t addr)
{
struct lnx_peer *lp = tbl->lpt_entries[idx];
struct lnx_peer *lp = NULL;
int rc = 0;

ofi_genlock_lock(&tbl->lpt_domain->ld_domain.lock);
lp = ofi_bufpool_get_ibuf(tbl->lpt_entries, addr);
if (!lp)
return 0;
goto out;

rc = lnx_peer_av_remove(lp);

tbl->lpt_entries[idx] = NULL;
tbl->lpt_count--;
ofi_ibuf_free(lp);

out:
ofi_genlock_unlock(&tbl->lpt_domain->ld_domain.lock);
return rc;
}

Expand All @@ -193,7 +145,7 @@ static int lnx_cleanup_avs(struct local_prov *prov)

static inline void lnx_free_peer_tbl(struct lnx_peer_table *peer_tbl)
{
free(peer_tbl->lpt_entries);
ofi_bufpool_destroy(peer_tbl->lpt_entries);
free(peer_tbl);
}

Expand Down Expand Up @@ -501,10 +453,14 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count,
la->la_prov_count <= 0)
return -FI_EPROTO;

/* this is a local peer */
lp = calloc(sizeof(*lp), 1);
if (!lp)
ofi_genlock_lock(&peer_tbl->lpt_domain->ld_domain.lock);
lp = ofi_ibuf_alloc(peer_tbl->lpt_entries);
if (!lp) {
ofi_genlock_unlock(&peer_tbl->lpt_domain->ld_domain.lock);
return -FI_ENOMEM;
}
idx = ofi_buf_index(lp);
ofi_genlock_unlock(&peer_tbl->lpt_domain->ld_domain.lock);

dlist_init(&lp->lp_provs);

Expand All @@ -521,20 +477,18 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count,

rc = lnx_peer_map_addrs(prov_table, lp, la, flags, context);
if (rc) {
free(lp);
ofi_genlock_lock(&peer_tbl->lpt_domain->ld_domain.lock);
ofi_ibuf_free(lp);
ofi_genlock_unlock(&peer_tbl->lpt_domain->ld_domain.lock);
return rc;
}

idx = lnx_peer_insert(peer_tbl, lp);
if (idx == -1) {
rc = lnx_peer_av_remove(lp);
lnx_free_peer(lp);
FI_INFO(&lnx_prov, FI_LOG_CORE,
"Peer table size exceeded. Removed = %d\n", rc);
return -FI_ENOENT;
}
if (flags & FI_AV_USER_ID)
lp->lp_fi_addr = fi_addr[i];
else
lp->lp_fi_addr = idx;

fi_addr[i] = (fi_addr_t) idx;
fi_addr[i] = idx;

la = next_peer(la);
}
Expand Down Expand Up @@ -622,8 +576,12 @@ int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr,
struct lnx_domain *lnx_domain;
struct lnx_peer_table *peer_tbl;
struct local_prov *entry;
size_t table_sz = LNX_DEF_AV_SIZE;
size_t table_sz;
int rc = 0;
struct ofi_bufpool_attr pool_attr = {
.size = sizeof(struct lnx_peer),
.flags = OFI_BUFPOOL_NO_TRACK | OFI_BUFPOOL_INDEXED,
};

if (!attr)
return -FI_EINVAL;
Expand All @@ -634,24 +592,24 @@ int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr,
if (attr->type != FI_AV_TABLE)
attr->type = FI_AV_TABLE;

lnx_domain = container_of(domain, struct lnx_domain,
ld_domain.domain_fid.fid);
fabric = lnx_domain->ld_fabric;

peer_tbl = calloc(sizeof(*peer_tbl), 1);
if (!peer_tbl)
return -FI_ENOMEM;

if (attr->count != 0)
table_sz = attr->count;
table_sz = attr->count ? attr->count : ofi_universe_size;
table_sz = roundup_power_of_two(table_sz);
pool_attr.chunk_cnt = table_sz;

peer_tbl->lpt_entries =
calloc(sizeof(struct lnx_peer *) * table_sz, 1);
if (!peer_tbl->lpt_entries) {
rc = ofi_bufpool_create_attr(&pool_attr, &peer_tbl->lpt_entries);
if (rc) {
rc = -FI_ENOMEM;
goto failed;
}

lnx_domain = container_of(domain, struct lnx_domain,
ld_domain.domain_fid.fid);
fabric = lnx_domain->ld_fabric;

rc = ofi_av_init_lightweight(&lnx_domain->ld_domain, attr,
&peer_tbl->lpt_av, context);
if (rc) {
Expand Down
Loading

0 comments on commit 3b85472

Please sign in to comment.