-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
164 lines (134 loc) · 5.62 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import json
import os
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional
from urllib import request
@dataclass
class PullRequestContext:
    """Pull-request details extracted from the GitHub event payload.

    Built when the event payload carries a ``pull_request`` object; the values
    are forwarded to the build request as the ``pull_request_*`` fields.
    """

    # The payload's ``pull_request.number``.
    number: int
    # The payload's ``pull_request.base.ref``.
    base_branch: str
    # The payload's ``pull_request.head.repo.git_url``.
    repository: str
@dataclass
class ActionContext:
    """Settings and event data needed to trigger and follow a build.

    Instances are normally created with :meth:`from_env`, which combines the
    action inputs (``INPUT_*`` variables), GitHub-provided variables and the
    event payload file.
    """

    author: Dict[str, str]  # the event's "pusher" object, or {} when absent
    access_token: str  # sent as the Bearer token on API requests
    pipeline: str  # expected in the form "organization/pipeline"
    branch: str
    commit: str
    message: str
    env: Dict[str, str]  # parsed from the INPUT_ENV JSON string
    pull_request: Optional[PullRequestContext]
    is_async: bool  # when true, main() does not wait for the build to finish
    is_test_mode: bool  # when true, HTTP requests are stubbed from local files

    @staticmethod
    def from_env(env: Dict[str, str]) -> "ActionContext":
        """Build a context from environment variables and the event payload.

        Args:
            env: Mapping of environment variables. ``GITHUB_EVENT_PATH``,
                ``INPUT_ACCESS_TOKEN``, ``INPUT_PIPELINE`` and
                ``INPUT_MESSAGE`` are required. ``GITHUB_SHA`` and
                ``GITHUB_REF`` are required only when ``INPUT_COMMIT`` /
                ``INPUT_BRANCH`` (or ``GITHUB_HEAD_REF``) are empty.

        Returns:
            The populated context.

        Raises:
            KeyError: If a required variable is missing.
            json.JSONDecodeError: If the event file or ``INPUT_ENV`` is not
                valid JSON.
        """

        def flag(name: str) -> bool:
            # Only the literal "true" (any letter case) switches a flag on.
            return env.get(name, "false").lower() == "true"

        with open(env["GITHUB_EVENT_PATH"], "rb") as payload_file:
            payload = json.load(payload_file)

        # Values are resolved in field order so that, when several inputs are
        # missing, the first lookup to fail stays the same.
        author = payload.get("pusher", {})
        access_token = env["INPUT_ACCESS_TOKEN"]
        pipeline = env["INPUT_PIPELINE"]
        branch = env.get("INPUT_BRANCH") or ActionContext.__branch(env)
        commit = env.get("INPUT_COMMIT") or env["GITHUB_SHA"]
        message = env["INPUT_MESSAGE"]
        pull_request = ActionContext.__pull_request_context(payload)
        build_env = json.loads(env.get("INPUT_ENV") or "{}")

        return ActionContext(
            author=author,
            access_token=access_token,
            pipeline=pipeline,
            branch=branch,
            commit=commit,
            message=message,
            pull_request=pull_request,
            env=build_env,
            is_async=flag("INPUT_ASYNC"),
            is_test_mode=flag("TEST_MODE"),
        )

    @staticmethod
    def __branch(env: Dict[str, str]) -> str:
        """Derive the branch name from the GitHub-provided variables."""
        # GITHUB_HEAD_REF carries the branch name on pull requests.
        pr_branch = env.get("GITHUB_HEAD_REF")
        if pr_branch:
            return pr_branch

        ref = env["GITHUB_REF"]
        heads = "refs/heads/"
        # Refs outside refs/heads/ are passed through untouched.
        return ref[len(heads):] if ref.startswith(heads) else ref

    @staticmethod
    def __pull_request_context(event: dict) -> Optional[PullRequestContext]:
        """Return the pull-request details of ``event``, or ``None`` if absent."""
        details = event.get("pull_request")
        if not details:
            return None
        return PullRequestContext(
            number=details["number"],
            base_branch=details["base"]["ref"],
            repository=details["head"]["repo"]["git_url"],
        )
def main():
    """Trigger the configured pipeline and report the outcome.

    Reads the context from ``os.environ``, triggers a build, optionally waits
    for it to finish (unless the context is async), emits the build details as
    step outputs, and fails when the last reported state is not acceptable.

    Raises:
        RuntimeError: If the last reported state is anything other than
            ``scheduled``, ``running`` or ``passed``.
    """
    ctx = ActionContext.from_env(os.environ)
    print(f"πͺ Triggering {ctx.pipeline} for {ctx.branch}@{ctx.commit}", flush=True)

    build = trigger_pipeline(ctx)
    state = report_build_state(build)

    if not ctx.is_async:
        # Block until the build has finished, then report its final state.
        build = wait_for_build(build["url"], ctx)
        state = report_build_state(build)

    output_build_info(build)

    acceptable_states = ("scheduled", "running", "passed")
    if state in acceptable_states:
        return
    raise RuntimeError(f"Pipeline failed with state '{state}'")
def trigger_pipeline(context: ActionContext) -> dict:
    """Create a build for the configured pipeline and return the API response.

    Args:
        context: Supplies the pipeline, access token, commit, branch, message,
            author, build environment and optional pull-request details.

    Returns:
        The decoded JSON response describing the created build.

    Raises:
        ValueError: If ``context.pipeline`` is not ``organization/pipeline``.
    """
    url = pipeline_url(context.pipeline)
    headers = {
        "Authorization": f"Bearer {context.access_token}",
        # Without an explicit Content-Type, urllib labels a request body as
        # application/x-www-form-urlencoded even though this one is JSON.
        "Content-Type": "application/json",
    }
    payload = {
        "commit": context.commit,
        "branch": context.branch,
        "message": context.message,
        "author": context.author,
        "env": context.env,
    }

    pull_request = context.pull_request
    if pull_request:
        payload["pull_request_base_branch"] = pull_request.base_branch
        payload["pull_request_id"] = pull_request.number
        payload["pull_request_repository"] = pull_request.repository

    data = json.dumps(payload).encode("utf-8")
    req = request.Request(url, method="POST", headers=headers, data=data)
    return http_send(req, context, test_response="create_build")
def wait_for_build(
    url: str,
    context: ActionContext,
    *,
    poll_interval: float = 15,
    status_interval: float = 60,
) -> dict:
    """Poll a build until it reports a ``finished_at`` value.

    The function sleeps before every poll, including the first one.

    Args:
        url: API URL of the build to poll.
        context: Supplies the access token and the test-mode switch.
        poll_interval: Seconds to sleep between polls.
        status_interval: Minimum seconds between "still waiting" messages.

    Returns:
        The decoded JSON of the first response whose ``finished_at`` is truthy.
    """
    headers = {"Authorization": f"Bearer {context.access_token}"}
    req = request.Request(url, method="GET", headers=headers)

    # A monotonic clock keeps the progress messages correctly spaced even if
    # the system wall clock is adjusted while waiting.
    last_status = time.monotonic()
    build_info = {}
    print("β Waiting for build to finish", flush=True)
    while not build_info.get("finished_at"):
        time.sleep(poll_interval)
        now = time.monotonic()
        if now - last_status > status_interval:
            print("β Still waiting for build to finish", flush=True)
            last_status = now
        build_info = http_send(req, context, test_response="build_passed")
    return build_info
def output_build_info(build_info: dict) -> None:
    """Publish the build's details as GitHub Actions step outputs.

    The outputs are ``id``, ``number``, ``url``, ``web_url``, ``state`` and
    ``data`` (the whole ``build_info`` serialized as JSON).

    When the runner provides a ``GITHUB_OUTPUT`` file, ``name=value`` lines are
    appended to it; this replaces the deprecated ``::set-output`` workflow
    command. Without that variable the legacy command is printed instead, so
    older runners keep working.

    Args:
        build_info: Decoded build JSON; must contain ``id``, ``number``,
            ``url``, ``web_url`` and ``state``.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    outputs = {
        "id": build_info["id"],
        "number": build_info["number"],
        "url": build_info["url"],
        "web_url": build_info["web_url"],
        "state": build_info["state"],
        # json.dumps without indent yields a single line, which the simple
        # name=value file format requires.
        "data": json.dumps(build_info),
    }

    output_path = os.environ.get("GITHUB_OUTPUT")
    if output_path:
        with open(output_path, "a", encoding="utf-8") as output_file:
            for name, value in outputs.items():
                output_file.write(f"{name}={value}\n")
        return

    for name, value in outputs.items():
        print(f"::set-output name={name}::{value}")
def pipeline_url(pipeline: str) -> str:
    """Return the Buildkite builds endpoint for an ``organization/pipeline`` slug.

    Args:
        pipeline: Slug in the form ``organization/pipeline``.

    Returns:
        The URL of the pipeline's ``builds`` collection.

    Raises:
        ValueError: If the slug has no ``/``, has an empty part, or contains
            more than one ``/``.
    """
    # partition never fails to unpack, so a slug without "/" reaches the
    # descriptive error below instead of a "not enough values to unpack" one.
    organization, separator, name = pipeline.partition("/")
    if not (separator and organization and name) or "/" in name:
        raise ValueError("pipeline must be in the form 'organization/pipeline'")
    return f"https://api.buildkite.com/v2/organizations/{organization}/pipelines/{name}/builds"
def report_build_state(build_info: dict) -> str:
    """Print a one-line status for the build and return its state.

    Args:
        build_info: Decoded build JSON; must contain ``state`` and ``web_url``.

    Returns:
        The value of ``build_info["state"]``.
    """
    current = build_info["state"]
    icon = state_emoji(current)
    link = build_info["web_url"]
    print(f"{icon} Build {current} β {link}", flush=True)
    return current
def state_emoji(state: str) -> str:
    """Return the marker shown in front of a build state.

    ``scheduled``, ``running`` and ``passed`` have dedicated markers; every
    other state gets the fallback marker.
    """
    markers = {
        "scheduled": "ποΈ",
        "running": "π",
        "passed": "π",
    }
    fallback = "π"
    return markers[state] if state in markers else fallback
def http_send(req: request.Request, context: "ActionContext", *, test_response: str) -> dict:
    """Send ``req`` and return the decoded JSON response.

    In test mode no network request is made: the request is echoed to stdout
    and the response is read from ``./test_responses/<test_response>.json``
    (relative to the current working directory).

    Args:
        req: The prepared request.
        context: Only ``is_test_mode`` is read.
        test_response: Base name of the stub file used in test mode.

    Returns:
        The parsed JSON body.
    """
    if context.is_test_mode:
        print("π§ Stubbing HTTP request in test mode:")
        print(f"π§ {req.method} {req.full_url}")
        if req.data:
            print(f"π§ {req.data.decode('utf-8')}")
        # Context managers close the stub file / HTTP response promptly
        # instead of leaving the handle to the garbage collector.
        with open(f"./test_responses/{test_response}.json", "rb") as stub:
            return json.loads(stub.read())

    with request.urlopen(req, timeout=10) as response:
        return json.loads(response.read())
# Run the action only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()