
[Airflow] Change default SQLite to MySQL database and manage services with systemd

· 6 min read

The previous post covered how to quickly set up airflow on CentOS 7: [Airflow] Setup apache airflow on CentOS 7

I. Manage airflow services with systemd

1. Create a user and group for airflow:

# useradd -U airflow

2. Create the pid and log directories:

# mkdir -p /run/airflow
# chown airflow:airflow /run/airflow
# chmod 755 /run/airflow

# mkdir -p /var/log/airflow
# chown airflow:airflow /var/log/airflow
# chmod 755 /var/log/airflow

3. Generate the environment variable file:

# cat <<EOF > /etc/sysconfig/airflow
AIRFLOW_CONFIG=/etc/airflow/airflow.cfg
AIRFLOW_HOME=/etc/airflow
EOF

4. Move the airflow directory, previously installed under ~/airflow, to /etc:

# mv ~/airflow /etc/
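Since the services will run as the airflow user, that user likely also needs ownership of the relocated directory (an assumption based on the user and group created in step 1; the original setup does not spell this step out):

# chown -R airflow:airflow /etc/airflow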

5. Edit /etc/airflow/airflow.cfg

a. Update dags_folder and plugins_folder:

dags_folder = $AIRFLOW_HOME/dags
plugins_folder = $AIRFLOW_HOME/plugins

b. Update the log directory paths:

base_log_folder = /var/log/airflow
dag_processor_manager_log_location = /var/log/airflow/dag_processor_manager/dag_processor_manager.log
child_process_log_directory = /var/log/airflow/scheduler

6. Create a systemd unit file for each service. Templates can be found in the airflow GitHub repository (https://github.com/apache/airflow/tree/master/scripts/systemd); make sure to adjust the paths in each file:

a. airflow webserver:

# cat <<EOF > /usr/lib/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
After=network.target
Wants=

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target
EOF

b. airflow scheduler:

# cat <<EOF > /usr/lib/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
After=network.target
Wants=

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
EOF

c. Others...
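After creating or changing unit files, reload systemd so the new units are picked up (a standard systemd step, not specific to airflow):

# systemctl daemon-reload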

II. Use a MySQL database

1. Create a MySQL database for airflow with charset "utf8mb4" and collation "utf8mb4_general_ci".
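For example, from the mysql client (a minimal sketch; the database name airflow matches the connection string used below):

# mysql -u root -p -e "CREATE DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;"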

2. Install pymysql, the MySQL driver for Python:

# pip3 install pymysql

3. Edit /etc/airflow/airflow.cfg

a. Change sql_alchemy_conn from the default sqlite database to MySQL:

sql_alchemy_conn = mysql+pymysql://{username}:{password}@{hostname}:3306/airflow

Format: {database type}+{driver}://{username}:{password}@{MySQL host}:{port}/{database name}. For more information see the SQLAlchemy documentation: https://docs.sqlalchemy.org/

b. Change the executor to LocalExecutor:

executor = LocalExecutor

c. Initialize the MySQL database:

# airflow initdb

III. Start the webserver, scheduler, and other services

# systemctl enable airflow-webserver && systemctl start airflow-webserver
# systemctl enable airflow-scheduler && systemctl start airflow-scheduler
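To verify that both services came up, a standard systemctl status check works:

# systemctl status airflow-webserver airflow-scheduler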

IV. Miscellaneous

Check /var/log/messages for the status of each service. The scheduler shows a strange error:

Oct 31 05:56:35 build-node airflow: Traceback (most recent call last):
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
Oct 31 05:56:35 build-node airflow: self.run()
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
Oct 31 05:56:35 build-node airflow: self._target(*self._args, **self._kwargs)
Oct 31 05:56:35 build-node airflow: File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 128, in _run_file_processor
Oct 31 05:56:35 build-node airflow: set_context(log, file_path)
Oct 31 05:56:35 build-node airflow: File "/usr/local/lib/python3.6/site-packages/airflow/utils/log/logging_mixin.py", line 170, in set_context
Oct 31 05:56:35 build-node airflow: handler.set_context(value)
Oct 31 05:56:35 build-node airflow: File "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_processor_handler.py", line 65, in set_context
Oct 31 05:56:35 build-node airflow: local_loc = self._init_file(filename)
Oct 31 05:56:35 build-node airflow: File "/usr/local/lib/python3.6/site-packages/airflow/utils/log/file_processor_handler.py", line 141, in _init_file
Oct 31 05:56:35 build-node airflow: os.makedirs(directory)
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/os.py", line 210, in makedirs
Oct 31 05:56:35 build-node airflow: makedirs(head, mode, exist_ok)
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/os.py", line 210, in makedirs
Oct 31 05:56:35 build-node airflow: makedirs(head, mode, exist_ok)
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/os.py", line 210, in makedirs
Oct 31 05:56:35 build-node airflow: makedirs(head, mode, exist_ok)
Oct 31 05:56:35 build-node airflow: [Previous line repeated 3 more times]
Oct 31 05:56:35 build-node airflow: File "/usr/lib64/python3.6/os.py", line 220, in makedirs
Oct 31 05:56:35 build-node airflow: mkdir(name, mode)
Oct 31 05:56:35 build-node airflow: PermissionError: [Errno 13] Permission denied: '/var/log/airflow/scheduler/2019-10-31/../../../usr'

The airflow scheduler tries to create directories under /var/log/, where the airflow user has no permission, hence the PermissionError. If you create a usr directory under /var/log/ and assign its ownership to airflow, for example (shown only to demonstrate the behavior; it works around the error rather than fixing it):
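# mkdir -p /var/log/usr
# chown airflow:airflow /var/log/usr

then a lot of log files are produced under "/var/log/airflow/scheduler/2019-10-31/../../../usr/local/lib/python3.6/site-packages/airflow/example_dags/":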

# ls -la /var/log/airflow/scheduler/2019-10-31/../../../usr/local/lib/python3.6/site-packages/airflow/example_dags/
total 2212
drwxr-xr-x. 3 airflow airflow 4096 Oct 31 06:04 .
drwxr-xr-x. 3 airflow airflow 26 Oct 31 06:04 ..
-rw-r--r--. 1 airflow airflow 90610 Oct 31 06:18 docker_copy_data.py.log
-rw-r--r--. 1 airflow airflow 93636 Oct 31 06:18 example_bash_operator.py.log
-rw-r--r--. 1 airflow airflow 95777 Oct 31 06:18 example_branch_operator.py.log
-rw-r--r--. 1 airflow airflow 50840 Oct 31 06:18 example_branch_python_dop_operator_3.py.log
-rw-r--r--. 1 airflow airflow 93480 Oct 31 06:18 example_docker_operator.py.log
-rw-r--r--. 1 airflow airflow 94792 Oct 31 06:18 example_http_operator.py.log
-rw-r--r--. 1 airflow airflow 93152 Oct 31 06:18 example_latest_only.py.log
-rw-r--r--. 1 airflow airflow 98334 Oct 31 06:18 example_latest_only_with_trigger.py.log
-rw-r--r--. 1 airflow airflow 103648 Oct 31 06:18 example_passing_params_via_test_command.py.log
-rw-r--r--. 1 airflow airflow 93150 Oct 31 06:18 example_pig_operator.py.log
-rw-r--r--. 1 airflow airflow 67744 Oct 31 06:18 example_python_operator.py.log
-rw-r--r--. 1 airflow airflow 49610 Oct 31 06:18 example_short_circuit_operator.py.log
-rw-r--r--. 1 airflow airflow 92332 Oct 31 06:18 example_skip_dag.py.log
-rw-r--r--. 1 airflow airflow 101844 Oct 31 06:18 example_subdag_operator.py.log
-rw-r--r--. 1 airflow airflow 99220 Oct 31 06:18 example_trigger_controller_dag.py.log
-rw-r--r--. 1 airflow airflow 97252 Oct 31 06:18 example_trigger_target_dag.py.log
-rw-r--r--. 1 airflow airflow 90364 Oct 31 06:18 example_xcom.py.log
drwxr-xr-x. 2 airflow airflow 27 Oct 31 06:04 subdags
-rw-r--r--. 1 airflow airflow 55590 Oct 31 06:18 test_utils.py.log
-rw-r--r--. 1 airflow airflow 86240 Oct 31 06:18 tutorial.py.log

The absolute path is "/var/log/usr/local/lib/python3.6/site-packages/airflow/example_dags/". I cannot figure out why the airflow scheduler ignores the log directory settings in airflow.cfg and uses a relative path instead. It looks like a scheduler bug; someone has already filed an issue in the Airflow JIRA, and I described what I ran into in a comment there: https://issues.apache.org/jira/browse/AIRFLOW-4719

[Kubernetes] Create deployment, service by Python client

· 2 min read

Install the Kubernetes Python Client and PyYAML:

# pip install kubernetes pyyaml

1. Get Namespaces or Pods by CoreV1Api:

# -*- coding: utf-8 -*-
from kubernetes import client, config

# Load cluster credentials from a kubeconfig file
config.kube_config.load_kube_config(config_file="../kubecfg.yaml")
coreV1Api = client.CoreV1Api()

print("\nListing all namespaces")
for ns in coreV1Api.list_namespace().items:
    print(ns.metadata.name)

print("\nListing pods with their IP, namespace, names:")
for pod in coreV1Api.list_pod_for_all_namespaces(watch=False).items:
    print("%s\t\t%s\t%s" % (pod.status.pod_ip, pod.metadata.namespace, pod.metadata.name))

2. Create Deployment by AppsV1Api and Service by CoreV1Api:

# -*- coding: utf-8 -*-
from kubernetes import client, config
import yaml

config.kube_config.load_kube_config(config_file="../kubecfg.yaml")

# Parse the manifests into dicts that the API client accepts as request bodies
yamlDeploy = open(r'deploy.yaml')
jsonDeploy = yaml.load(yamlDeploy, Loader=yaml.FullLoader)

yamlService = open(r'service.yaml')
jsonService = yaml.load(yamlService, Loader=yaml.FullLoader)

appsV1Api = client.AppsV1Api()
coreV1Api = client.CoreV1Api()  # Services are created via the core API, not the apps API

if jsonDeploy['kind'] == 'Deployment':
    appsV1Api.create_namespaced_deployment(
        namespace="default", body=jsonDeploy
    )

if jsonService['kind'] == 'Service':
    coreV1Api.create_namespaced_service(
        namespace="default",
        body=jsonService
    )

3. Create ANY type of object from a yaml file with utils.create_from_yaml; you can put multiple resources in one yaml file:

# -*- coding: utf-8 -*-
from kubernetes import client, config, utils

config.kube_config.load_kube_config(config_file="../kubecfg.yaml")

k8sClient = client.ApiClient()
utils.create_from_yaml(k8sClient, "deploy-service.yaml")
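
For reference, deploy-service.yaml could hold both resources separated by ---. The following is an assumed minimal example (the nginx image and all names are illustrative placeholders, not taken from the original post):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:1.17
          ports:
            - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: nginx-svc
spec:
  selector:
    app: nginx
  ports:
    - port: 80
      targetPort: 80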

References:
https://github.com/kubernetes-client/python/blob/6709b753b4ad2e09aa472b6452bbad9f96e264e3/examples/create_deployment_from_yaml.py
https://stackoverflow.com/questions/56673919/kubernetes-python-api-client-execute-full-yaml-file
