Skip to content

feat: add support for Consul prepared queries #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# GRPC consul resolver

Feature rich and easy-to-use resolver which return endpoints for service from the [Hashicorp Consul](https://www.consul.io) and watch for the changes.
Feature rich and easy-to-use resolver which return endpoints for service or prepared query from the [Hashicorp Consul](https://www.consul.io) and watch for the changes.

This library is *production ready* and will always *save backward-compatibility*
This library is _production ready_ and will always _save backward-compatibility_

## Quick Start

Expand All @@ -11,26 +11,30 @@ For using resolving endpoints from your [Hashicorp Consul](https://www.consul.io
For full example see [this section](#example)

## Connection string

`consul://[user:password@]127.0.0.127:8555/my-service?[healthy=]&[wait=]&[near=]&[insecure=]&[limit=]&[tag=]&[token=]`

*Parameters:*

| Name | Format | Description |
|--------------------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| tag | string | Select endpoints only with this tag |
| healthy | true/false | Return only endpoints which pass all health-checks. Default: false |
| wait | as in time.ParseDuration | Wait time for watch changes. Due this time period endpoints will be force refreshed. Default: inherits agent property |
| insecure | true/false | Allow insecure communication with Consul. Default: true |
| near | string | Sort endpoints by response duration. Can be efficient combine with `limit` parameter default: "_agent" |
| limit | int | Limit number of endpoints for the service. Default: no limit |
| timeout | as in time.ParseDuration | Http-client timeout. Default: 60s |
| max-backoff | as in time.ParseDuration | Max backoff time for reconnect to consul. Reconnects will start from 10ms to _max-backoff_ exponentialy with factor 2. Default: 1s |
| token | string | Consul token |
| dc | string | Consul datacenter to choose. Optional |
| allow-stale | true/false | Allow stale results from the agent. https://www.consul.io/api/features/consistency.html#stale |
| require-consistent | true/false | RequireConsistent forces the read to be fully consistent. This is more expensive but prevents ever performing a stale read. |
_Parameters:_

| Name | Format | Description |
| ------------------ | ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------- |
| type | service/prepared_query | Whether to query a Consul service or a prepared query. Default: service |
| tag | string | Select endpoints only with this tag. Only when type=service. |
| healthy | true/false | Return only endpoints which pass all health-checks. Only when type=service. Default: false |
| wait | as in time.ParseDuration | Wait time for watch changes. Due this time period endpoints will be force refreshed. Only when type=service. Default: inherits agent property |
| insecure | true/false | Allow insecure communication with Consul. Default: true |
| near | string | Sort endpoints by response duration. Can be efficient combine with `limit` parameter default: "\_agent" |
| limit | int | Limit number of endpoints for the service. Default: no limit |
| timeout | as in time.ParseDuration | Http-client timeout. Default: 60s |
| max-backoff | as in time.ParseDuration | Max backoff time for reconnect to consul. Reconnects will start from 10ms to _max-backoff_ exponentialy with factor 2. Default: 1s |
| poll-interval | as in time.ParseDuration | How often to poll prepared queries. Only when type=prepared_query. Default: 30s |
| token | string | Consul token |
| dc | string | Consul datacenter to choose. Optional |
| allow-stale | true/false | Allow stale results from the agent. https://www.consul.io/api/features/consistency.html#stale |
| require-consistent | true/false | RequireConsistent forces the read to be fully consistent. This is more expensive but prevents ever performing a stale read. |

## Example

```go
package main

Expand Down
8 changes: 7 additions & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolv

ctx, cancel := context.WithCancel(context.Background())
pipe := make(chan []string)
go watchConsulService(ctx, cli.Health(), tgt, pipe)
switch tgt.Type {
case targetTypeService:
go watchConsulService(ctx, cli.Health(), tgt, pipe)
case targetTypePreparedQuery:
go watchPreparedQuery(ctx, cli.PreparedQuery(), tgt, pipe)
}

go populateEndpoints(ctx, cc, pipe)

return &resolvr{cancelFunc: cancel}, nil
Expand Down
94 changes: 92 additions & 2 deletions consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ func (r *resolvr) Close() {
}

//go:generate mockgen -package mocks -destination internal/mocks/resolverClientConn.go google.golang.org/grpc/resolver ClientConn
//go:generate mockgen -package mocks -destination internal/mocks/servicer.go -source consul.go servicer
//go:generate mockgen -package mocks -destination internal/mocks/consul.go -source consul.go servicer querier
type servicer interface {
Service(string, string, bool, *api.QueryOptions) ([]*api.ServiceEntry, *api.QueryMeta, error)
}

type querier interface {
Execute(string, *api.QueryOptions) (*api.PreparedQueryExecuteResponse, *api.QueryMeta, error)
}

func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<- []string) {
res := make(chan []string)
quit := make(chan struct{})
Expand All @@ -50,7 +54,7 @@ func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<-
var lastIndex uint64
for {
ss, meta, err := s.Service(
tgt.Service,
tgt.Target,
tgt.Tag,
tgt.Healthy,
&api.QueryOptions{
Expand Down Expand Up @@ -125,6 +129,92 @@ func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<-
}
}

func watchPreparedQuery(ctx context.Context, q querier, tgt target, out chan<- []string) {
res := make(chan []string)
quit := make(chan struct{})
bck := &backoff.Backoff{
Factor: 2,
Jitter: true,
Min: 10 * time.Millisecond,
Max: tgt.MaxBackoff,
}
go func() {
ticker := time.NewTicker(tgt.PollInterval)
defer ticker.Stop()

for {
ss, meta, err := q.Execute(
tgt.Target,
&api.QueryOptions{
Near: tgt.Near,
Datacenter: tgt.Dc,
AllowStale: tgt.AllowStale,
RequireConsistent: tgt.RequireConsistent,
},
)
if err != nil {
// No need to continue if the context is done/cancelled.
// We check that here directly because the check for the closed quit channel
// at the end of the loop is not reached when calling continue here.
select {
case <-quit:
return
default:
grpclog.Errorf("[Consul resolver] Couldn't fetch endpoints. target={%s}; error={%v}", tgt.String(), err)
time.Sleep(bck.Duration())
continue
}
}
bck.Reset()
grpclog.Infof("[Consul resolver] %d endpoints fetched in %s for target={%s}",
len(ss.Nodes),
meta.RequestTime,
tgt.String(),
)

ee := make([]string, 0, len(ss.Nodes))
for _, s := range ss.Nodes {
address := s.Service.Address
if s.Service.Address == "" {
address = s.Node.Address
}
ee = append(ee, fmt.Sprintf("%s:%d", address, s.Service.Port))
}

if tgt.Limit != 0 && len(ee) > tgt.Limit {
ee = ee[:tgt.Limit]
}
select {
case res <- ee:
<-ticker.C
continue
case <-quit:
return
}
}
}()

for {
// If in the below select both channels have values that can be read,
// Go picks one pseudo-randomly.
// But when the context is canceled we want to act upon it immediately.
if ctx.Err() != nil {
// Close quit so the goroutine returns and doesn't leak.
// Do NOT close res because that can lead to panics in the goroutine.
// res will be garbage collected at some point.
close(quit)
return
}
select {
case ee := <-res:
out <- ee
case <-ctx.Done():
close(quit)
return
}
}
}

func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []string) {
for {
select {
Expand Down
76 changes: 73 additions & 3 deletions consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestWatchConsulService(t *testing.T) {
errorFromService error
want []string
}{
{"simple", target{Service: "svc", Wait: time.Second},
{"simple", target{Target: "svc", Wait: time.Second},
[]*api.ServiceEntry{
&api.ServiceEntry{
Service: &api.AgentService{Address: "127.0.0.1", Port: 1024},
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestWatchConsulService(t *testing.T) {
}
}()
fconsul := mocks.NewMockservicer(ctrl)
fconsul.EXPECT().Service(tt.tgt.Service, tt.tgt.Tag, tt.tgt.Healthy, &api.QueryOptions{
fconsul.EXPECT().Service(tt.tgt.Target, tt.tgt.Tag, tt.tgt.Healthy, &api.QueryOptions{
WaitIndex: 0,
Near: tt.tgt.Near,
WaitTime: tt.tgt.Wait,
Expand All @@ -112,7 +112,7 @@ func TestWatchConsulService(t *testing.T) {
}).
Times(1).
Return(tt.services, &api.QueryMeta{LastIndex: 1}, tt.errorFromService)
fconsul.EXPECT().Service(tt.tgt.Service, tt.tgt.Tag, tt.tgt.Healthy, &api.QueryOptions{
fconsul.EXPECT().Service(tt.tgt.Target, tt.tgt.Tag, tt.tgt.Healthy, &api.QueryOptions{
WaitIndex: 1,
Near: tt.tgt.Near,
WaitTime: tt.tgt.Wait,
Expand All @@ -137,3 +137,73 @@ func TestWatchConsulService(t *testing.T) {
})
}
}

func TestWatchPeparedQuery(t *testing.T) {
tests := []struct {
name string
tgt target
responses []*api.PreparedQueryExecuteResponse
errorFromService error
want [][]string
}{
{"simple", target{Target: "myquery", PollInterval: 100 * time.Millisecond},
[]*api.PreparedQueryExecuteResponse{
{
Nodes: []api.ServiceEntry{
{
Service: &api.AgentService{Address: "127.0.0.1", Port: 1024},
},
},
},
{
Nodes: []api.ServiceEntry{
{
Service: &api.AgentService{Address: "127.0.0.2", Port: 1024},
},
},
},
},
nil,
[][]string{
{"127.0.0.1:1024"},
{"127.0.0.2:1024"},
},
},
// TODO: Add more tests-cases
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctrl := gomock.NewController(t)
defer ctrl.Finish()

i := 0
var out = make(chan []string, 1)
fconsul := mocks.NewMockquerier(ctrl)
fconsul.EXPECT().Execute(tt.tgt.Target, &api.QueryOptions{
Near: tt.tgt.Near,
Datacenter: tt.tgt.Dc,
AllowStale: tt.tgt.AllowStale,
RequireConsistent: tt.tgt.RequireConsistent,
}).
Times(len(tt.responses)).
DoAndReturn(func(arg0 string, arg1 *api.QueryOptions) (*api.PreparedQueryExecuteResponse, *api.QueryMeta, error) {
v := tt.responses[i]
i++
return v, &api.QueryMeta{}, tt.errorFromService
})

go watchPreparedQuery(ctx, fconsul, tt.tgt, out)

for _, want := range tt.want {
select {
case <-ctx.Done():
return
case got := <-out:
require.Equal(t, want, got)
}
}
})
}
}
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.13
require (
github.com/armon/go-metrics v0.3.2 // indirect
github.com/go-playground/form v3.1.4+incompatible
github.com/golang/mock v1.1.1
github.com/golang/mock v1.6.0
github.com/google/btree v1.0.0 // indirect
github.com/hashicorp/consul/api v1.3.0
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
Expand All @@ -16,9 +16,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/genproto v0.0.0-20200210034751-acff78025515 // indirect
google.golang.org/grpc v1.27.1
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
Expand Down
Loading