From 0b9114e50cf8a3c2b3abe89afa47bc31700c1f5d Mon Sep 17 00:00:00 2001 From: "jar3b@outlook.com" Date: Thu, 23 Sep 2021 02:42:00 +0300 Subject: [PATCH] feat: WIP --- .gitignore | 3 + README.md | 10 ++ go.mod | 16 +++ go.sum | 63 ++++++++ protobuf.pb.go | 379 +++++++++++++++++++++++++++++++++++++++++++++++++ protobuf.proto | 26 ++++ request.go | 38 +++++ response.go | 52 +++++++ transport.go | 88 ++++++++++++ util.go | 13 ++ 10 files changed, 688 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 protobuf.pb.go create mode 100644 protobuf.proto create mode 100644 request.go create mode 100644 response.go create mode 100644 transport.go create mode 100644 util.go diff --git a/.gitignore b/.gitignore index 66fd13c..1df751a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ +.idea/ +main/ +.mitm/ \ No newline at end of file diff --git a/README.md b/README.md index 28cdccb..3ba40a5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,12 @@ # nats-transport HTTP to NATS (https://nats.io/) golang transport (http.RoundTripper) implementation + +inspired by and based on https://github.com/sohlich/nats-proxy + +## dev + +generate protobuf + +```bash +protoc --go_opt=paths=source_relative --go_out=. protobuf.proto +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..08c156b --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/jar3b/nats-transport + +go 1.17 + +require ( + github.com/golang/protobuf v1.5.2 + github.com/nats-io/nats.go v1.12.3 + google.golang.org/protobuf v1.27.1 +) + +require ( + github.com/nats-io/nats-server/v2 v2.6.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6ce886c --- /dev/null +++ b/go.sum @@ -0,0 +1,63 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= +github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= +github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.6.0 h1:OAt+ef+9QaaNdn4uTyQC372bv1ZZqC0vZ1I9YxWqjwI= +github.com/nats-io/nats-server/v2 v2.6.0/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo= +github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/protobuf.pb.go b/protobuf.pb.go new file mode 100644 index 0000000..2f45a44 --- /dev/null +++ b/protobuf.pb.go @@ -0,0 +1,379 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.14.0 +// source: protobuf.proto + +package nats_transport + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Values struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Arr []string `protobuf:"bytes,1,rep,name=arr,proto3" json:"arr,omitempty"` +} + +func (x *Values) Reset() { + *x = Values{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Values) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Values) ProtoMessage() {} + +func (x *Values) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Values.ProtoReflect.Descriptor instead. +func (*Values) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{0} +} + +func (x *Values) GetArr() []string { + if x != nil { + return x.Arr + } + return nil +} + +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Proto string `protobuf:"bytes,1,opt,name=Proto,proto3" json:"Proto,omitempty"` + Scheme string `protobuf:"bytes,2,opt,name=Scheme,proto3" json:"Scheme,omitempty"` + Host string `protobuf:"bytes,3,opt,name=Host,proto3" json:"Host,omitempty"` + URL string `protobuf:"bytes,4,opt,name=URL,proto3" json:"URL,omitempty"` + Method string `protobuf:"bytes,5,opt,name=Method,proto3" json:"Method,omitempty"` + RemoteAddr string `protobuf:"bytes,6,opt,name=RemoteAddr,proto3" json:"RemoteAddr,omitempty"` + Body []byte `protobuf:"bytes,7,opt,name=Body,proto3" json:"Body,omitempty"` + Header map[string]*Values `protobuf:"bytes,8,rep,name=Header,proto3" json:"Header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{1} +} + +func (x *Request) GetProto() string { + if x != nil { + return x.Proto + } + return "" +} + +func (x *Request) GetScheme() string { + if x != nil { + return x.Scheme + } + return "" +} + +func (x *Request) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *Request) GetURL() string { + if x != nil { + return x.URL + } + return "" +} + +func (x *Request) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *Request) GetRemoteAddr() string { + if x != nil { + return x.RemoteAddr + } + return "" +} + +func (x *Request) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *Request) GetHeader() map[string]*Values { + if x != nil { + return x.Header + } + return nil +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode int32 `protobuf:"varint,1,opt,name=StatusCode,proto3" json:"StatusCode,omitempty"` + Header map[string]*Values `protobuf:"bytes,2,rep,name=Header,proto3" json:"Header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Body []byte `protobuf:"bytes,3,opt,name=Body,proto3" json:"Body,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"Error,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_protobuf_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_protobuf_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_protobuf_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetStatusCode() int32 { + if x != nil { + return x.StatusCode + } + return 0 +} + +func (x *Response) GetHeader() map[string]*Values { + if x != nil { + return x.Header + } + return nil +} + +func (x *Response) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +func (x *Response) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +var File_protobuf_proto protoreflect.FileDescriptor + +var file_protobuf_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x22, 0x1a, 0x0a, 0x06, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x72, + 0x72, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x03, 0x61, 0x72, 0x72, 0x22, 0x9b, 0x02, 0x0a, + 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x16, + 0x0a, 0x06, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x53, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, + 0x4c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x55, 0x52, 0x4c, 0x12, 0x16, 0x0a, 0x06, + 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x41, 0x64, + 0x64, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, + 0x41, 0x64, 0x64, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2c, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x18, 0x08, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x1a, 0x42, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xc7, 0x01, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x2d, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, + 0x1a, 0x42, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, + 0x79, 0x12, 0x1d, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x07, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x2c, 0x5a, 0x11, 0x2e, 0x2f, 0x3b, 0x6e, 0x61, 0x74, 0x73, 0x5f, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0xaa, 0x02, 0x16, 0x4e, 0x61, 0x74, 0x73, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protobuf_proto_rawDescOnce sync.Once + file_protobuf_proto_rawDescData = file_protobuf_proto_rawDesc +) + +func file_protobuf_proto_rawDescGZIP() []byte { + file_protobuf_proto_rawDescOnce.Do(func() { + file_protobuf_proto_rawDescData = protoimpl.X.CompressGZIP(file_protobuf_proto_rawDescData) + }) + return file_protobuf_proto_rawDescData +} + +var file_protobuf_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_protobuf_proto_goTypes = []interface{}{ + (*Values)(nil), // 0: Values + (*Request)(nil), // 1: Request + (*Response)(nil), // 2: Response + nil, // 3: Request.HeaderEntry + nil, // 4: Response.HeaderEntry +} +var file_protobuf_proto_depIdxs = []int32{ + 3, // 0: Request.Header:type_name -> Request.HeaderEntry + 4, // 1: Response.Header:type_name -> Response.HeaderEntry + 0, // 2: Request.HeaderEntry.value:type_name -> Values + 0, // 3: Response.HeaderEntry.value:type_name -> Values + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_protobuf_proto_init() } +func file_protobuf_proto_init() { + if File_protobuf_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protobuf_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Values); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protobuf_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protobuf_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protobuf_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_protobuf_proto_goTypes, + DependencyIndexes: file_protobuf_proto_depIdxs, + MessageInfos: file_protobuf_proto_msgTypes, + }.Build() + File_protobuf_proto = out.File + file_protobuf_proto_rawDesc = nil + file_protobuf_proto_goTypes = nil + file_protobuf_proto_depIdxs = nil +} diff --git a/protobuf.proto b/protobuf.proto new file mode 100644 index 0000000..80e8929 --- /dev/null +++ b/protobuf.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +option go_package = "./;nats_transport"; +option csharp_namespace = "NatsTransport.Messages"; + + +message Values { + repeated string arr = 1; +} + +message Request { + string Proto = 1; + string Scheme = 2; + string Host = 3; + string URL = 4; + string Method = 5; + string RemoteAddr = 6; + bytes Body = 7; + map Header = 8; +} + +message Response { + int32 StatusCode = 1; + map Header = 2; + bytes Body = 3; + string Error = 4; +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..5e534ae --- /dev/null +++ b/request.go @@ -0,0 +1,38 @@ +package nats_transport + +import ( + "errors" + "fmt" + "io/ioutil" + "net/http" +) + +func (r *Request) FromHTTP(req *http.Request) error { + if req == nil { + return errors.New("nats_transport: request cannot be nil") + } + + buf, err := ioutil.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("nats_transport: cannot read request body") + } + // bufReader := ioutil.NopCloser(bytes.NewBuffer(buf)) + + r.Proto = req.Proto + r.Scheme = req.URL.Scheme + r.Host = req.Host + r.URL = req.URL.String() + r.Method = req.Method + r.Header = copyMap(req.Header) + r.RemoteAddr = req.RemoteAddr + r.Body = buf + + return nil +} + +func NewRequest() *Request { + return &Request{ + Header: make(map[string]*Values), + Body: make([]byte, 0), + } +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..84500b7 --- /dev/null +++ b/response.go @@ -0,0 +1,52 @@ +package nats_transport + +import ( + "bytes" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "io/ioutil" + "net/http" +) + +// NewResponse creates blank initialized Response object. +func NewResponse() *Response { + return &Response{ + StatusCode: int32(200), + Header: make(map[string]*Values, 0), + Body: make([]byte, 0), + } +} + +func (resp *Response) ReadFrom(responseData []byte) error { + if responseData == nil || len(responseData) == 0 { + return errors.New("response content is empty") + } + if err := proto.Unmarshal(responseData, resp); err != nil { + return err + } + return nil +} + +func (resp *Response) ToHTTPResponse(r *http.Request) (*http.Response, error) { + httpResponse := http.Response{ + Status: fmt.Sprintf("%d %s", resp.StatusCode, http.StatusText(int(resp.StatusCode))), + StatusCode: int(resp.StatusCode), + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Body: ioutil.NopCloser(bytes.NewBuffer(resp.Body)), + ContentLength: int64(len(resp.Body)), + Request: r, + Header: make(http.Header, 0), + } + + // copy headers + for headerName, headerValues := range resp.Header { + for _, headerValue := range headerValues.Arr { + httpResponse.Header.Add(headerName, headerValue) + } + } + + return &httpResponse, nil +} diff --git a/transport.go b/transport.go new file mode 100644 index 0000000..272a133 --- /dev/null +++ b/transport.go @@ -0,0 +1,88 @@ +package nats_transport + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "github.com/nats-io/nats.go" + "net/http" + "time" +) + +// SubjectResolveFunc resolves the NATS subject (based on http.Request struct) +type SubjectResolveFunc func(r *http.Request) string + +// ModifyRequestHookFunc is the hook to modify Request struct (protobuf) +type ModifyRequestHookFunc = func(req *http.Request, r *Request) + +type NatsTransport struct { + // NatsConnection connection to NATS server + NatsConnection *nats.Conn + + // Subject NATS subject to push wrapped HTTP request on + Subject string + + // SubjectResolver used only if Subject is not specified + SubjectResolver SubjectResolveFunc + + // ModifyRequestHook calls after Request struct parsed from HTTP and before sent to NATS + ModifyRequestHook ModifyRequestHookFunc + + // Timeout NATS request timeout + Timeout time.Duration +} + +func (nt NatsTransport) RoundTrip(r *http.Request) (*http.Response, error) { + request := NewRequest() + + if err := request.FromHTTP(r); err != nil { + return nil, fmt.Errorf("nats_transport: cannot parse HTTP request: %v", err) + } + + // call modify request hook + if nt.ModifyRequestHook != nil { + nt.ModifyRequestHook(r, request) + } + + // Serialize the request. + requestBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("nats_transport: cannot serialize request") + } + + // get the outgoing NATS subject + natsSubject := nt.Subject + if natsSubject == "" { + natsSubject = nt.SubjectResolver(r) + } + if natsSubject == "" { + return nil, fmt.Errorf("nats_transport: cannot detect NATS subject") + } + + // Send Request to NATS server + msg, err := nt.NatsConnection.Request( + natsSubject, + requestBytes, + nt.Timeout, + ) + if err != nil { + return nil, fmt.Errorf("nats_transport: cannot send NATS request") + } + + // Get Response object from NATS message + response := NewResponse() + if err := response.ReadFrom(msg.Data); err != nil { + return nil, fmt.Errorf("nats_transport: %v", err) + } + + if response.Error != "" { + return nil, fmt.Errorf("nats_transport: %s", response.Error) + } + + // prepare HTTP response + httpResponse, err := response.ToHTTPResponse(r) + if err != nil { + return nil, fmt.Errorf("nats_transport: %v", err) + } + + return httpResponse, nil +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..fc180ba --- /dev/null +++ b/util.go @@ -0,0 +1,13 @@ +package nats_transport + +// copy the values into protocol buffer +// struct +func copyMap(values map[string][]string) map[string]*Values { + headerMap := make(map[string]*Values, 0) + for k, v := range values { + headerMap[k] = &Values{ + Arr: v, + } + } + return headerMap +}