From 99bd668c58403d2676218ca669b38bf7f98f46fb Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 26 Jan 2024 09:47:20 -0800 Subject: [PATCH] feat: Improve sync service (#97) Signed-off-by: Kavindu Dodanduwa --- sync/go.mod | 18 ++--- sync/go.sum | 40 ++++++----- sync/pkg/file_watcher.go | 150 +++++++++++++++++++++++++++++++++++++++ sync/pkg/server.go | 134 ++++++++++++---------------------- 4 files changed, 226 insertions(+), 116 deletions(-) create mode 100644 sync/pkg/file_watcher.go diff --git a/sync/go.mod b/sync/go.mod index 892586e..ee7999a 100644 --- a/sync/go.mod +++ b/sync/go.mod @@ -3,18 +3,18 @@ module github.com/open-feature/test-harness/sync go 1.19 require ( - buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20230822184021-85780df4e019.1 - buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.31.0-20230822184021-85780df4e019.1 + buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20231031123731-ac2ec0f39838.2 + buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.32.0-20231031123731-ac2ec0f39838.1 github.com/fsnotify/fsnotify v1.7.0 - golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 - google.golang.org/grpc v1.60.1 + golang.org/x/exp v0.0.0-20240119083558-1b970713d09a + google.golang.org/grpc v1.61.0 ) require ( github.com/golang/protobuf v1.5.3 // indirect - golang.org/x/net v0.16.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect - google.golang.org/protobuf v1.31.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/protobuf v1.32.0 // indirect ) diff --git a/sync/go.sum b/sync/go.sum index 9b6d5ae..052309e 100644 --- a/sync/go.sum +++ b/sync/go.sum @@ -1,11 +1,14 @@ -buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.3.0-20220906183531-bc28b723cd77.1/go.mod h1:9Ec7rvBnjfZvU/TnWjtcSGgiLQ4B+U3B+6SnZgVTA7A= +buf.build/gen/go/grpc-ecosystem/grpc-gateway/grpc/go v1.3.0-20220906183531-bc28b723cd77.2/go.mod h1:9Ec7rvBnjfZvU/TnWjtcSGgiLQ4B+U3B+6SnZgVTA7A= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.28.1-20220906183531-bc28b723cd77.4/go.mod h1:92ejKVTiuvnKoAtRlpJpIxKfloI935DDqhs0NCRx+KM= buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.31.0-20220906183531-bc28b723cd77.1/go.mod h1:/j/LOrpev/FdyGhdj/sOc0peUf2KR0y4nMmLp4t1g14= -buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20230822184021-85780df4e019.1 h1:HLshnel7u38r612EiBfIW8teClLMZP/n9nngsWWv54w= -buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20230822184021-85780df4e019.1/go.mod h1:0h1E7p4VbvjuwvKhBkpLvHdLLDoeSsCvPJ7hRNOvGOk= -buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20230822184021-85780df4e019.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= +buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20220906183531-bc28b723cd77.1/go.mod h1:XdxZqtnJr4q2vF8RLEhC903d6Nxxtz1vEMTyTAyvSUs= +buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20231031123731-ac2ec0f39838.2 h1:DCww6WQNaepShZVh/jDVpIfCHQy5QwrpKl8iYAZeaV8= +buf.build/gen/go/open-feature/flagd/grpc/go v1.3.0-20231031123731-ac2ec0f39838.2/go.mod h1:NmrKm2OIzFV3sUPs9cWMCmbYeCM3xVEzt4YzFgY5HO4= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.28.1-20231031123731-ac2ec0f39838.4/go.mod h1:+Bnrjo56uVn/aBcLWchTveR8UeCj+KSJN4fE0xSmBNc= buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.31.0-20230822184021-85780df4e019.1 h1:wUBsJv3Ay8PI+6acTN7DV2oVYQb+YjJd6z5bm5dNjxE= buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.31.0-20230822184021-85780df4e019.1/go.mod h1:kmgvCcQsQkpZ5yNLgO2B22ysHjnfm3IyzymmFHB60vY= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.32.0-20231031123731-ac2ec0f39838.1 h1:3lJuvwwH33XzkBJiLTNDfRHd5lc/MtQzXjCTQFLr8pU= +buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.32.0-20231031123731-ac2ec0f39838.1/go.mod h1:dDONV5ZtaBXX8FrzzvKWRAiSp6eTyhGXJ/+6POKBnMY= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -434,8 +437,6 @@ github.com/envoyproxy/go-control-plane v0.10.3/go.mod h1:fJJn/j26vwOu972OllsvAgJ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= -github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -606,10 +607,10 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= -golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE= golang.org/x/exp v0.0.0-20231219180239-dc181d75b848/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -689,10 +690,10 @@ golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -799,13 +800,12 @@ golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -822,10 +822,10 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1063,10 +1063,10 @@ google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZV google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1105,10 +1105,10 @@ google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= +google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= +google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -1127,6 +1127,8 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/sync/pkg/file_watcher.go b/sync/pkg/file_watcher.go new file mode 100644 index 0000000..f313a4f --- /dev/null +++ b/sync/pkg/file_watcher.go @@ -0,0 +1,150 @@ +package sync + +import ( + "encoding/json" + "github.com/fsnotify/fsnotify" + "golang.org/x/exp/maps" + "log" + "os" + "sync" + "time" +) + +// fileWatcher watches given file paths for updates and allow subscriptions for updates +type fileWatcher struct { + data []byte + paths []string + subs map[interface{}]chan<- []byte + mu sync.Mutex +} + +func (w *fileWatcher) init() error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + + for _, filePath := range w.paths { + err := watcher.Add(filePath) + if err != nil { + log.Printf("Error watching file %s, caused by %s\n", filePath, err.Error()) + return err + } + } + + // initial read & store + w.data, err = readFlags(w.paths) + if err != nil { + log.Printf("Error reading flag data: %v\n", err) + return err + } + + // run watcher in background + go func() { + // Start listening for file events. + for { + select { + case event, ok := <-watcher.Events: + if !ok { + log.Println("Unable to process file event, continuing") + continue + } + + if event.Has(fsnotify.Write) { + marshalled, err := readFlags(w.paths) + if err != nil { + log.Printf("Error reading flags: %s, continuing \n", err.Error()) + continue + } + + // store latest + w.updateData(marshalled) + } + case err := <-watcher.Errors: + log.Printf("Error in file watcher: %s, exiting watcher\n", err.Error()) + return + } + } + }() + + return nil +} + +func (w *fileWatcher) getCurrentData() []byte { + w.mu.Lock() + defer w.mu.Unlock() + + return w.data +} + +func (w *fileWatcher) updateData(data []byte) { + w.mu.Lock() + defer w.mu.Unlock() + + w.data = data + + // push to subs + for _, v := range w.subs { + v <- w.data + } +} + +func (w *fileWatcher) subscribe(channel chan<- []byte) { + w.mu.Lock() + defer w.mu.Unlock() + w.subs[channel] = channel +} + +func (w *fileWatcher) unSubscribe(channel chan<- []byte) { + w.mu.Lock() + defer w.mu.Unlock() + + delete(w.subs, channel) +} + +// readFlags is a helper to read given files and combine flags in them +func readFlags(filePaths []string) ([]byte, error) { + flags := make(map[string]any) + evaluators := make(map[string]any) + + for _, path := range filePaths { + bytes, err := os.ReadFile(path) + if err != nil { + log.Printf("File read error %s\n", err.Error()) + return nil, err + } + + for len(bytes) == 0 { + // this is a fitly hack + // file writes are NOT atomic and often when they are occur they have transitional empty states + // this "re-reads" the file in these cases a bit later + log.Printf("File content not ready for %s, busy wait\n", path) + time.Sleep(100 * time.Millisecond) + bytes, err = os.ReadFile(path) + if err != nil { + log.Printf("File read error %s\n", err.Error()) + return nil, err + } + } + parsed := make(map[string]map[string]any) + err = json.Unmarshal(bytes, &parsed) + if err != nil { + log.Printf("JSON unmarshal error %s\n", err.Error()) + return nil, err + } + maps.Copy(flags, parsed["flags"]) + maps.Copy(evaluators, parsed["$evaluators"]) + } + + payload := make(map[string]any) + payload["flags"] = flags + payload["$evaluators"] = evaluators + marshalled, err := json.Marshal(payload) + if err != nil { + log.Printf("JSON marshal error %s\n", err.Error()) + return nil, err + } + + log.Println("Update complete") + return marshalled, nil +} diff --git a/sync/pkg/server.go b/sync/pkg/server.go index 701c7c9..ea0c5d9 100644 --- a/sync/pkg/server.go +++ b/sync/pkg/server.go @@ -1,23 +1,16 @@ package sync import ( + "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" + v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" "context" "crypto/tls" - "encoding/json" - "errors" "fmt" - "log" - "net" - "os" - "time" - - "github.com/fsnotify/fsnotify" - "golang.org/x/exp/maps" - - "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" - v1 "buf.build/gen/go/open-feature/flagd/protocolbuffers/go/sync/v1" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "log" + "net" + "sync" ) type Server struct { @@ -39,6 +32,11 @@ func (s *Server) Start() { server := grpc.NewServer(options...) sync, err := NewSyncImpl(s.Config.Files.Array) + if err != nil { + log.Printf("Error configuring the server : %s\n", err.Error()) + return + } + syncv1grpc.RegisterFlagSyncServiceServer(server, &sync) fmt.Printf("Server listening : %s\n", s.Config.Host+":"+s.Config.Port) @@ -67,108 +65,68 @@ func (s *Server) buildOptions() ([]grpc.ServerOption, error) { // SyncImpl implements the flagd Sync contract type SyncImpl struct { - filePaths []string - watcher *fsnotify.Watcher + fw *fileWatcher } func NewSyncImpl(filePaths []string) (SyncImpl, error) { - watcher, err := fsnotify.NewWatcher() - for _, filePath := range filePaths { + fw := &fileWatcher{ + paths: filePaths, + subs: make(map[interface{}]chan<- []byte), + mu: sync.Mutex{}, + } - if err != nil { - return SyncImpl{}, err - } - watcher.Add(filePath) + err := fw.init() + if err != nil { + log.Printf("Error starting file watcher %s\n", err.Error()) + return SyncImpl{}, err } + return SyncImpl{ - filePaths, - watcher, + fw, }, nil } func (s *SyncImpl) SyncFlags(req *v1.SyncFlagsRequest, stream syncv1grpc.FlagSyncService_SyncFlagsServer) error { - log.Printf("Requesting flags for provider : %s", req.ProviderId) + log.Printf("Requesting flags for provider: %s\n", req.ProviderId) - marshalled, err := s.readFlags() - if err != nil { - log.Println("error reading flags:", err) - } - err = stream.Send(&v1.SyncFlagsResponse{ - FlagConfiguration: string(marshalled), + // initial read + err := stream.Send(&v1.SyncFlagsResponse{ + FlagConfiguration: string(s.fw.getCurrentData()), State: v1.SyncState_SYNC_STATE_ALL, }) if err != nil { - log.Println("error sending initial stream:", err) + log.Printf("Error sending initial stream: %v\n", err) + return err } + listener := make(chan []byte) + s.fw.subscribe(listener) + // Start listening for events. for { select { - case event, ok := <-s.watcher.Events: - if !ok { - message := "unable to process file event" + case data := <-listener: + err = stream.Send(&v1.SyncFlagsResponse{ + FlagConfiguration: string(data), + State: v1.SyncState_SYNC_STATE_ALL, + }) + if err != nil { + // this is probably a close + message := fmt.Sprintf("error sending stream, likely closed: %v", err) log.Println(message) - return errors.New(message) - } - if event.Has(fsnotify.Write) { - marshalled, err := s.readFlags() - if err != nil { - log.Println("error reading flags:", err) - } - err = stream.Send(&v1.SyncFlagsResponse{ - FlagConfiguration: string(marshalled), - State: v1.SyncState_SYNC_STATE_ALL, - }) - if err != nil { - // this is probably a close - message := fmt.Sprintf("error sending stream, likely closed: %v", err) - log.Println(message) - return nil - } + return nil } - case err, _ := <-s.watcher.Errors: - log.Println("error in file watcher:", err) + case <-stream.Context().Done(): + s.fw.unSubscribe(listener) + log.Printf("Stream completed for provider: %s\n", req.ProviderId) + return nil } } } -func (s *SyncImpl) readFlags() ([]byte, error) { - flags := make(map[string]any) - evaluators := make(map[string]any) - - for _, path := range s.filePaths { - bytes, err := os.ReadFile(path) - for len(bytes) == 0 { - // this is a fitly hack - // file writes are NOT atomic and often when they are occur they have transitional empty states - // this "re-reads" the file in these cases a bit later - time.Sleep(10 * time.Millisecond) - bytes, err = os.ReadFile(path) - } - if err != nil { - return nil, err - } - parsed := make(map[string]map[string]any) - json.Unmarshal(bytes, &parsed) - maps.Copy(flags, parsed["flags"]) - maps.Copy(evaluators, parsed["$evaluators"]) - } - - payload := make(map[string]any) - payload["flags"] = flags - payload["$evaluators"] = evaluators - marshalled, err := json.Marshal(payload) - if err != nil { - return nil, err - } - return marshalled, nil -} - func (s *SyncImpl) FetchAllFlags(context.Context, *v1.FetchAllFlagsRequest) (*v1.FetchAllFlagsResponse, error) { - marshalled, err := s.readFlags() - if err != nil { - log.Println("error reading flags:", err) - } + marshalled := s.fw.getCurrentData() + return &v1.FetchAllFlagsResponse{ FlagConfiguration: string(marshalled), }, nil