-
Notifications
You must be signed in to change notification settings - Fork 18
/
fruit.py
104 lines (73 loc) · 2.89 KB
/
fruit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import ray
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.serve.deployment_graph import InputNode
from ray.serve.http_adapters import json_request
# These imports are used only for type hints:
from typing import Dict, List
from starlette.requests import Request
from ray.serve.handle import RayServeDeploymentHandle
@serve.deployment(num_replicas=2)
class FruitMarket:
    """Front desk that forwards price queries to the matching fruit stand.

    Each constructor argument is a deployment handle for one stand; the
    handles are indexed by upper-case fruit name in ``self.directory``.
    """

    def __init__(
        self,
        mango_stand: RayServeDeploymentHandle,
        orange_stand: RayServeDeploymentHandle,
        pear_stand: RayServeDeploymentHandle,
    ):
        # Fruit name -> handle of the stand that prices that fruit.
        stands_by_fruit = {
            "MANGO": mango_stand,
            "ORANGE": orange_stand,
            "PEAR": pear_stand,
        }
        self.directory = stands_by_fruit

    async def check_price(self, fruit: str, amount: float) -> float:
        """Ask the stand registered for ``fruit`` to price ``amount`` of it.

        Args:
            fruit: Key into ``self.directory`` (the keys are upper-case).
            amount: Quantity forwarded unchanged to the stand.

        Returns:
            Whatever the stand's ``check_price`` produces, or ``-1`` when
            ``fruit`` has no entry in the directory.
        """
        # Guard clause: unknown fruits are reported with the -1 sentinel.
        if fruit not in self.directory:
            return -1

        stand_handle = self.directory[fruit]
        # Two awaits on purpose: the first yields the value annotated as an
        # ObjectRef, the second waits for the value behind that reference.
        price_ref: ray.ObjectRef = await stand_handle.check_price.remote(amount)
        quoted_price = await price_ref
        return quoted_price
@serve.deployment(user_config={"price": 3})
class MangoStand:
    """Stand that prices mangoes at a configurable per-unit price."""

    # Fallback unit price, used until (or unless) a config supplies "price".
    DEFAULT_PRICE = 1

    def __init__(self):
        # Begin with the class-level default; the price given in the
        # deployment's user_config replaces it via reconfigure().
        self.price = self.DEFAULT_PRICE

    def reconfigure(self, config: Dict):
        """Set the unit price from ``config["price"]``.

        Falls back to ``DEFAULT_PRICE`` when the key is absent.
        """
        configured_price = config.get("price", self.DEFAULT_PRICE)
        self.price = configured_price

    def check_price(self, amount: float) -> float:
        """Return the current unit price multiplied by ``amount``."""
        total = self.price * amount
        return total
@serve.deployment(user_config={"price": 2})
class OrangeStand:
    """Stand that prices oranges at a configurable per-unit price."""

    # Fallback unit price, used until (or unless) a config supplies "price".
    DEFAULT_PRICE = 0.5

    def __init__(self):
        # Begin with the class-level default; the price given in the
        # deployment's user_config replaces it via reconfigure().
        self.price = self.DEFAULT_PRICE

    def reconfigure(self, config: Dict):
        """Set the unit price from ``config["price"]``.

        Falls back to ``DEFAULT_PRICE`` when the key is absent.
        """
        configured_price = config.get("price", self.DEFAULT_PRICE)
        self.price = configured_price

    def check_price(self, amount: float) -> float:
        """Return the current unit price multiplied by ``amount``."""
        total = self.price * amount
        return total
@serve.deployment(user_config={"price": 4})
class PearStand:
    """Stand that prices pears at a configurable per-unit price."""

    # Fallback unit price, used until (or unless) a config supplies "price".
    DEFAULT_PRICE = 0.75

    def __init__(self):
        # Begin with the class-level default; the price given in the
        # deployment's user_config replaces it via reconfigure().
        self.price = self.DEFAULT_PRICE

    def reconfigure(self, config: Dict):
        """Set the unit price from ``config["price"]``.

        Falls back to ``DEFAULT_PRICE`` when the key is absent.
        """
        configured_price = config.get("price", self.DEFAULT_PRICE)
        self.price = configured_price

    def check_price(self, amount: float) -> float:
        """Return the current unit price multiplied by ``amount``."""
        total = self.price * amount
        return total
async def json_resolver(request: Request) -> List:
    """Return the JSON-decoded body of ``request``.

    The return annotation says ``List``; nothing here validates that shape,
    so the caller gets whatever ``request.json()`` produces.
    """
    payload = await request.json()
    return payload
# Wire the deployment graph: request input -> FruitMarket.check_price, with
# the three stands injected into FruitMarket's constructor.
with InputNode() as query:
    # The input is indexed positionally: element 0 is the fruit name,
    # element 1 is the amount.
    fruit = query[0]
    amount = query[1]

    # One bound node per stand, created in the same order as before.
    mango_stand, orange_stand, pear_stand = (
        MangoStand.bind(),
        OrangeStand.bind(),
        PearStand.bind(),
    )

    fruit_market = FruitMarket.bind(mango_stand, orange_stand, pear_stand)

    # Graph output: the market's price for the requested fruit and amount.
    net_price = fruit_market.check_price.bind(fruit, amount)

# Entry point of the graph; json_request is passed as the HTTP adapter.
deployment_graph = DAGDriver.bind(net_price, http_adapter=json_request)