Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 

README.md

APISIX源代码阅读

apisix主要是lua脚本跟openresty(或者说nginx)的组合, 流量具体转发由nginx承载, 但是按照什么规则转发用lua脚本定义. apisix在nginx之上封装了非常多功能强大有用的特性, 提供丰富的流量管理功能,比如态调整upstream, 灰度发布, 流量熔断, 认证, 观测性等。

代码可分为两个部分

  • 启动前

    启动前主要是检查环境是否符合要求(比如openresty版本, luajit版本), 加载配置文件, 初始化数据中心, 渲染nginx.conf配置文件

  • 启动后

    启动后主要是个组件的初始化(比如router, services, upstream等对象), 基于这些组件接受客户端的请求,这些请求可以大致可分为两种类型,一是配置路由, 服务, 上游等数据,即配置转发规则,二是基于已有的规则转发请求。

代码讲解大部分放在代码中的上下行

启动前

要启动apisix不能直接使用openresty -p /path/to/prefix -c /path/to/conf这样的命令,apisix为启动编写专门的启动脚本, 即apisix start, 启动前的所有操作全部已经涵盖在这个命令里了。

如果你不想直接启动,可以先初始化试试,又或者将启动命令分解成以下三步。

初始化配置文件。

/usr/bin/apisix init 

初始化etcd

/usr/bin/apisix init_etcd

启动openresty

/usr/local/openresty/bin/openresty -p /usr/local/apisix -g 'daemon off;'

启动脚本

apisix的启动脚本逻辑比较简单就是找到相应的lua解释器(lua或者luajit)然后调用apisix.lua

源代码简化如下:

APISIX_LUA=/usr/local/apisix/apisix/cli/apisix.lua

# 寻找openresty, lua命令
OR_BIN=$(which openresty || exit 1)
OR_EXEC=${OR_BIN:-'/usr/local/openresty-debug/bin/openresty'}
OR_VER=$(openresty -v 2>&1 | awk -F '/' '{print $2}' | awk -F '.' '{print $1"."$2}')
LUA_VERSION=$(lua -v 2>&1| grep -E -o  "Lua [0-9]+.[0-9]+")

# 判断环境决定使用lua还是luajit。
if [[ -e $OR_EXEC && "$OR_VER" =~ "1.19" ]]; then
    # OpenResty version is 1.19, use luajit by default
    # find the luajit binary of openresty
    LUAJIT_BIN=$(${OR_EXEC} -V 2>&1 | grep prefix | grep -Eo 'prefix=(.*)/nginx\s+--' | grep -Eo '/.*/')luajit/bin/luajit

    # use the luajit of openresty
    echo "$LUAJIT_BIN $APISIX_LUA $*"
    exec $LUAJIT_BIN $APISIX_LUA $*
elif [[ "$LUA_VERSION" =~ "Lua 5.1" ]]; then
    # OpenResty version is not 1.19, use Lua 5.1 by default
    echo "lua $open $*"
    exec lua $APISIX_LUA $*
else
fi

其中openresty1.19版本以上默认使用luajit, 否则使用lua原生解释器,在找到lua解释器之后就执行apisix.lua脚本。

而apisix.lua代码如下

local pkg_cpath_org = package.cpath
local pkg_path_org = package.path

local apisix_home = "/usr/local/apisix"
-- 代码依赖的路径注入, 其中包括yaml, etcd, radixtree等依赖库
local pkg_cpath = apisix_home .. "/deps/lib64/lua/5.1/?.so;"
                  .. apisix_home .. "/deps/lib/lua/5.1/?.so;"
local pkg_path = apisix_home .. "/deps/share/lua/5.1/?.lua;"
package.cpath = pkg_cpath .. pkg_cpath_org
package.path  = pkg_path .. pkg_path_org


local env = require("apisix.cli.env")(apisix_home, pkg_cpath_org, pkg_path_org)
-- 获取apisix家目录, 当前目录是否是root目录, openresty启动参数, 依赖库路径, 最小etcd版本, ulimit参数(ulimit -n返回值)
local ops = require("apisix.cli.ops")

-- 启动入口
ops.execute(env, arg)

这部分代码主要是获取一些基本的信息, apisix家目录, 当前目录是否是root目录, openresty启动参数, 依赖库路径, 最小etcd版本, ulimit参数(ulimit -n返回值)等数据。

通过这些参数就可以进入启动流程了。

ops.lua的源代码如下:

-- apisix支持的命令行
local action = {
    help = help,
    version = version,
    init = init,
    init_etcd = etcd.init,
    start = start,
    stop = stop,
    quit = quit,
    restart = restart,
    reload = reload,
    test = test,
}

-- 通过table找到对应的命令, start自然对应的是start
function _M.execute(env, arg)
    local cmd_action = arg[1]
    action[cmd_action](env, arg[2])
end


local function start(env, ...)
    -- 因为apisix的工作进程以nobody权限启动,所以不能访问/root目录, 所以禁止在/root目录启动
    if env.is_root_path then
        util.die("Error: It is forbidden to run APISIX in the /root directory.\n")
    end

    -- 创建日志目录
    local cmd_logs = "mkdir -p " .. env.apisix_home .. "/logs"
    util.execute_cmd(cmd_logs)

    -- 检查是否正在运行
    local pid_path = env.apisix_home .. "/logs/nginx.pid"
    local pid = util.read_file(pid_path)
    pid = tonumber(pid)
    if pid then
        local lsof_cmd = "lsof -p " .. pid
        local res, err = util.execute_cmd(lsof_cmd)
        if not (res and res == "") then
            if not res then
                print(err)
            else
                print("APISIX is running...")
            end

            return
        end
    end

    -- 初始化环境
    -- 检查端口是否可用,配置文件参数是否合法,生成nginx.conf文件等
    init(env)
    -- 检测与etcd的连通性以及根据创建必要的key
    init_etcd(env, args)
    
    -- 最终执行/usr/local/openresty/bin/openresty -p /usr/local/apisix -g 'daemon off;'
    util.execute_cmd(env.openresty_args)
end

ops.execute调用链如下:

ops.execute -> ops.start -> ops.init -> ops.init_etcd -> util.execute_cmd。

其中execute_cmd的函数功能比较简单,就是指定对应的命令,如果有错误就读取错误输出并返回

local function execute_cmd(cmd)
    -- 调用命令
    local t, err = popen(cmd)
    if not t then
        return nil, "failed to execute command: "
                    .. cmd .. ", error info: " .. err
    end
    local data, err = t:read("*all")
    t:close()
    return data
end

启动前的逻辑并不是太复杂,代码之所多是因为做了很多的参数检查,环境检查,这是因为apisix功能丰富导致的必然结果,只要不深入各个函数,整体代码结构还是比较清晰。

在进入代码启动后的段落前得看看apisix渲染的nginx.confg配置文件是什么样的,它的模板文件在apisix\cli\ngx_tpl.lua

nginx.conf配置文件简化如下:

http {
    
	# 各种数据容器初始化
    lua_shared_dict internal_status      10m;
	# ....

	# 负载均衡入口
    upstream apisix_backend {
        server 0.0.0.1;
        balancer_by_lua_block {
            # 负载均衡解析逻辑在这
            apisix.http_balancer_phase()
        }
    }

    # openresty master进程初始化
    init_by_lua_block {
        require "resty.core"
        apisix = require("apisix")

        local dns_resolver = { "127.0.0.11", }
        local args = {
            dns_resolver = dns_resolver,
        }
        # 主要初始化过程在这
        apisix.http_init(args)
    }
	
    # openresty worker进程时初始化
    init_worker_by_lua_block {
        # 初始化工作进程
        apisix.http_init_worker()
    }

    ## 默认http/https监听端口
    server {
        listen 9080 default_server reuseport;
        listen 9443 ssl default_server http2 reuseport;
        
        server_name _;
		# 管理接口,用于路由等对象的增删改查
        location /apisix/admin {
                allow 0.0.0.0/0;
                deny all;
            content_by_lua_block {
                apisix.http_admin()
            }
        }
		
        # ssl握手也有lua脚本处理
        ssl_certificate_by_lua_block {
            apisix.http_ssl_phase()
        }

        location / {
            # 在balancer_by_lua_block之前调用
            access_by_lua_block {
                apisix.http_access_phase()
            }

            # 流量代理
            proxy_pass      $upstream_scheme://apisix_backend$upstream_uri;

            # 过滤http头信息
            header_filter_by_lua_block {
                apisix.http_header_filter_phase()
            }
			
            # 过滤http请求体信息
            body_filter_by_lua_block {
                apisix.http_body_filter_phase()
            }
			# 日志收尾阶段
            log_by_lua_block {
                apisix.http_log_phase()
            }
        }
    }
}

如果想搞清整个流量的转发流程就得看看下面这张图

1661911163550

根据上图,可以知道openresty启动是会调用init_by_lua_blockinit_worker_by_lua_block对应的lua代码。

而http流量跟https流量的主要区别在于是否要处理ssl证书,所以http请求的流量流向如下:

access_by_lua_block -> balancer_by_lua_block -> header_filter_by_lua_block -> body_filter_by_lua_block -> log_by_lua_block

而https的流量的不同在于多了ssl证书的处理,流量流向如下:

ssl_certificate_by_lua_block -> access_by_lua_block -> balancer_by_lua_block -> header_filter_by_lua_block -> body_filter_by_lua_block -> log_by_lua_block

小结

apisix的启动流程不是很复杂,但是代码不少,这是因为apisix的配置项非常多,所以参数校验占了很大的一部分,所以在阅读代码时不要过于纠结各个参数的细节,即使有些参数看懂也没关系,当大体流程搞懂之后就可以深入细节,看看细节部分的具体实现。

启动后

openresty启动后就开始根据nginx.conf配置文件加载相应的lua代码。

根据配置文件不难知道初始化由下面两个部分配置组成。

init_by_lua_block {
    require "resty.core"
    apisix = require("apisix")

    local dns_resolver = { "127.0.0.1"}
    local args = {
        dns_resolver = dns_resolver,
    }
    apisix.http_init(args)
}

init_worker_by_lua_block {
    apisix.http_init_worker()
}

初始化

初始化主要分为两个部分,一是master进程的初始化,二是worker进程的初始化。

master进程初始化

apisix.http_init(args)的代码在apisix\init.lua

function _M.http_init(args)
    -- 设置dns服务器
    core.resolver.int_resolver(args)
    -- 设置实例id
    core.id.init()

	-- 启用特权进程
    local process = require("ngx.process")
    local ok, err = process.enable_privileged_agent()
    if not ok then
        core.log.error("failed to enable privileged_agent: ", err)
    end

    -- 检查配置中心是否可以正常工作, 默认是etcd
    if core.config.init then
        local ok, err = core.config.init()
        if not ok then
            core.log.error("failed to load the configuration: ", err)
        end
    end
end

这部分的初始化并不复杂。

其中特权进程是为了后续代码中判断是否在进程的类型,master或worker。

worker进程初始化

apisix的所有对象的初始化都在这一部分,初始化的对象非常多,本文主要聚焦在路由,负载均衡等部分的初始化,其他如services, upstream等暂不涉及。

代码简化如下:

function _M.http_init_worker()
    -- 啥都不做
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    -- 初始化管理接口的路由及响应函数
    require("apisix.admin.init").init_worker()
    -- 创建http,ssl路由对象,用来路由客户端请求,路由规则来源于上面的管理接口
    router.http_init_worker()
end

loadbalancer对象在后面章节再介绍,这里暂时略过,这里着重看路由对象的初始化过程。

管理接口路由

require("apisix.admin.init").init_worker()的代码简化如下:

local resources = {
    -- 几乎每个对象都实现了"GET", "PUT", "POST", "DELETE", "PATCH"五个方法
    routes          = require("apisix.admin.routes"),
}

local function run()
    local api_ctx = {}
    core.ctx.set_vars_meta(api_ctx)
    ngx.ctx.api_ctx = api_ctx

    local uri_segs = core.utils.split_uri(ngx.var.uri)
    -- /apisix/admin/routes分割后如下
    -- {"", "apisix", "admin", "route"}
    local seg_res, seg_id = uri_segs[4], uri_segs[5]
    local seg_sub_path = core.table.concat(uri_segs, "/", 6)
	-- 找到对应的资源对象
    local resource = resources[seg_res]
    local method = str_lower(get_method())
	-- 获取请求体, 如果有数据就JSON反序列化
    local req_body, err = core.request.get_body(MAX_REQ_BODY)
    if req_body then
        local data, err = core.json.decode(req_body)
        req_body = data
    end

    -- 直接调用对应的响应函数然后返回
    local code, data = resource[method](seg_id, req_body, seg_sub_path,
                                        uri_args)
    if code then
        data = strip_etcd_resp(data)
        core.response.exit(code, data)
    end
end

-- 路由列表
local uri_route = {
    {
        paths = [[/apisix/admin/*]],
        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
        handler = run,
    },
}

function _M.init_worker()
    router = route.new(uri_route)
end

注意lua的table对象的索引从1开始。

这里以官方的请求为例

curl "http://127.0.0.1:9080/apisix/admin/routes/1" -H "X-API-KEY: edd1c9f034335f136f87ad84b625c8f1" -X PUT -d '
{
  "methods": ["GET"],
  "host": "example.com",
  "uri": "/anything/*",
  "upstream": {
    "type": "roundrobin",
    "nodes": {
      "httpbin.org:80": 1
    }
  }
}'

所以对应的响应函数是require("apisix.admin.routes")["PUT"]

代码如下:

function _M.put(id, conf, sub_path, args)
    -- conf就是请求体的json对象
    -- 这里主要检查关联的plugins, upstream, services等对象是否存在, 如果关联了的话
    local id, err = check_conf(id, conf, true)

    -- 如果路由id已对应一个路由对象,那么在当前的配置文件中注入必要的配置信息,比如时间戳
    local key = "/routes/" .. id
    local ok, err = utils.inject_conf_with_prev_conf("route", key, conf)
	-- 将数据持久化到etcd
    local res, err = core.etcd.set(key, conf, args.ttl)
    return res.status, res.body
end

路由的创建还是比较简单的,主要流程就是检查配置文件是否合法,然后注入已存在的路由信息(如果存在的话),然后就是保存数据到etcd。

至此,我们可以通过管理接口增删改查路由对象以控制流量转发规则。

数据转发路由

router.http_init_worker()的代码简化如下:

-- 检查router对象是否有满足必要的接口
-- routes方法返回路由列表
-- init_worker用于初始化路由
local function attach_http_router_common_methods(http_router)
    if http_router.routes == nil then
        http_router.routes = function ()
            if not http_router.user_routes then
                return nil, nil
            end

            local user_routes = http_router.user_routes
            return user_routes.values, user_routes.conf_version
        end
    end

    if http_router.init_worker == nil then
        http_router.init_worker = function (filter)
            -- http的路由对象就是在这里初始化
            http_router.user_routes = http_route.init_worker(filter)
        end
    end
end

function _M.http_init_worker()
    -- 配置配置文件 config.yaml
    local conf = core.config.local_conf()
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    -- 加载http(s)请求的路由并并初始化
    local router_http = require("apisix.http.router." .. router_http_name)
    attach_http_router_common_methods(router_http)
    router_http.init_worker(filter)
    _M.router_http = router_http

    local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
    router_ssl.init_worker()
    _M.router_ssl = router_ssl
	-- 没太看懂这个router用来干啥的
    _M.api = require("apisix.api_router")
end

这一部分就是创建router对象用于后续的流量转发,首先看看router_http, 根据上面的代码可以知道init_worker就是http_route.init_worker.

-- apisix\http\route.lua
function _M.init_worker(filter)
    local user_routes, err = core.config.new("/routes", {
            automatic = true,
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

    return user_routes
end

http的init_worker的主要功能就是将一个映射etcd路由列表数据的对象(user_routes)暴露出来,这个对象会不断的同步ectd的数据(/apisix/routes/*)到这个对象,这样,通过管理接口对路由的增删改查就能保持路由的动态更新,其他诸如services, upstream大体一直。

core.config.new("/routes"的代码在apisix\core\config_etcd.lua

代码如下:

local function sync_data(self)
   -- 这函数超级长, 大体逻辑就是将数据合并到self里面去
end

local function _automatic_fetch(premature, self)
    local i = 0
    -- 一个用于同步的循环, 默认每个循环最多32次,
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                self.etcd_cli = etcd_cli
            end

            local ok, err = sync_data(self)
	-- 如果没有结束就继续递归的调用,再次循环
    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end

function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    local etcd_conf = local_conf.etcd
    -- 存在etcd中的键值前缀/apisix
    local prefix = etcd_conf.prefix
    -- 同步时间间隔,默认5s
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end
    -- etcd健康检查超时时间
    local health_check_timeout = etcd_conf.health_check_timeout
    if not health_check_timeout or health_check_timeout < 0 then
        health_check_timeout = 10
    end
	
    -- lua的对象继承写法
    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
		-- 各种参数...
    }, mt)

    if automatic then
        -- 同步etcd数据到obj对象的逻辑在这启动
        ngx_timer_at(0, _automatic_fetch, obj)
    end

    -- 将obj对象的应用放在本模块的created_obj对象中, 这样后续就可以获取已创建的obj对象了,
    if key then
        created_obj[key] = obj
    end

    return obj
end

至此路由规则的同步就完成了,apisix由此可以动态的增删改查路由规则。

小结

初始化的过程会比较复杂,这主要是因为组件比较多,但是每个组件的初始化其实有迹可循,首先是初始化模块里的局部变量,然后创建一个可以不断同步etcd数据的对象,然后创建必要的对象用于后续处理请求。

其中同步的逻辑会稍微复杂一些,再就是路由的创建也多了好几层抽象,但是初始化过程中并没有马上创建可以分发流量的路由对象,这是因为路由对象是在匹配中创建的,但是,也是在数据有变动的情况下才会再次创建,这部分在后面会提到。

路由转发

这里再次回顾一下路由转发设计的lua代码

access_by_lua_block -> balancer_by_lua_block -> header_filter_by_lua_block -> body_filter_by_lua_block -> log_by_lua_block

access_by_lua_block的代码如下:

function _M.http_access_phase()
    local ngx_ctx = ngx.ctx

    -- always fetch table from the table pool, we don't need a reused api_ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    ngx_ctx.api_ctx = api_ctx

    core.ctx.set_vars_meta(api_ctx)
	-- 路由匹配
    router.router_http.match(api_ctx)
    local route = api_ctx.matched_route
    -- 略过插件过滤及关联服务查询等逻辑
    
    -- 这里主要是检查相关参数
    -- 设置检查上游状态的checker对象(用于检查后端状态)
    -- 以及设置schema
    local code, err = set_upstream(route, api_ctx)
    -- 选择一个后端
    local server, err = load_balancer.pick_server(route, api_ctx)
	-- 后续的流量就会转发到这个选择的picked_server
    api_ctx.picked_server = server
	-- 设置必要的http 头信息
    set_upstream_headers(api_ctx, server)
end

这段代码的主要重点在于路由匹配及后端选取。

首先看看路由匹配,代码如下:

-- apisix\http\router\radixtree_uri.lua    
	local uri_routes = {}
    local uri_router
    local match_opts = {}
function _M.match(api_ctx)
    local user_routes = _M.user_routes
    local _, service_version = get_services()
    -- 判断是否需要重新创建路由对象
    if not cached_router_version or cached_router_version ~= user_routes.conf_version
        or not cached_service_version or cached_service_version ~= service_version
    then
        uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                             uri_routes, false)
        cached_router_version = user_routes.conf_version
        cached_service_version = service_version
    end

    return base_router.match_uri(uri_router, match_opts, api_ctx)
end

router_http的匹配方法业务逻辑不多,重头戏放在了上一层目录的route.lua, 核心逻辑在base_router.

代码简化如下:

function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter)
    routes = routes or {}

    core.table.clear(uri_routes)

    for _, route in ipairs(routes) do
        if type(route) == "table" then
            local status = core.table.try_read_attr(route, "value", "status")
            -- check the status
            if status and status == 0 then
                -- 用于lua没有continue关键字只能用这种蹩脚的方式。。。。。
                goto CONTINUE
            end

            local filter_fun, err

            local hosts = route.value.hosts or route.value.host
			-- 这段代码的核心逻辑就是基于route创建符合radixtree接口的对象
            core.table.insert(uri_routes, {
                paths = route.value.uris or route.value.uri,
                methods = route.value.methods,
                priority = route.value.priority,
                hosts = hosts,
                remote_addrs = route.value.remote_addrs
                               or route.value.remote_addr,
                vars = route.value.vars,
                filter_fun = filter_fun,
                -- 当底层对象匹配成功后就会调用这个函数
                -- 这个函数的功能就是设置匹配的路由
                handler = function (api_ctx, match_opts)
                    api_ctx.matched_params = nil
                    api_ctx.matched_route = route
                    api_ctx.curr_req_matched = match_opts.matched
                end
            })

            ::CONTINUE::
        end
    end

    if with_parameter then
        return radixtree.new(uri_routes)
    else
        -- 最终也是调用的resty.radixtree
        return router.new(uri_routes)
    end
end

但是最终干活的路由对象是radixtree模块, 关于它的接口可以查看: https://github.com/api7/lua-resty-radixtree

从这一段代码可知,路由在匹配成功之后就会设置matched_route对象,这样后续的流程可以继续,也基于此可以选择对应的后端。

然后再来看看load_balancer.pick_server(route, api_ctx)的代码

-- apisix\balancer.lua
local function pick_server(route, ctx)
    local up_conf = ctx.upstream_conf

    local nodes_count = #up_conf.nodes
    -- 如果只有一个后端,就直接返回了
    if nodes_count == 1 then
        local node = up_conf.nodes[1]
        ctx.balancer_ip = node.host
        ctx.balancer_port = node.port
        return node
    end

    local version = ctx.upstream_version
    local key = ctx.upstream_key
    local checker = ctx.up_checker

    ctx.balancer_try_count = (ctx.balancer_try_count or 0) + 1

    -- 由于可能出现重试的情况,所以整个请求中使用同一个picker, 所以这里做了缓存
    local server_picker = ctx.server_picker
    if not server_picker then
        -- 这里的create_server_picker相当于一个创建工厂函数
        server_picker = lrucache_server_picker(key, version,
                                               create_server_picker, up_conf, checker)
    end

    -- 默认使用的picker是
    local server, err = server_picker.get(ctx)
    ctx.balancer_server = server
	-- 如果是域名就将其解析成ip
    local domain = server_picker.addr_to_domain[server]
    local res, err = lrucache_addr(server, nil, parse_addr, server)

    res.domain = domain
    ctx.balancer_ip = res.host
    ctx.balancer_port = res.port
    ctx.server_picker = server_picker

    return res
end


local function create_server_picker(upstream, checker)
    local picker = pickers[upstream.type]
    if not picker then
        -- 默认是apisix.balancer.roundrobin
        pickers[upstream.type] = require("apisix.balancer." .. upstream.type)
        picker = pickers[upstream.type]
    end

    if picker then
        local nodes = upstream.nodes
        local addr_to_domain = {}
        for _, node in ipairs(nodes) do
            if node.domain then
                local addr = node.host .. ":" .. node.port
                addr_to_domain[addr] = node.domain
            end
        end
		-- 获取健康的后端并创建picker对象
        local up_nodes = fetch_health_nodes(upstream, checker)
        local server_picker = picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
        server_picker.addr_to_domain = addr_to_domain
        return server_picker
    end

    return nil, "invalid balancer type: " .. upstream.type, 0
end

picker对象的接口基本上差不多,实现了get方法,会基于创建时传入的后端列表选择一个后端返回,这里就不继续深入了。

当选择到了后端之后access_by_lua_block这阶段就完成了,流量继续往下就是balancer_by_lua_block阶段。

代码简化如下:

-- apisix\init.lua
function _M.http_balancer_phase()
    local api_ctx = ngx.ctx.api_ctx
    if not api_ctx then
        core.log.error("invalid api_ctx")
        return core.response.exit(500)
    end

    load_balancer.run(api_ctx.matched_route, api_ctx, common_phase)
end

-- apisix\balancer.lua
function _M.run(route, ctx, plugin_funcs)
    local server, err

    if ctx.picked_server then
        -- use the server picked in the access phase
        server = ctx.picked_server
        ctx.picked_server = nil
		-- 设置必要的参数,超时时间,重试次数等。
        set_balancer_opts(route, ctx)

    else
        -- 其他逻辑
    end
	-- 开始转发流量
    local ok, err = set_current_peer(server, ctx)

    ctx.proxy_passed = true
end

至此apisix的路由转发功能大致完成,而后续的各种过滤操作,这里就不看了。

SSL证书匹配

ssl的相关操作在ssl_certificate_by_lua_block阶段对应的代码是http_ssl_phase

再次之前先看看ssl路由的创建和初始化

function _M.init_worker()
    local err
    ssl_certificates, err = core.config.new("/ssl", {
        automatic = true,
        item_schema = core.schema.ssl,
        checker = function (item, schema_type)
            return apisix_ssl.check_ssl_conf(true, item)
        end,
        filter = ssl_filter,
    })
end

可以看到ssl初始逻辑和router_http差不多,也是创建一个对象用于同步etcd的数据。

然后继续看http_ssl_phase的代码

function _M.http_ssl_phase()
    local ngx_ctx = ngx.ctx
    local api_ctx = ngx_ctx.api_ctx
    local ok, err = router.router_ssl.match_and_set(api_ctx)
end


-- apisix\ssl\router\radixtree_sni.lua
function _M.match_and_set(api_ctx)
    local err
    -- 然后创建一个radixtree的路由对象
    if not radixtree_router or
       radixtree_router_ver ~= ssl_certificates.conf_version then
        radixtree_router, err = create_router(ssl_certificates.values)
        if not radixtree_router then
            return false, "failed to create radixtree router: " .. err
        end
        radixtree_router_ver = ssl_certificates.conf_version
    end

    local sni
    -- 获取客户端请求的域名用作路由匹配
    sni, err = apisix_ssl.server_name()
    if type(sni) ~= "string" then
        local advise = "please check if the client requests via IP or uses an outdated protocol" ..
                       ". If you need to report an issue, " ..
                       "provide a packet capture file of the TLS handshake."
        return false, "failed to find SNI: " .. (err or advise)
    end

    core.log.debug("sni: ", sni)

    local sni_rev = sni:reverse()
    -- 
    local ok = radixtree_router:dispatch(sni_rev, nil, api_ctx)

    if type(api_ctx.matched_sni) == "table" then
        local matched = false
        for _, msni in ipairs(api_ctx.matched_sni) do
            if sni_rev == msni or not str_find(sni_rev, ".", #msni) then
                matched = true
            end
        end
    end

    local matched_ssl = api_ctx.matched_ssl

    ngx_ssl.clear_certs()
    -- 基于匹配的ssl证书信息,设置服务端的ssl证书
    ok, err = set_pem_ssl_key(sni, matched_ssl.value.cert,
                              matched_ssl.value.key)
    return true
end

local function create_router(ssl_items)
    local ssl_items = ssl_items or {}

    local route_items = core.table.new(#ssl_items, 0)
    local idx = 0

    for _, ssl in config_util.iterate_values(ssl_items) do
        if ssl.value ~= nil and
            (ssl.value.status == nil or ssl.value.status == 1) then  -- compatible with old version

            local j = 0
            local sni
            if type(ssl.value.snis) == "table" and #ssl.value.snis > 0 then
                sni = core.table.new(0, #ssl.value.snis)
                for _, s in ipairs(ssl.value.snis) do
                    j = j + 1
                    sni[j] = s:reverse()
                end
            else
                sni = ssl.value.sni:reverse()
            end

            idx = idx + 1
            route_items[idx] = {
                paths = sni,
                -- 跟http路由差不多,也是基于ssl列表的数据创建一个handler函数用于设置匹配的ssl
                handler = function (api_ctx)
                    if not api_ctx then
                        return
                    end
                    api_ctx.matched_ssl = ssl
                    api_ctx.matched_sni = sni
                end
            }
        end
    end
    local router, err = router_new(route_items)

    return router
end

ssl证书的路由匹配其实和http路由差不多,不同之处在于处理的逻辑不一样,但是流程大致是一致的,首先创建一个可以与etcd同步的数据对象,时刻保持同步,然后基于已有的数据创建路由并创建对应的handler函数。

总结

apisix的核心逻辑不是太复杂,但是因为功能比较多,所以代码中有很多的验证逻辑,如果略过会拖慢阅读速度。

以后有空写一个简单的apisix原型^_^.