|
- # Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- # !/bin/env python
- import os
- import sys
- import time
- import socket
- from kubernetes import client, config
-
- NAMESPACE = os.getenv("NAMESPACE")
- if os.getenv("KUBERNETES_SERVICE_HOST", None):
- config.load_incluster_config()
- else:
- config.load_kube_config()
- v1 = client.CoreV1Api()
-
-
- def get_pod_status(item):
- phase = item.status.phase
-
- # check terminate time although phase is Running.
- if item.metadata.deletion_timestamp != None:
- return "Terminating"
-
- return phase
-
-
- def containers_all_ready(label_selector):
- def container_statuses_ready(item):
- container_statuses = item.status.container_statuses
-
- for status in container_statuses:
- if not status.ready:
- return False
- return True
-
- api_response = v1.list_namespaced_pod(
- namespace=NAMESPACE, pretty=True, label_selector=label_selector)
-
- for item in api_response.items:
- if not container_statuses_ready(item):
- return False
-
- return True
-
-
- def fetch_pods_info(label_selector, phase=None):
- api_response = v1.list_namespaced_pod(
- namespace=NAMESPACE, pretty=True, label_selector=label_selector)
- pod_list = []
- for item in api_response.items:
- if phase is not None and get_pod_status(item) != phase:
- continue
-
- pod_list.append(
- (item.status.phase, item.status.pod_ip, item.metadata.name))
- return pod_list
-
-
- def wait_pods_running(label_selector, desired):
- print("label selector: %s, desired: %s" % (label_selector, desired))
- while True:
- count = count_pods_by_phase(label_selector, 'Running')
- # NOTE: pods may be scaled.
- if count >= int(desired):
- break
- print('current cnt: %d sleep for 5 seconds...' % count)
- time.sleep(5)
-
-
- def wait_containers_ready(label_selector):
- print("label selector: %s, wait all containers ready" % (label_selector))
- while True:
- if containers_all_ready(label_selector):
- break
- print('not all containers ready, sleep for 5 seconds...')
- time.sleep(5)
-
-
- def count_pods_by_phase(label_selector, phase):
- pod_list = fetch_pods_info(label_selector, phase)
- return len(pod_list)
-
-
- def fetch_ips_list(label_selector, phase=None):
- pod_list = fetch_pods_info(label_selector, phase)
- ips = [item[1] for item in pod_list]
- ips.sort()
- return ips
-
-
- def fetch_name_list(label_selector, phase=None):
- pod_list = fetch_pods_info(label_selector, phase)
- names = [item[2] for item in pod_list]
- names.sort()
- return names
-
-
- def fetch_ips_string(label_selector, phase=None):
- ips = fetch_ips_list(label_selector, phase)
- return ",".join(ips)
-
-
- def fetch_endpoints_string(label_selector, port, phase=None, sameport=True):
- ips = fetch_ips_list(label_selector, phase)
- if sameport:
- ips = ["{0}:{1}".format(ip, port) for ip in ips]
- else:
- srcips = ips
- ips = []
- port = int(port)
- for ip in srcips:
- ips.append("{0}:{1}".format(ip, port))
- port = port + 1
- return ",".join(ips)
-
-
- def fetch_pod_id(label_selector, phase=None, byname=True):
- if byname:
- names = fetch_name_list(label_selector, phase=phase)
-
- local_name = os.getenv('POD_NAME')
- for i in xrange(len(names)):
- if names[i] == local_name:
- return i
-
- return None
- else:
- ips = fetch_ips_list(label_selector, phase=phase)
-
- local_ip = socket.gethostbyname(socket.gethostname())
- for i in xrange(len(ips)):
- if ips[i] == local_ip:
- return i
-
- # in minikube there can be one node only
- local_ip = os.getenv("POD_IP")
- for i in xrange(len(ips)):
- if ips[i] == local_ip:
- return i
-
- return None
-
-
- def fetch_ips(label_selector):
- return fetch_ips_string(label_selector, phase="Running")
-
-
- def fetch_endpoints(label_selector, port):
- return fetch_endpoints_string(
- label_selector, port=port, phase="Running", sameport=True)
-
-
- def fetch_id(label_selector):
- return fetch_pod_id(label_selector, phase="Running")
-
-
- if __name__ == "__main__":
- command = sys.argv[1]
- if command == "fetch_ips":
- print(fetch_ips(sys.argv[2]))
- if command == "fetch_ips_string":
- print(fetch_ips_string(sys.argv[2], sys.argv[3]))
- elif command == "fetch_endpoints":
- print(fetch_endpoints(sys.argv[2], sys.argv[3]))
- elif command == "fetch_id":
- print(fetch_id(sys.argv[2]))
- elif command == "count_pods_by_phase":
- print(count_pods_by_phase(sys.argv[2], sys.argv[3]))
- elif command == "wait_pods_running":
- wait_pods_running(sys.argv[2], sys.argv[3])
- elif command == "wait_containers_ready":
- wait_containers_ready(sys.argv[2])
|