diff --git a/.gitignore b/.gitignore index 66fd13c..1df751a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ +.idea/ +main/ +.mitm/ \ No newline at end of file diff --git a/README.md b/README.md index 28cdccb..3ba40a5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,12 @@ # nats-transport HTTP to NATS (https://nats.io/) golang transport (http.RoundTripper) implementation + +inspired by and based on https://github.com/sohlich/nats-proxy + +## dev + +generate protobuf + +```bash +protoc --go_opt=paths=source_relative --go_out=. protobuf.proto +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..08c156b --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/jar3b/nats-transport + +go 1.17 + +require ( + github.com/golang/protobuf v1.5.2 + github.com/nats-io/nats.go v1.12.3 + google.golang.org/protobuf v1.27.1 +) + +require ( + github.com/nats-io/nats-server/v2 v2.6.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6ce886c --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.0 h1:OAt+ef+9QaaNdn4uTyQC372bv1ZZqC0vZ1I9YxWqjwI= +github.com/nats-io/nats-server/v2 v2.6.0/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/protobuf.pb.go b/protobuf.pb.go new file mode 100644 index 0000000..2f45a44 --- /dev/null +++ b/protobuf.pb.go @@ -0,0 +1,379 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.14.0 +// source: protobuf.proto + +package nats_transport + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Values struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Arr []string `protobuf:"bytes,1,rep,name=arr,proto3" json:"arr,omitempty"` +} + +func (x *Values) Reset() { + *x = Values{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Values) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Values) ProtoMessage() {} + +func (x *Values) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Values.ProtoReflect.Descriptor instead. +func (*Values) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{0} +} + +func (x *Values) GetArr() []string { + if x != nil { + return x.Arr + } + return nil +} + +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Proto string `protobuf:"bytes,1,opt,name=Proto,proto3" json:"Proto,omitempty"` + Scheme string `protobuf:"bytes,2,opt,name=Scheme,proto3" json:"Scheme,omitempty"` + Host string `protobuf:"bytes,3,opt,name=Host,proto3" json:"Host,omitempty"` + URL string `protobuf:"bytes,4,opt,name=URL,proto3" json:"URL,omitempty"` + Method string `protobuf:"bytes,5,opt,name=Method,proto3" json:"Method,omitempty"` + RemoteAddr string `protobuf:"bytes,6,opt,name=RemoteAddr,proto3" json:"RemoteAddr,omitempty"` + Body []byte `protobuf:"bytes,7,opt,name=Body,proto3" json:"Body,omitempty"` + Header map[string]*Values `protobuf:"bytes,8,rep,name=Header,proto3" json:"Header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{1} +} + +func (x *Request) GetProto() string { + if x != nil { + return x.Proto + } + return "" +} + +func (x *Request) GetScheme() string { + if x != nil { + return x.Scheme + } + return "" +} + +func (x *Request) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *Request) GetURL() string { + if x != nil { + return x.URL + } + return "" +} + +func (x *Request) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *Request) GetRemoteAddr() string { + if x != nil { + return x.RemoteAddr + } + return "" +} + +func (x *Request) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *Request) GetHeader() map[string]*Values { + if x != nil { + return x.Header + } + return nil +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode int32 `protobuf:"varint,1,opt,name=StatusCode,proto3" json:"StatusCode,omitempty"` + Header map[string]*Values `protobuf:"bytes,2,rep,name=Header,proto3" json:"Header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Body []byte `protobuf:"bytes,3,opt,name=Body,proto3" json:"Body,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"Error,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetStatusCode() int32 { + if x != nil { + return x.StatusCode + } + return 0 +} + +func (x *Response) GetHeader() map[string]*Values { + if x != nil { + return x.Header + } + return nil +} + +func (x *Response) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *Response) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +var File_protobuf_proto protoreflect.FileDescriptor + +var file_protobuf_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x22, 0x1a, 0x0a, 0x06, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x72, + 0x72, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x03, 0x61, 0x72, 0x72, 0x22, 0x9b, 0x02, 0x0a, + 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, + 0x0a, 0x06, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, + 0x4c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x16, 0x0a, 0x06, + 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x41, 0x64, + 0x64, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, + 0x41, 0x64, 0x64, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2c, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x1a, 0x42, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xc7, 0x01, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, + 0x1a, 0x42, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, + 0x79, 0x12, 0x1d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x07, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x2c, 0x5a, 0x11, 0x2e, 0x2f, 0x3b, 0x6e, 0x61, 0x74, 0x73, 0x5f, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0xaa, 0x02, 0x16, 0x4e, 0x61, 0x74, 0x73, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protobuf_proto_rawDescOnce sync.Once + file_protobuf_proto_rawDescData = file_protobuf_proto_rawDesc +) + +func file_protobuf_proto_rawDescGZIP() []byte { + file_protobuf_proto_rawDescOnce.Do(func() { + file_protobuf_proto_rawDescData = protoimpl.X.CompressGZIP(file_protobuf_proto_rawDescData) + }) + return file_protobuf_proto_rawDescData +} + +var file_protobuf_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_protobuf_proto_goTypes = []interface{}{ + (*Values)(nil), // 0: Values + (*Request)(nil), // 1: Request + (*Response)(nil), // 2: Response + nil, // 3: Request.HeaderEntry + nil, // 4: Response.HeaderEntry +} +var file_protobuf_proto_depIdxs = []int32{ + 3, // 0: Request.Header:type_name -> Request.HeaderEntry + 4, // 1: Response.Header:type_name -> Response.HeaderEntry + 0, // 2: Request.HeaderEntry.value:type_name -> Values + 0, // 3: Response.HeaderEntry.value:type_name -> Values + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_protobuf_proto_init() } +func file_protobuf_proto_init() { + if File_protobuf_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protobuf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Values); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protobuf_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protobuf_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protobuf_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_protobuf_proto_goTypes, + DependencyIndexes: file_protobuf_proto_depIdxs, + MessageInfos: file_protobuf_proto_msgTypes, + }.Build() + File_protobuf_proto = out.File + file_protobuf_proto_rawDesc = nil + file_protobuf_proto_goTypes = nil + file_protobuf_proto_depIdxs = nil +} diff --git a/protobuf.proto b/protobuf.proto new file mode 100644 index 0000000..80e8929 --- /dev/null +++ b/protobuf.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +option go_package = "./;nats_transport"; +option csharp_namespace = "NatsTransport.Messages"; + + +message Values { + repeated string arr = 1; +} + +message Request { + string Proto = 1; + string Scheme = 2; + string Host = 3; + string URL = 4; + string Method = 5; + string RemoteAddr = 6; + bytes Body = 7; + map Header = 8; +} + +message Response { + int32 StatusCode = 1; + map Header = 2; + bytes Body = 3; + string Error = 4; +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..5e534ae --- /dev/null +++ b/request.go @@ -0,0 +1,38 @@ +package nats_transport + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" +) + +func (r *Request) FromHTTP(req *http.Request) error { + if req == nil { + return errors.New("nats_transport: request cannot be nil") + } + + buf, err := ioutil.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("nats_transport: cannot read request body") + } + // bufReader := ioutil.NopCloser(bytes.NewBuffer(buf)) + + r.Proto = req.Proto + r.Scheme = req.URL.Scheme + r.Host = req.Host + r.URL = req.URL.String() + r.Method = req.Method + r.Header = copyMap(req.Header) + r.RemoteAddr = req.RemoteAddr + r.Body = buf + + return nil +} + +func NewRequest() *Request { + return &Request{ + Header: make(map[string]*Values), + Body: make([]byte, 0), + } +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..84500b7 --- /dev/null +++ b/response.go @@ -0,0 +1,52 @@ +package nats_transport + +import ( + "bytes" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "io/ioutil" + "net/http" +) + +// NewResponse creates blank initialized Response object. +func NewResponse() *Response { + return &Response{ + StatusCode: int32(200), + Header: make(map[string]*Values, 0), + Body: make([]byte, 0), + } +} + +func (resp *Response) ReadFrom(responseData []byte) error { + if responseData == nil || len(responseData) == 0 { + return errors.New("response content is empty") + } + if err := proto.Unmarshal(responseData, resp); err != nil { + return err + } + return nil +} + +func (resp *Response) ToHTTPResponse(r *http.Request) (*http.Response, error) { + httpResponse := http.Response{ + Status: fmt.Sprintf("%d %s", resp.StatusCode, http.StatusText(int(resp.StatusCode))), + StatusCode: int(resp.StatusCode), + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Body: ioutil.NopCloser(bytes.NewBuffer(resp.Body)), + ContentLength: int64(len(resp.Body)), + Request: r, + Header: make(http.Header, 0), + } + + // copy headers + for headerName, headerValues := range resp.Header { + for _, headerValue := range headerValues.Arr { + httpResponse.Header.Add(headerName, headerValue) + } + } + + return &httpResponse, nil +} diff --git a/transport.go b/transport.go new file mode 100644 index 0000000..272a133 --- /dev/null +++ b/transport.go @@ -0,0 +1,88 @@ +package nats_transport + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "github.com/nats-io/nats.go" + "net/http" + "time" +) + +// SubjectResolveFunc resolves the NATS subject (based on http.Request struct) +type SubjectResolveFunc func(r *http.Request) string + +// ModifyRequestHookFunc is the hook to modify Request struct (protobuf) +type ModifyRequestHookFunc = func(req *http.Request, r *Request) + +type NatsTransport struct { + // NatsConnection connection to NATS server + NatsConnection *nats.Conn + + // Subject NATS subject to push wrapped HTTP request on + Subject string + + // SubjectResolver used only if Subject is not specified + SubjectResolver SubjectResolveFunc + + // ModifyRequestHook calls after Request struct parsed from HTTP and before sent to NATS + ModifyRequestHook ModifyRequestHookFunc + + // Timeout NATS request timeout + Timeout time.Duration +} + +func (nt NatsTransport) RoundTrip(r *http.Request) (*http.Response, error) { + request := NewRequest() + + if err := request.FromHTTP(r); err != nil { + return nil, fmt.Errorf("nats_transport: cannot parse HTTP request: %v", err) + } + + // call modify request hook + if nt.ModifyRequestHook != nil { + nt.ModifyRequestHook(r, request) + } + + // Serialize the request. + requestBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("nats_transport: cannot serialize request") + } + + // get the outgoing NATS subject + natsSubject := nt.Subject + if natsSubject == "" { + natsSubject = nt.SubjectResolver(r) + } + if natsSubject == "" { + return nil, fmt.Errorf("nats_transport: cannot detect NATS subject") + } + + // Send Request to NATS server + msg, err := nt.NatsConnection.Request( + natsSubject, + requestBytes, + nt.Timeout, + ) + if err != nil { + return nil, fmt.Errorf("nats_transport: cannot send NATS request") + } + + // Get Response object from NATS message + response := NewResponse() + if err := response.ReadFrom(msg.Data); err != nil { + return nil, fmt.Errorf("nats_transport: %v", err) + } + + if response.Error != "" { + return nil, fmt.Errorf("nats_transport: %s", response.Error) + } + + // prepare HTTP response + httpResponse, err := response.ToHTTPResponse(r) + if err != nil { + return nil, fmt.Errorf("nats_transport: %v", err) + } + + return httpResponse, nil +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..fc180ba --- /dev/null +++ b/util.go @@ -0,0 +1,13 @@ +package nats_transport + +// copy the values into protocol buffer +// struct +func copyMap(values map[string][]string) map[string]*Values { + headerMap := make(map[string]*Values, 0) + for k, v := range values { + headerMap[k] = &Values{ + Arr: v, + } + } + return headerMap +}