1. 安装
- 使用 wingetUI 安装 k8s 包
- 安装 docker desktop 开启 k8s
2. 编写代码
2.1 pod中的计算代码
代码语言:javascript复制# _*_ coding: utf-8 _*_
# @Time : 2023/12/03 14:44
# @Author : Michael
# @File : process_task.py
# @desc :
import time
def process_task(task):
# 根据任务进行计算
time.sleep(5)
result = f"fake to process task {task}, finish!"
# 返回计算结果
print(result)
return result
if __name__ == '__main__':
process_task(0)
2.2 主程序代码
代码语言:javascript复制# _*_ coding: utf-8 _*_
# @Time : 2023/12/2 8:23
# @Author : Michael
# @File : demo1.py
# @desc :
from kubernetes import client, config
import time
import multiprocessing
def create_pod(api_instance, name, image, command):
"""
创建一个Pod
"""
pod_manifest = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {"name": name},
"spec": {
"containers": [{
"name": name,
"image": image,
"command": command
}]
}
}
api_instance.create_namespaced_pod(body=pod_manifest, namespace="default")
def get_pod_logs(api_instance, pod_name):
"""
获取Pod的日志
"""
log = api_instance.read_namespaced_pod_log(name=pod_name, namespace="default")
return log
def wait_for_pod_to_be_ready(api_instance, pod_name, timeout=300):
"""
等待Pod变为就绪状态
"""
start_time = time.time()
while True:
pod = api_instance.read_namespaced_pod(name=pod_name, namespace="default")
if pod.status.phase == "Running" and all([condition.status == "True" for condition in pod.status.conditions if condition.type == "Ready"]):
print(f"Pod {pod_name} is ready")
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Timed out waiting for POD {pod_name} to be ready")
time.sleep(1) # 每5秒检查一次
def delete_pod(api_instance, pod_name):
"""
删除一个Pod
"""
api_instance.delete_namespaced_pod(name=pod_name, namespace="default", body=client.V1DeleteOptions())
def do_task(name):
# 加载kubeconfig配置
config.load_kube_config()
# 创建一个Kubernetes client实例
api_instance = client.CoreV1Api()
# 创建Pod
create_pod(api_instance, name, "michael23o/myapp:latest", [
"python",
"-c",
f"import process_task; process_task.process_task('{name}')",
])
# 等待Pod变为就绪状态
wait_for_pod_to_be_ready(api_instance, name)
# 等待一段时间以便Pod生成日志
time.sleep(5)
# 等待Pod运行并获取输出
logs = get_pod_logs(api_instance, name) # 实际应该存入数据库,在其他地方进行查询
# 清理Pod
delete_pod(api_instance, name)
return logs
if __name__ == '__main__':
pod_names = ["pod1", "pod2", "pod3"]
pool = multiprocessing.Pool(processes=3)
results = [pool.apply_async(do_task, args=(pod_name,)) for pod_name in pod_names]
pool.close()
pool.join()
results = [result.get() for result in results]
print("汇总结果:")
for result in results:
print(f'--{result}--')
3. 打包docker镜像
编写一个Dockerfile 如下,把代码打包进去
代码语言:javascript复制FROM bitnami/python:3.11.6
COPY . /app
WORKDIR /app
RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 8080
CMD ["python", "-c", "import process_task; process_task.process_task('task0')"]
创建镜像
代码语言:javascript复制>docker build -t myapp .
[ ] Building 55.2s (9/9) FINISHED docker:default
=> [internal] load .dockerignore 0.6s
=> => transferring context: 2B 0.0s
=> [internal] load build definition from Dockerfile 1.0s
=> => transferring dockerfile: 244B 0.0s
=> [internal] load metadata for docker.io/bitnami/py 39.7s
=> [internal] load build context 0.5s
=> => transferring context: 3.82kB 0.0s
=> CACHED [1/4] FROM docker.io/bitnami/python:3.11.6@ 0.0s
=> [2/4] COPY . /app 1.3s
=> [3/4] WORKDIR /app 1.9s
=> [4/4] RUN pip install --no-cache-dir -r requiremen 4.7s
=> exporting to image 3.1s
=> => exporting layers 2.8s
=> => writing image sha256:a67a18412908d5cbd2660c9889 0.1s
=> => naming to docker.io/library/myapp 0.2s
What's Next?
View a summary of image vulnerabilities and recommendations → docker scout quickview
将制作好镜像推向云端,要将Docker镜像推送到云端,你需要遵循以下步骤:
- 创建一个Docker Hub账户:首先,你需要在Docker Hub上创建一个免费账户,以便在云端存储和管理你的Docker镜像。
- 登录Docker CLI:在命令行中使用
docker login
命令登录到你的Docker Hub账户。这将提示你输入用户名和密码。
docker login
- 为镜像打标签:在推送镜像到Docker Hub之前,你需要为镜像指定一个仓库名称(通常与你的Docker Hub用户名相同)以及一个标签。可以使用
docker tag
命令为镜像打标签。例如,如果你的Docker Hub用户名是yourusername
,镜像名称是myimage
,标签是latest
:
docker tag myimage yourusername/myimage:latest
- 推送镜像:现在你可以使用
docker push
命令将镜像推送到Docker Hub。在本例中,我们将推送yourusername/myimage:latest
镜像:
docker push yourusername/myimage:latest
- 验证镜像:登录到你的Docker Hub账户,然后转到“Repositories”部分以验证你的镜像是否已成功推送。
现在,你已经将Docker镜像推送到云端。你可以在其他计算机或服务器上使用docker pull
命令拉取此镜像,并在Kubernetes集群中使用它。
4. 测试
运行 主程序demo1.py
, 同时用 kubectl get pods
命令观察 pod 状态
C:Usersxx>kubectl get pods
No resources found in default namespace.
C:Usersxx>kubectl get pods
NAME READY STATUS RESTARTS AGE
pod1 0/1 ContainerCreating 0 1s
pod2 0/1 Pending 0 1s
pod3 0/1 Pending 0 1s
可以看见有3个 pod 被创建,并开始执行镜像内部的计算程序
运行结果如下:
代码语言:javascript复制D:ProgramDataAnaconda3envscvpython.exe D:gitcodePython_learningk8sdemo1demo1.py
Pod pod2 is ready
Pod pod3 is ready
Pod pod1 is ready
汇总结果:
--fake to process task pod1, finish!
--
--fake to process task pod2, finish!
--
--fake to process task pod3, finish!
--
Process finished with exit code 0
其中pod
计算结果的返回应该通过数据库或者别的方式存储。这里示例只是读取 pod 日志来获取。
这样可以解决单机资源无法满足的大规模的计算任务,并且可以以分布式的方式并行,提高计算速度