NATS JetStream

The nats integration provides an opinionated way for helix services to interact with NATS. It relies exclusively on JetStream, for its distributed key-value store and higher Quality of Service (QoS).

Trace attributes

The nats integration sets the following trace attributes:

  • span.kind

When applicable, the following attributes are set as well:

  • nats.message.subject
  • nats.subscription.subject
  • nats.subscription.queue
  • nats.jetstream.consumer.name
  • nats.jetstream.consumer.ordered
  • nats.jetstream.consumer.subjects
  • nats.jetstream.stream.name
  • nats.jetstream.stream.subjects
  • nats.jetstream.kv.key
  • nats.jetstream.kv.bucket.name

Example:

nats.message.subject: "demo"
nats.subscription.queue: "demo-queue"
nats.subscription.subject: "_INBOX.92V248IJYsOunA5qe5I22c"
span.kind: "consumer"
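
The split between the attribute that is always set and the ones set only when applicable can be sketched as follows. This is a minimal illustration, not the integration's actual code: the msgInfo type and the spanAttributes function are hypothetical names introduced here, and only the attribute keys come from the list above.

```go
package main

import (
	"fmt"
	"sort"
)

// msgInfo is a hypothetical container for the details a span may describe.
// It is not part of the helix or NATS APIs.
type msgInfo struct {
	Kind       string // for example "producer" or "consumer"
	Subject    string // subject of the message, if any
	SubSubject string // subject of the subscription, if any
	Queue      string // queue group of the subscription, if any
}

// spanAttributes always sets span.kind, and adds each optional attribute
// only when the corresponding value is not empty.
func spanAttributes(m msgInfo) map[string]string {
	attrs := map[string]string{"span.kind": m.Kind}

	optional := map[string]string{
		"nats.message.subject":      m.Subject,
		"nats.subscription.subject": m.SubSubject,
		"nats.subscription.queue":   m.Queue,
	}
	for key, value := range optional {
		if value != "" {
			attrs[key] = value
		}
	}

	return attrs
}

func main() {
	attrs := spanAttributes(msgInfo{
		Kind:    "consumer",
		Subject: "demo",
		Queue:   "demo-queue",
	})

	// Sort the keys so the output order is stable.
	keys := make([]string, 0, len(attrs))
	for key := range attrs {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	for _, key := range keys {
		fmt.Printf("%s: %q\n", key, attrs[key])
	}
}
```

Here the subscription subject is left empty, so nats.subscription.subject is absent from the resulting attributes while span.kind is always present.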

Usage

The integration uses the official Go library maintained by the NATS / Synadia team.

Install the Go module with:

$ go get go.nunchi.studio/helix/integration/nats

A simple example of how to import, configure, and use the integration:

import (
  "context"

  natsinte "go.nunchi.studio/helix/integration/nats"
  "go.nunchi.studio/helix/service"

  "github.com/nats-io/nats.go"
)

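func main() {
  // Configure the helix integration (imported as natsinte), not the
  // upstream nats.go package.
  cfg := natsinte.Config{
    Addresses: []string{"nats://localhost:4222"},
  }

  // main has no return value, so a connection error cannot be returned.
  js, err := natsinte.Connect(cfg)
  if err != nil {
    panic(err)
  }

  // Publish a message using the message type from nats.go.
  ctx := context.Background()
  js.Publish(ctx, &nats.Msg{
    Subject: "demo",
    Sub: &nats.Subscription{
      Queue: "demo-queue",
    },
    Data: []byte(`{ "hello": "world" }`),
  })

  if err := service.Start(); err != nil {
    panic(err)
  }

  if err := service.Close(); err != nil {
    panic(err)
  }
}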
