Benthos

From Wikitech


Benthos defines itself as fancy stream processing made operationally mundane.

n.b. Benthos has been acquired by Redpanda and rebranded as Redpanda Connect. The licence is no longer OSI-compatible.

An open-source fork called Bento has been created: https://github.com/warpstreamlabs/bento

Benthos on centrallog

As of Nov 2022 it is used at WMF to provide a real-time (sampled) stream of Varnish requests (webrequest). The initial groundwork was done as part of bug T319214 by Luca Toscano and Filippo Giunchedi. The plan is to expand its use for simple and stateless stream processing, inside and outside SRE.

Observability into the system is provided by the Benthos Grafana dashboard.

The webrequest pipeline runs on centrallog hosts, reads the webrequest Kafka topics, samples and enriches the stream, and writes back to the webrequest_sampled Kafka topic for general consumption. The sampled topic then powers (via Druid) the Turnilo dashboard and the Superset dashboard (NDA-only links).
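In outline, such a sample-and-forward instance can be expressed as a Benthos config along these lines. This is an illustrative sketch, not the production config: broker addresses, topic names, the consumer group, and the sampling rate are all placeholders, and the Bloblang sampling mapping is one possible way to sample, not necessarily the one used at WMF.

```yaml
# Illustrative sketch only; not the production webrequest config.
input:
  kafka:
    addresses: [ "kafka.example.org:9092" ]   # placeholder broker
    topics: [ "webrequest_text" ]             # placeholder topic
    consumer_group: "benthos-webrequest-sampled"

pipeline:
  processors:
    # Keep roughly 1 message in 128, dropping the rest (hypothetical rate).
    - mapping: |
        root = if random_int(max: 127) != 0 { deleted() }

output:
  kafka:
    addresses: [ "kafka.example.org:9092" ]   # placeholder broker
    topic: "webrequest_sampled"
```

Enrichment (adding or rewriting fields) would be done with further Bloblang mappings in the same `pipeline.processors` list.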

A second Benthos instance consumes the same webrequest topic to generate Prometheus metrics.
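A metrics-only instance of this kind can be sketched roughly as follows: count messages with a `metric` processor, expose the counters through the Prometheus metrics exporter, and discard the stream itself. All names, labels, and the `http_status` field are illustrative assumptions.

```yaml
# Illustrative sketch only; not the production metrics config.
input:
  kafka:
    addresses: [ "kafka.example.org:9092" ]   # placeholder broker
    topics: [ "webrequest_text" ]             # placeholder topic
    consumer_group: "benthos-webrequest-metrics"

pipeline:
  processors:
    # Increment a counter per message, labelled by an (assumed) status field.
    - metric:
        type: counter
        name: webrequest_messages_total
        labels:
          status: ${! json("http_status") }

metrics:
  prometheus: {}

output:
  drop: {}   # metrics only; the messages themselves are discarded
```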

Benthos on cache hosts

As of Apr 2024 Benthos is also used to replace VarnishKafka on cache (cp) hosts, starting with the ulsfo DC as the guinea pig. The activity is tracked on various tickets, mainly bug T351117 and bug T358109.

The original log flow started with Varnish logs on cache hosts, read by VarnishKafka (using the Varnish API to read the log ring buffer) and sent to the Kafka Jumbo cluster. This led to some inconveniences, mainly the lack of observability at the HAProxy level.

The new flow starts with HAProxy writing logs (in syslog format) to a special UDP socket. A Benthos instance on each cache host reads from this socket, parses the logs, and produces the required metrics, while sending the formatted message to the appropriate Kafka topic.

This will completely replace VarnishKafka: the Benthos message format is a superset of the VarnishKafka one, so the downstream pipeline can already accept and process the new messages from HAProxy.
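The shape of that per-host flow, heavily simplified, is roughly the following. The socket address, syslog format, broker, and topic here are assumptions for illustration, not the production values:

```yaml
# Illustrative sketch only; not the production cache-host config.
input:
  socket_server:
    network: udp
    address: "127.0.0.1:10514"   # placeholder socket HAProxy logs to

pipeline:
  processors:
    # Parse each syslog line into structured fields.
    - parse_log:
        format: syslog_rfc3164   # assumed syslog format

output:
  kafka:
    addresses: [ "kafka.example.org:9092" ]   # placeholder broker
    topic: "webrequest_text"                  # placeholder topic
```

Metrics generation would be layered on top with `metric` processors, as in the centrallog instances.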

How to add a new instance

Adding a new instance is simple enough:

  • Add an entry to profile::benthos::instances in hieradata/role/common/syslog/centralserver.yaml:
      # The name of the instance
      foobar:
        # This is passed to your template as an env variable you can reference as $PORT.
        # Make sure to set this value to a port that is not used already by other Benthos
        # instances!
        port: 4151
        # Any additional env variables you want to be able to modify the behaviour of
        # your instance
        env_variables:
          # Yes, this is a troll about how bad yaml is
          - pinkunicorn: no
        kafka:
          cluster: jumbo
          # Be careful - each instance will only point to the kafka in one DC.
          # If you need both, either subscribe to the replicated topics or add
          # a second instance, or again run separately in the two sites
          site: eqiad
          topics:
          - "foobar"
          - "anothertopic"
    
  • Add the configuration for your Benthos instance as a file at modules/profile/files/benthos/instances/foobar.yaml
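The instance file can pick up the environment variables defined in Hiera through Benthos's environment-variable interpolation; for example, a minimal, entirely hypothetical foobar.yaml might look like this (broker, topic, consumer group, and output are placeholders):

```yaml
# Hypothetical modules/profile/files/benthos/instances/foobar.yaml
# ${PORT} comes from the Hiera entry above; all other values are illustrative.
http:
  address: "0.0.0.0:${PORT}"   # per-instance HTTP endpoint (metrics, debug)

input:
  kafka:
    addresses: [ "kafka.example.org:9092" ]   # placeholder broker
    topics: [ "foobar" ]
    consumer_group: "benthos-foobar"          # placeholder group

output:
  stdout: {}   # placeholder output while developing
```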

Clearing consumer group lag

Throws away data (kinda)

If Benthos is lagging behind and you want to reset it to consume from the latest offsets, skipping over any unprocessed data, run this on kafka-logging1001.eqiad.wmnet:

kafka-consumer-groups --bootstrap-server localhost:9092 --group benthos-mw-accesslog-metrics --topic mediawiki.httpd.accesslog --reset-offsets --to-latest --dry-run

Run with --execute instead of --dry-run to actually do it.