如何使用nginx为influxdb提供API网关服务

目录:

InfluxDB是一个开源的时序数据库,使用GO语言开发,特别适合用于处理和分析资源监控数据这种时序相关数据。而InfluxDB自带的各种特殊函数如求标准差,随机取样数据,统计数据变化比等,使数据统计和实时分析变得十分方便。关于influxdb的基础知识本文就不做详细介绍了,这里主要是针对influxdb在开发中的痛点提供一个API的转换渠道。

Influxdb本身自带有HTTP接口,但是语法可能比较冷门,使用的是Line Protocol。这种协议在实际开发中对开发人员可能相当不友好,一个不小心就会出现数据或字段的转义错误。本文提供一种基于JSON格式的语法转换渠道,基于nginx的njs模块提供JSON到Line Protocol的转换。

基本思路:APP/Server ----json----> nginx ----line protocols----> influxdb

关于如何编译带njs模块的nginx,请参考如何在CentOS 7.2上使用nginx 1.15.0

1 如何通过POST一个简单的JSON报文往influxdb里插入数据?

influxdb的line protocol支持单条插入与批量插入,本文以插入单条数据为例。例如我们现在有一个用户APP端的行为监控需求,需要从APP端不断发送用户在APP上的操作记录或者收集用户资料。

示例:插入数据的请求报文协议

HTTP协议Header与URL(PS:这个URL格式需根据自身的业务场景自行定制)

名称 格式 示例 备注
HTTP方法 固定值 POST  
URL /analysis/${business-type}/${measurement} /analysis/action-monitor/start_apply_event  
├─ ${business-type} /a-zA-Z0-9\-/ action-monitor 业务类型:与influxdb的数据库一一映射。譬如我们有action-monitor库,balabala库,foobarfoobar库等等分别对应不同的业务。
└─ ${measurement} /a-zA-Z0-9_\-/ start_apply_event 监控行为,即influxdb的measurement。譬如我们有一张表叫start_apply_event……
Content-Type 固定值 application/json;charset=UTF-8 HTTP header

写入接口消息体JSON格式(PS:这个消息体格式可以通用)

名称 类型 必选 备注
tags Array 要监控上报的tag数组,tag用于对记录分类/分组
└─ tags[name] Object   键值对,对应influxdb的tag字段名和值;支持string、bool、number等类型
fields Array 要监控上报的field数组,field用于存储各种记录值
└─ fields[name] Object   键值对,对应influxdb的field字段名和值;支持string、bool、number等类型
time Date 时间戳:该记录的写入时间,默认值为当前influxdb的系统时间,精确到毫秒。

请求JSON示例

{
    "tags":{
        "role":"Sales Agent",
    },
    "fields":{
        "user-id":"SA20163389",
        "age":32,
        "married":true,
        "address":"广东省深圳市福田区新浩E都3楼"
    },
    "time":"2018-07-07T11:12:54.111817682Z" //可选
}

成功返回JSON示例

{
    "ret":0,
    "msg":"success",
    "status":204
}

注:请求的JSON报文如果不合法,nginx会直接返回400;其他错误则返回influxdb原始JSON错误消息。注:influxdb的HTTP协议本身在处理错误请求时返回的也是JSON报文。


2 如何通过简单URL构成的GET请求去查询influxdb?

查询influxdb的语法本身是通过在URL的query string中构建sql语句实现的,但是现实中我们往往不能直接将sql接口暴露给前端使用。所以这里提供的了一种从URL到sql的转换方法,其中sql中的各种参数,需在URL中用正则式限定,防止sql注入。

示例:查询用户最近一次写入记录的时间

根据指定的用户ID与行为,查询一段时间内,该用户最近一次上报/写入数据的时间。

查询接口说明,见下表

名称 格式 示例 备注
HTTP方法 固定值 GET  
URL /analysis-last-${time}/${business-type}/${measurement}/${uid} /analysis-last-30d/action-monitor/start_apply_event/6h20031  
├─ ${time} /\d+[wdhms]/ 30d 要查询的时间范围,如30天内则为30d,24小时则为24h,支持周w、天d、时h、分m、秒s。
├─ ${business-type} /a-zA-Z0-9\-/ action-monitor 业务类型:与influxdb的数据库一一映射。譬如我们有action-monitor库,balabala库,foobarfoobar库等等分别对应不同的业务。
├─ ${measurement} /a-zA-Z0-9_\-/ start_apply_event 监控行为,及influxdb的measurement。譬如我们有一张表叫start_apply_event……
└─ ${uid} /a-zA-Z0-9_\-/ 6h20031 用户ID

返回的JSON示例: 返回的JSON消息nginx不做任何转换处理,直接返回influxdb原始JSON串。influxdb API参考链接

{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "start_apply_event",
                    "columns": [
                        "time",
                        "uid"
                    ],
                    "values": [
                        [
                            "2018-07-07T11:12:54.111817682Z",
                            "6h20031"
                        ]
                    ]
                }
            ]
        }
    ]
}

3 nginx配置说明与njs模块代码

示例:nginx配置文件

注意:该配置文件中,数据库名称采用映射模式可以对前端隐藏库名;而表名没有做映射,而是简单的采用白名单模式;

#允许访问的influxdb库&映射关系,在该map里列出允许访问的库(凡是没有映射的库,nginx会拦截相应的请求)
map $analysis_biz_type $influx_db {
    default                 "NotFound";
    "action-monitor"        "action_monitor_db";
    "data-statistics"       "data_statistics_db";
    "cs-monitor"            "cs_monitor_db";
}

#允许访问的influxdb表,在该map里列出允许访问的表(凡是没有列出的表,nginx会拦截相应的请求)
map $analysis_table $is_valid_influx_table {
    default                        0;
    "start_apply_event"            1;
    "submit_apply_event"           1;
    "modify_pwd_event"             1;
    "activate_event"               1;
    "launch_event"                 1;
    "contact_event"                1;
    "cs_login"                     1;
    "cs_record_commit"             1;
    "dau_event"                    1;
}

# influxdb HTTP转发接口:提供权限控制、数据库过滤、表过滤、数据过滤以及数据格式转换等
server {
    listen  80 default_server;
    server_name  localhost;
    
    access_log  logs/influxdb.access.log  main;

    # 行为监控-提交数据API
    location ~ ^/analysis/(?<analysis_biz_type>[a-zA-Z0-9\-]+)/(?<analysis_table>[a-zA-Z0-9_]+)$ {
        #拦截不符合报文规范的请求
        limit_except POST {
            deny all;
        }
        if ( $influx_db = "NotFound" ) {
            return 400 "Invalid business type";
        }
        if ( $is_valid_influx_table = 0 ) {
            return 400 "Invalid measurement";
        }
        
        #2MB buffer for large JSON
        client_body_buffer_size 2048k;

        #执行njs模块的json转换函数
        js_content insertInfluxDB;
    }

    # 行为监控-查询最近一次提交数据时间API
    location ~ ^/analysis-last-(?<analysis_time>[0-9]+[wdhms])/(?<analysis_biz_type>[a-zA-Z0-9\-]+)/(?<analysis_table>[a-zA-Z0-9_]+)/(?<analysis_uid>[a-zA-Z0-9_\-]+)$ {
        limit_except GET {
            deny all;
        }
        if ( $influx_db = "NotFound" ) {
            return 400 "Invalid business type";
        }
        if ( $is_valid_influx_table = 0 ) {
            return 400 "Invalid measurement";
        }
        js_content queryInfluxDB;
    }

    # influxdb nginScript内部转发接口,用来接收从njs模块处理后的请求
    location /influxdb {
        internal;
        rewrite ^\/influxdb(\/.*)$ $1 break;
        proxy_set_header X-Real-Ip $real_ip_or_remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://192.168.10.100:8086; #这里是你的influxdb的服务器地址,可以用upstream去定义集群
    }
}

示例:njs模块配置文件

注意:该配置文件中,数据库名称采用映射模式可以对前端隐藏库名;而表名没有做映射,而是简单的采用白名单模式;

//==============================================InfluxDB==============================================

//字符串辅助函数(注:njs暂不支持arguments以及prototype扩展)
function format(template, args) {
    return template.replace(/{(\d+)}/g, function (match, number) {
        return typeof args[number] != 'undefined'
            ? args[number].toString()
            : match;
    });
};

//njs函数,提供json到line protocal的转换
function insertInfluxDB(r) {
    try {
        var s = r.variables.analysis_table, data = JSON.parse(r.requestBody);
        for (var t in data.tags) {
            s += format(",{0}={1}", [t, data.tags[t].replace(/[,=\s]/g, "\\$&")]);
        }
        s += " ";

        for (var f in data.fields) {
            if (typeof data.fields[f] == "string") {
                s += format('{0}="{1}",', [f, data.fields[f].replace(/[\\"]/g,'\\$&')]);
            } else {
                s += format('{0}={1},', [f, data.fields[f]]);
            }
        }
        s = s.slice(0, -1);

        //如果时间是字符串格式,则转换成influxdb使用的数值(UTC时间)
        if (typeof data.time == "string") {
            s += " " + (new Date(data.time)).getTime() * 1000000;
        }

        r.subrequest("/influxdb/write", { method: 'POST', body: s, args: "db=" + encodeURIComponent(r.variables.influx_db) }, function (res) {
            if (res.status == 204) {
                r.headersOut["Content-Type"] = "application/json;charset=UTF-8";
                r.return(200, JSON.stringify({
                    "ret": 0,
                    "msg": "success",
                    "status": res.status
                }));
            } else {
                r.return(res.status, res.responseBody);
            }
        });
    } catch (error) {
        r.error("insertInfluxDB exception: " + error);
        r.return(400, "Invalid JSON format");
    }
}

//njs函数,提供URL中的参数到sql的转换(注:如果参数没有在nginx的conf文件中限定格式,这里就需要防止sql注入)
function queryInfluxDB(r) {
    try {
        var sql = 'SELECT last("uid") AS "uid"  FROM "{0}" WHERE time > now() - {1} AND "uid"=\'{2}\'';
        sql = format(sql, [r.variables.analysis_table, r.variables.analysis_time, r.variables.analysis_uid]);
        var queryString = 'pretty=true&db=' + encodeURIComponent(r.variables.influx_db) + '&q=' + encodeURIComponent(sql);
        r.internalRedirect("/influxdb/query?" + queryString);
    } catch (error) {
        r.error("queryInfluxDB exception: " + error);
        r.return(400, "Invalid query format");
    }
}

附录:Influxdb与Chronograf的安装说明

Influxdb的安装方法

CentOS上可使用wget从官网上下载influxdb 1.6.0,然后通过yum localinstall本地安装RPM包。安装完毕后会自动注册服务,由systemd监管。更多细节可以参考官方文档

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.0.x86_64.rpm
sudo yum localinstall influxdb-1.6.0.x86_64.rpm

Influxdb安装后的基本信息

名称
操作系统 CentOS 7.2
应用版本 Influxdb 1.6.0
数据目录 /var/lib/influxdb
配置文件 /etc/influxdb/influxdb.conf
日志文件 /var/log/influxdb/influxd.log
启动命令 systemctl start influxdb.service
停止命令 systemctl stop influxdb.service

Chronograf的安装方法

CentOS上可使用wget从官网上下载Chronograf 1.6.0,然后通过yum localinstall本地安装RPM包。安装完毕后会自动注册服务,由systemd监管。更多细节可以参考官方文档

wget https://dl.influxdata.com/chronograf/releases/chronograf-1.6.0.x86_64.rpm
sudo yum localinstall chronograf-1.6.0.x86_64.rpm

Chronograf安装后的基本信息

名称
操作系统 CentOS 7.2
应用版本 Chronograf 1.6.0
数据目录 /var/lib/chronograf
配置文件 /etc/chronograf/chronograf.conf
日志文件 /var/log/chronograf/chronograf.log
启动命令 systemctl start chronograf.service
停止命令 systemctl stop chronograf.service

[返回首页]