helix

NATS JetStream

The nats integration provides an opinionated way to interact with NATS for helix services. It uses JetStream only for distributed key-value store and higher Quality of Service (QoS).

Trace attributes

The nats integration sets the following trace attributes:

  • span.kind

When applicable, these attributes can be set as well:

  • nats.message.subject
  • nats.subscription.subject
  • nats.subscription.queue
  • nats.jetstream.consumer.name
  • nats.jetstream.consumer.ordered
  • nats.jetstream.consumer.subjects
  • nats.jetstream.stream.name
  • nats.jetstream.stream.subjects
  • nats.jetstream.kv.key
  • nats.jetstream.kv.bucket.name

Example:

nats.message.subject: "demo"
nats.subscription.queue: "demo-queue"
nats.subscription.subject: "_INBOX.92V248IJYsOunA5qe5I22c"
span.kind: "consumer"

Usage

The integration uses the official Go library maintained by the NATS / Synadia team.

Install the Go module with:

$ go get go.nunchi.studio/helix/integration/nats

Simple example on how to import, configure, and use the integration:

import (
  "context"

  "go.nunchi.studio/helix/integration/nats"
  "go.nunchi.studio/helix/service"
)

func main() {
  cfg := nats.Config{
    Addresses: []string{"nats://localhost:4222"},
  }

  js, err := nats.Connect(cfg)
  if err != nil {
    return err
  }

  ctx := context.Background()
  js.Publish(ctx, &nats.Msg{
    Subject: "demo",
    Sub: &nats.Subscription{
    	Queue: "demo-queue",
    },
    Data: []byte(`{ "hello": "world" }`),
  })

  if err := service.Start(); err != nil {
    panic(err)
  }

  if err := service.Close(); err != nil {
    panic(err)
  }
}

Is something missing?

Built by
Nunchi