利用 Vector 从日志创建指标来提高系统的可观测性

可观测性(observability)

控制理论中的可观测性是指系统可以由其外部输出推断其内部状态的程度。系统的可观测性和可控制性是数学上对偶的概念,最早由匈牙利裔工程师鲁道夫·卡尔曼针对线性动态系统提出的概念。以信号流图来看,如果所有的内部状态都可以输出到输出信号,此系统即有可观测性。

现场可靠性工程(Site reliability enginerring,SRE)

现场可靠性工程是一门将软件工程应用于基础设施以及运营的学科,该概念由 Google 于 2003 年提出。现场可靠性工程的主要目标是创建可扩展和高可用性的软件系统。

阿卡迈(Akamai)

阿卡迈科技是一家总部位于美国马萨诸塞州剑桥市的内容分发网络和云服务提供商,是世界上最大的分布式计算平台之一,承担了全球 15% ~ 30% 的网络流量。该公司经营着分布于世界各地的服务器网络,主要业务为出租服务器资源(如带宽和存储空间)给那些希望通过从靠近用户的服务器来分发内容来提高自己网站访问速度的客户。当用户浏览到 Akamai 客户的网站时,用户的浏览器将被重定向到 Akamai 服务器上的副本,以达到提速的目的。此操作对大部分客户而言完全透明,并且可以降低遭到分布式拒绝服务攻击的影响。

背景

现场可靠性工程师的一个关键职责是提高应用的可观测性。理想情况下,应用从开始开发的第一天,就应该有完备的日志和指标机制,从而可以根据它们创建告警。但是现实情况却总是缺这少那,比如受到业务驱动的因素占比过高,以至于为了快速上线,根本没有考虑到系统的指标机制。本文展示了一种方法,可以根据应用的日志来创建指标。

具体案例:阿卡迈数据流

阿卡迈非常好的一点是它为不同的主题提供了大量的能力,比如卸掉负载以及安全等。但是由于它提供的一些报告不是实时的,从而在可观测性上存在一些延迟。而且,如果你想调试一些东西,你需要访问 Luna UI,但并不是所有人都有权限访问,也不是所有人都会使用它。在这种情况下,可以使用阿卡迈数据流来克服这种情况。阿卡迈数据流是阿卡迈提供的一个产品,可以将它们的边缘服务器产生的原始日志推送到你指定的目的地,比如 AWS S3、Datadog、Azure 或者 Google Cloud 等等。你还可以从官网文档 https://developer.akamai.com/akamai-datastream 了解更多关于阿卡迈数据流的信息。

思路是消费从数据流过来的日志,并让它们对现场可靠性工程师可用。然后,基于这些日志创建出一些指标,并且以 prometheus 指标的形式暴露出来。

首先看下数据流日志的格式:

  "version": 1,
  "cp": "123456",
  "reqId": "1239f220",
  "reqTimeSec": "1573840000",
  "bytes": "4995",
  "cliIP": "128.147.28.68",
  "statusCode": "206",
  "proto": "HTTPS",
  "reqHost": "test.hostname.net",
  "reqMethod": "GET",
  "reqPath": "/path1/path2/file.ext",
  "reqPort": "443",
  "rspContentLen": "5000",
  "rspContentType": "text/html",
  "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29",
  "tlsOverheadTimeMSec": "0",
  "tlsVersion": "TLSv1",
  "objSize": "484",
  "uncompressedSize": "484",
  "overheadBytes": "232",
  "totalBytes": "0",
  "queryStr": "param=value",
  "accLang": "en-US",
  "cookie": "cookie-content",
  "range": "37334-42356",
  "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest",
  "xForwardedFor": "8.47.28.38",
  "maxAgeSec": "3600",
  "reqEndTimeMSec": "3",
  "errorCode": "ERR_ACCESS_DENIED|fwd_acl",
  "turnAroundTimeMSec": "11",
  "transferTimeMSec": "125",
  "dnsLookupTimeMSec": "50",
  "customField": "any-custom-value",
  "cacheStatus": "1",
  "country": "US",
  "city": "HERNDON"
}


可以看到日志提供了大量有用的信息,比如被命中的路径、请求是否击中缓存、请求是否被屏蔽等等。如果将这些信息索引到 Elastic search 会很有帮助,不过还是先看看怎么从日志抽取出指标吧。

vector.dev

如果你从没听说过 vector,那么可以先把它看成是一个高性能的可观测性路由器。它可以从多个数据源读取数据,做必要的转换,然后推送到多个目的地。它使用 Rust 编写,效率特别高。你可以从官网 https://vector.dev/ 了解更多。

要使用 vector,需要实现如下的 ETL:

利用 Vector 从日志创建指标来提高系统的可观测性
利用 Vector 从日志创建指标来提高系统的可观测性

从上图可以看出,阿卡迈会把日志推给 S3,然后 vector 会读取这些日志,创建指标并通过 Prometheus 实例暴露出来,并且把日志推到 Kafka。

现在来看如何解析从 S3 来的日志:

[sources.s3]
  type = "aws_s3"
  region = "your_region"
  auth.access_key_id = "access_key_id"
  auth.secret_access_key = "secret_access_key"
  compression = "gzip"
  sqs.delete_message = true # optional, default
  sqs.poll_secs = 15 # optional, default, seconds
  sqs.queue_url = "SQS_URL" # required

以上是为解析 S3 来的日志所做的基本配置。注意 vector 是需要一个 SQS 队列的,用来水平扩展 vector:每次阿卡迈写入日志到 S3 桶,就会触发一个事件到 SQS。Vector 实例被通知到有新的日志被写入了,就会进行从 S3 桶读取的过程。

下一步就是解析存储在日志里的 json 数据了,这很简单,只需要使用如下代码片段:

[transforms.parsing]
  type = "remap" # required
  inputs = ["s3"] # required
  source = '''
  . = parse_json!(string!(.message))
  '''

这里使用上一步的 S3 作为输入,解析 json,并分割到不同的字段。

在这个转换中,也可以添加(或者删除)更多的字段到消息中。比如下面的代码片段展示了如何往消息中添加更多的字段:添加简单的字符串、使用支持的函数添加时间戳、以及删除不感兴趣的字段:

[transforms.parsing]
  type = "remap" # required
  inputs = ["s3"] # required
  source = '''
  . = parse_json!(string!(.message))
  .service.id = "123456789"
 .timestamp = now()
  del(.myunusedfield)
  '''


这些转换都是基于 Vector Remap 语言,语法简单,内置函数丰富。

以下代码片段把日志写到 Kafka 里:

[sinks.kafka]
  type = "kafka" # required
  inputs = ["parsing"] # required
  bootstrap_servers = "Kafka_servers"
  compression = "lz4" # optional, default
  topic = "destination_topic" # required
# Encoding
  encoding.codec = "json" # required
# Healthcheck
  healthcheck.enabled = true


这里写入 Kafka 遵循了常见的模式:写入一个 Kafka 主题,该主题将用作 Elastic Search 的争用指标。假如日志可以快速写入,Elastic Search 不会被负载打垮,那么我们可以比较慢地消费它们。

好了,完整的样例如下所示:

[sources.s3]
  type = "aws_s3"
  region = "your_region"
  auth.access_key_id = "access_key_id"
  auth.secret_access_key = "secret_access_key"
  compression = "gzip"
  sqs.delete_message = true # optional, default
  sqs.poll_secs = 15 # optional, default, seconds
  sqs.queue_url = "SQS_URL" # required
[transforms.parsing]
  type = "remap" # required
  inputs = ["s3"] # required
  source = '''
  . = parse_json!(string!(.message))
  '''
[sinks.kafka]
  type = "kafka" # required
  inputs = ["parsing"] # required
  bootstrap_servers = "Kafka_servers"
  compression = "lz4" # optional, default
  topic = "destination_topic" # required
# Encoding
  encoding.codec = "json" # required
# Healthcheck
  healthcheck.enabled = true

这是一个 vector 流水线的基本轮廓,包含了定义数据源的阶段、将 JSON 分割到不同字段的转换阶段,以及指示 vector 把结果保存到哪里的下沉阶段。

指标

我们从数据流读取了日志并且发送到了 Kafka,所有的 SRE 都可以按照想要的方式使用了。现在来看看如何基于日志创建指标。为了验证这样可行,我们来创建两个不同的指标:缓存命中和缓存未命中。

为此,我们添加一些转换阶段,第一、筛选出感兴趣的日志:

[transforms.cache_misses]
  type = "filter" # required
  inputs = ["parsing"] # required
  condition = '.cacheStatus == "0"'

在这一阶段中,我们筛选出 cacheStatus 为 0 的记录作为输入。接下来,使用这些被筛选出的日志创建指标出来:

[transforms.logs2metrics-cache_misses]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_misses"] # required
# Metrics
  [[transforms.logs2metrics-cache_misses.metrics]]
    # General
    field = "cacheStatus" # required
    name = "cache_misses" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required

我们对前一阶段的筛选后的日志,使用了 log_to_metric 转换。在这个转换中,我们指示 vector 基于 cacheStatus 字段来创建指标,命名为 cache_misses。在 website 的命名空间中,将类型指定为 counter 计数器,因为我们要统计未命中的缓存的次数。通过这样,vector 会产生如下的指标值:

website_cache_misses 50

你也可以添加标签到指标中。假设你有几个市场并且想要为每个市场创建如上的指标。如果不同市场由 accept-language 请求头区分,就可以使用如下代码片段来添加指标:

[transforms.logs2metrics-cache_misses]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_misses"] # required
# Metrics
  [[transforms.logs2metrics-cache_misses.metrics]]
    # General
    field = "cacheStatus" # required
    name = "cache_misses" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required
    tags.country = "{{message.accept-language}}"

以上代码片段的结果如下:

website_cache_misses{country="es-ES"} 50

通过复制这个过程同样可以创建命中缓存指标,最后,通过添加新的下沉阶段暴露这些新的指标:

[sinks.prometheus_exporter]
  type = "prometheus_exporter" # required
  inputs = ["logs2metrics*"] # required
  address = "0.0.0.0:9080" # required


使用这个新的下沉阶段,vector 将指标以开放指标的格式暴露在 9080 端口,而后可由 Prometheus 使用。总结以上步骤,完整的 vector 配置如下:

[sources.s3]
  type = "aws_s3"
  region = "your_region"
  auth.access_key_id = "access_key_id"
  auth.secret_access_key = "secret_access_key"
  compression = "gzip"
  sqs.delete_message = true # optional, default
  sqs.poll_secs = 15 # optional, default, seconds
  sqs.queue_url = "SQS_URL" # required
[transforms.parsing]
  type = "remap" # required
  inputs = ["s3"] # required
  source = '''
  . = parse_json!(string!(.message))
  '''
[transforms.cache_misses]
  type = "filter" # required
  inputs = ["parsing"] # required
  condition = '.cacheStatus == "0"'
[transforms.logs2metrics-cache_misses]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_misses"] # required
# Metrics
  [[transforms.logs2metrics-cache_misses.metrics]]
    # General
    field = "cacheStatus" # required
    name = "cache_misses" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required
[transforms.cache_hits]
  type = "filter" # required
  inputs = ["parsing"] # required
  condition = '.cacheStatus == "1"'
[transforms.logs2metrics-cache_hits]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_hits"] # required
# Metrics
  [[transforms.logs2metrics-cache_hits.metrics]]
    # General
    field = "cacheStatus" # required
    name = "cache_hits" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required
[sinks.prometheus_exporter]
  type = "prometheus_exporter" # required
  inputs = ["logs2metrics*"] # required
  address = "0.0.0.0:9080" # required
[sinks.kafka]
  type = "kafka" # required
  inputs = ["parsing"] # required
  bootstrap_servers = "Kafka_servers"
  compression = "lz4" # optional, default
  topic = "destination_topic" # required
# Encoding
  encoding.codec = "json" # required
# Healthcheck
  healthcheck.enabled = true

总结

Vector 是一个很强大的工具,不仅允许你消费日志,还能基于日志快速创建出指标。 

发表评论

登录后才能评论
服务中心
服务中心
联系客服
联系客服
投诉举报
返回顶部