@@ -11,17 +11,14 @@ use std::borrow::Cow;
11
11
use std:: cmp:: Ordering ;
12
12
use std:: collections:: HashMap ;
13
13
use std:: fmt;
14
- use std:: fs:: File ;
15
- use std:: io:: Read ;
14
+ use std:: fs;
16
15
use std:: net:: SocketAddr ;
17
16
use std:: path:: Path ;
18
17
use std:: str;
19
18
use std:: sync:: atomic:: { AtomicBool , Ordering as AtomicOrdering } ;
20
19
use std:: sync:: Arc ;
21
20
22
- use failure:: Error ;
23
21
use futures:: { self , Future , Stream } ;
24
- use futures_cpupool:: CpuPool ;
25
22
use headers:: CacheControl ;
26
23
use headers:: Header ;
27
24
use headers:: { Authorization , ContentLength , ContentType } ;
@@ -686,8 +683,32 @@ pub fn handle_collected(
686
683
687
684
struct Server {
688
685
data : Arc < RwLock < InputData > > ,
689
- pool : CpuPool ,
690
- updating : Arc < AtomicBool > ,
686
+ updating : UpdatingStatus ,
687
+ }
688
+
689
+ struct UpdatingStatus ( Arc < AtomicBool > ) ;
690
+
691
+ struct IsUpdating ( Arc < AtomicBool > ) ;
692
+
693
+ impl Drop for IsUpdating {
694
+ fn drop ( & mut self ) {
695
+ self . 0 . store ( false , AtomicOrdering :: Release ) ;
696
+ }
697
+ }
698
+
699
+ impl UpdatingStatus {
700
+ fn new ( ) -> Self {
701
+ UpdatingStatus ( Arc :: new ( AtomicBool :: new ( false ) ) )
702
+ }
703
+
704
+ // Returns previous state
705
+ fn set_updating ( & self ) -> bool {
706
+ self . 0 . compare_and_swap ( false , true , AtomicOrdering :: AcqRel )
707
+ }
708
+
709
+ fn release_on_drop ( & self ) -> IsUpdating {
710
+ IsUpdating ( self . 0 . clone ( ) )
711
+ }
691
712
}
692
713
693
714
macro_rules! check_http_method {
@@ -808,7 +829,7 @@ impl Server {
808
829
let data = self . data . clone ( ) ;
809
830
let gh_header = req. headers ( ) . get ( "X-Hub-Signature" ) . cloned ( ) ;
810
831
let gh_header = gh_header. and_then ( |g| g. to_str ( ) . ok ( ) . map ( |s| s. to_owned ( ) ) ) ;
811
- Box :: new ( self . pool . spawn_fn ( move || {
832
+ Box :: new (
812
833
req. into_body ( )
813
834
. concat2 ( )
814
835
. map_err ( |e| ServerError ( format ! ( "{:?}" , e) ) )
@@ -868,15 +889,13 @@ impl Server {
868
889
. body ( hyper:: Body :: from ( err) )
869
890
. unwrap ( ) ,
870
891
}
871
- } )
872
- } ) )
892
+ } ) ,
893
+ )
873
894
}
874
895
875
896
fn handle_push ( & self , _req : Request ) -> ServerFut {
876
897
// set to updating
877
- let was_updating = self
878
- . updating
879
- . compare_and_swap ( false , true , AtomicOrdering :: AcqRel ) ;
898
+ let was_updating = self . updating . set_updating ( ) ;
880
899
881
900
if was_updating {
882
901
return Box :: new ( futures:: future:: ok (
@@ -895,14 +914,14 @@ impl Server {
895
914
debug ! ( "received onpush hook" ) ;
896
915
897
916
let rwlock = self . data . clone ( ) ;
898
- let updating = self . updating . clone ( ) ;
899
- let response = self . pool . spawn_fn ( move || -> Result < ( ) , Error > {
900
- let repo_path = get_repo_path ( ) ? ;
917
+ let updating = self . updating . release_on_drop ( ) ;
918
+ let _ = std :: thread :: spawn ( move || {
919
+ let repo_path = get_repo_path ( ) . unwrap ( ) ;
901
920
902
- git:: update_repo ( & repo_path) ? ;
921
+ git:: update_repo ( & repo_path) . unwrap ( ) ;
903
922
904
923
info ! ( "updating from filesystem..." ) ;
905
- let new_data = InputData :: from_fs ( & repo_path) ? ;
924
+ let new_data = InputData :: from_fs ( & repo_path) . unwrap ( ) ;
906
925
debug ! ( "last date = {:?}" , new_data. last_date) ;
907
926
908
927
// Retrieve the stored InputData from the request.
@@ -911,29 +930,12 @@ impl Server {
911
930
// Write the new data back into the request
912
931
* data = new_data;
913
932
914
- updating. store ( false , AtomicOrdering :: Release ) ;
915
-
916
- Ok ( ( ) )
933
+ std:: mem:: drop ( updating) ;
917
934
} ) ;
918
935
919
- let updating = self . updating . clone ( ) ;
920
- Box :: new (
921
- response
922
- . map ( |_| Response :: new ( hyper:: Body :: from ( "Successfully updated!" ) ) )
923
- . or_else ( move |err| {
924
- updating. store ( false , AtomicOrdering :: Release ) ;
925
- futures:: future:: ok (
926
- http:: Response :: builder ( )
927
- . status ( StatusCode :: INTERNAL_SERVER_ERROR )
928
- . header_typed ( ContentType :: text_utf8 ( ) )
929
- . body ( hyper:: Body :: from ( format ! (
930
- "Internal Server Error: {:?}" ,
931
- err
932
- ) ) )
933
- . unwrap ( ) ,
934
- )
935
- } ) ,
936
- )
936
+ Box :: new ( futures:: future:: ok ( Response :: new ( hyper:: Body :: from (
937
+ "Queued update" ,
938
+ ) ) ) )
937
939
}
938
940
}
939
941
@@ -972,12 +974,10 @@ impl Server {
972
974
}
973
975
974
976
if Path :: new ( & fs_path) . is_file ( ) {
975
- return Box :: new ( self . pool . spawn_fn ( move || {
976
- let mut f = File :: open ( & fs_path) . unwrap ( ) ;
977
- let mut source = Vec :: new ( ) ;
978
- f. read_to_end ( & mut source) . unwrap ( ) ;
979
- futures:: future:: ok ( Response :: new ( hyper:: Body :: from ( source) ) )
980
- } ) ) ;
977
+ let source = fs:: read ( & fs_path) . unwrap ( ) ;
978
+ return Box :: new ( futures:: future:: ok ( Response :: new ( hyper:: Body :: from (
979
+ source,
980
+ ) ) ) ) ;
981
981
}
982
982
983
983
match req. uri ( ) . path ( ) {
@@ -1018,8 +1018,7 @@ fn verify_gh_sig(cfg: &Config, header: &str, body: &[u8]) -> Option<bool> {
1018
1018
pub fn start ( data : InputData , port : u16 ) {
1019
1019
let server = Arc :: new ( Server {
1020
1020
data : Arc :: new ( RwLock :: new ( data) ) ,
1021
- pool : CpuPool :: new_num_cpus ( ) ,
1022
- updating : Arc :: new ( AtomicBool :: new ( false ) ) ,
1021
+ updating : UpdatingStatus :: new ( ) ,
1023
1022
} ) ;
1024
1023
let mut server_address: SocketAddr = "0.0.0.0:2346" . parse ( ) . unwrap ( ) ;
1025
1024
server_address. set_port ( port) ;
0 commit comments