2
2
using rubberduckvba . Server . ContentSynchronization . Pipeline . Sections . Context ;
3
3
using rubberduckvba . Server . Model ;
4
4
using rubberduckvba . Server . Services ;
5
+ using rubberduckvba . Server . Services . rubberduckdb ;
5
6
using System . Threading . Tasks . Dataflow ;
6
7
7
8
namespace rubberduckvba . Server . ContentSynchronization . Pipeline . Sections . SyncTags ;
8
9
9
10
public class SyncTagsSection : PipelineSection < SyncContext >
10
11
{
11
- public SyncTagsSection ( IPipeline < SyncContext , bool > parent , CancellationTokenSource tokenSource , ILogger logger , IRubberduckDbService content , IGitHubClientService github , IStagingServices staging )
12
+ public SyncTagsSection ( IPipeline < SyncContext , bool > parent , CancellationTokenSource tokenSource , ILogger logger , TagServices tagServices , IGitHubClientService github , IStagingServices staging )
12
13
: base ( parent , tokenSource , logger )
13
14
{
14
- ReceiveRequest = new ReceiveRequestBlock ( this , tokenSource , logger ) ;
15
- BroadcastParameters = new BroadcastParametersBlock ( this , tokenSource , logger ) ;
16
- AcquireDbMainTag = new AcquireDbMainTagGraphBlock ( this , tokenSource , content , logger ) ;
17
- AcquireDbNextTag = new AcquireDbNextTagGraphBlock ( this , tokenSource , content , logger ) ;
18
- JoinDbTags = new DataflowJoinBlock < TagGraph , TagGraph > ( this , tokenSource , logger , nameof ( JoinDbTags ) ) ;
19
- LoadDbTags = new LoadDbLatestTagsBlock ( this , tokenSource , logger ) ;
20
- LoadGitHubTags = new LoadGitHubTagsBlock ( this , tokenSource , github , logger ) ;
21
- JoinTags = new DataflowJoinBlock < SyncContext , SyncContext > ( this , tokenSource , logger , nameof ( JoinTags ) ) ;
22
- BroadcastTags = new BroadcastTagsBlock ( this , tokenSource , logger ) ;
23
- StreamGitHubTags = new StreamGitHubTagsBlock ( this , tokenSource , logger ) ;
24
- GetTagAssets = new GetTagAssetsBlock ( this , tokenSource , github , logger ) ;
25
- TagBuffer = new TagBufferBlock ( this , tokenSource , logger ) ;
26
- AccumulateProcessedTags = new AccumulateProcessedTagsBlock ( this , tokenSource , logger ) ;
27
- SaveTags = new BulkSaveStagingBlock ( this , tokenSource , staging , logger ) ;
15
+ SynchronizeTags = new SynchronizeTagsBlock ( this , tokenSource , logger , tagServices , github ) ;
16
+ //ReceiveRequest = new ReceiveRequestBlock(this, tokenSource, logger);
17
+ //BroadcastParameters = new BroadcastParametersBlock(this, tokenSource, logger);
18
+ //AcquireDbMainTag = new AcquireDbMainTagGraphBlock(this, tokenSource, content, logger);
19
+ //AcquireDbNextTag = new AcquireDbNextTagGraphBlock(this, tokenSource, content, logger);
20
+ //JoinDbTags = new DataflowJoinBlock<TagGraph, TagGraph>(this, tokenSource, logger, nameof(JoinDbTags));
21
+ //LoadDbTags = new LoadDbLatestTagsBlock(this, tokenSource, logger);
22
+ //LoadGitHubTags = new LoadGitHubTagsBlock(this, tokenSource, github, logger);
23
+ //JoinTags = new DataflowJoinBlock<SyncContext, SyncContext>(this, tokenSource, logger, nameof(JoinTags));
24
+ //BroadcastTags = new BroadcastTagsBlock(this, tokenSource, logger);
25
+ //StreamGitHubTags = new StreamGitHubTagsBlock(this, tokenSource, logger);
26
+ //GetTagAssets = new GetTagAssetsBlock(this, tokenSource, github, logger);
27
+ //TagBuffer = new TagBufferBlock(this, tokenSource, logger);
28
+ //AccumulateProcessedTags = new AccumulateProcessedTagsBlock(this, tokenSource, logger);
29
+ //SaveTags = new BulkSaveStagingBlock(this, tokenSource, staging, logger);
28
30
}
29
31
30
- #region blocks
31
- private ReceiveRequestBlock ReceiveRequest { get ; }
32
- private BroadcastParametersBlock BroadcastParameters { get ; }
33
- private AcquireDbMainTagGraphBlock AcquireDbMainTag { get ; }
34
- private AcquireDbNextTagGraphBlock AcquireDbNextTag { get ; }
35
- private DataflowJoinBlock < TagGraph , TagGraph > JoinDbTags { get ; }
36
- private LoadDbLatestTagsBlock LoadDbTags { get ; }
37
- private LoadGitHubTagsBlock LoadGitHubTags { get ; }
38
- private DataflowJoinBlock < SyncContext , SyncContext > JoinTags { get ; }
39
- private BroadcastTagsBlock BroadcastTags { get ; }
40
- private StreamGitHubTagsBlock StreamGitHubTags { get ; }
41
- private GetTagAssetsBlock GetTagAssets { get ; }
42
- private TagBufferBlock TagBuffer { get ; }
43
- private AccumulateProcessedTagsBlock AccumulateProcessedTags { get ; }
44
- private BulkSaveStagingBlock SaveTags { get ; }
32
+ //#region blocks
33
+ private SynchronizeTagsBlock SynchronizeTags { get ; }
34
+ //private ReceiveRequestBlock ReceiveRequest { get; }
35
+ //private BroadcastParametersBlock BroadcastParameters { get; }
36
+ //private AcquireDbMainTagGraphBlock AcquireDbMainTag { get; }
37
+ //private AcquireDbNextTagGraphBlock AcquireDbNextTag { get; }
38
+ //private DataflowJoinBlock<TagGraph, TagGraph> JoinDbTags { get; }
39
+ //private LoadDbLatestTagsBlock LoadDbTags { get; }
40
+ //private LoadGitHubTagsBlock LoadGitHubTags { get; }
41
+ //private DataflowJoinBlock<SyncContext, SyncContext> JoinTags { get; }
42
+ //private BroadcastTagsBlock BroadcastTags { get; }
43
+ //private StreamGitHubTagsBlock StreamGitHubTags { get; }
44
+ //private GetTagAssetsBlock GetTagAssets { get; }
45
+ //private TagBufferBlock TagBuffer { get; }
46
+ //private AccumulateProcessedTagsBlock AccumulateProcessedTags { get; }
47
+ //private BulkSaveStagingBlock SaveTags { get; }
45
48
46
- public ITargetBlock < SyncRequestParameters > InputBlock => ReceiveRequest . Block ;
47
- public Task OutputTask => SaveTags . Block . Completion ;
49
+ public ITargetBlock < TagSyncRequestParameters > InputBlock => SynchronizeTags . Block ! ;
50
+ public Task OutputTask => SynchronizeTags . Block . Completion ;
48
51
49
52
protected override IReadOnlyDictionary < string , IDataflowBlock > Blocks => new Dictionary < string , IDataflowBlock >
50
53
{
51
- [ nameof ( ReceiveRequest ) ] = ReceiveRequest . Block ,
52
- [ nameof ( BroadcastParameters ) ] = BroadcastParameters . Block ,
53
- [ nameof ( AcquireDbMainTag ) ] = AcquireDbMainTag . Block ,
54
- [ nameof ( AcquireDbNextTag ) ] = AcquireDbNextTag . Block ,
55
- [ nameof ( JoinDbTags ) ] = JoinDbTags . Block ,
56
- [ nameof ( LoadDbTags ) ] = LoadDbTags . Block ,
57
- [ nameof ( LoadGitHubTags ) ] = LoadGitHubTags . Block ,
58
- [ nameof ( JoinTags ) ] = JoinTags . Block ,
59
- [ nameof ( BroadcastTags ) ] = BroadcastTags . Block ,
60
- [ nameof ( StreamGitHubTags ) ] = StreamGitHubTags . Block ,
61
- [ nameof ( GetTagAssets ) ] = GetTagAssets . Block ,
62
- [ nameof ( TagBuffer ) ] = TagBuffer . Block ,
63
- [ nameof ( AccumulateProcessedTags ) ] = AccumulateProcessedTags . Block ,
64
- [ nameof ( SaveTags ) ] = SaveTags . Block ,
54
+ [ nameof ( SynchronizeTags ) ] = SynchronizeTags . Block ,
55
+ // [nameof(ReceiveRequest)] = ReceiveRequest.Block,
56
+ // [nameof(BroadcastParameters)] = BroadcastParameters.Block,
57
+ // [nameof(AcquireDbMainTag)] = AcquireDbMainTag.Block,
58
+ // [nameof(AcquireDbNextTag)] = AcquireDbNextTag.Block,
59
+ // [nameof(JoinDbTags)] = JoinDbTags.Block,
60
+ // [nameof(LoadDbTags)] = LoadDbTags.Block,
61
+ // [nameof(LoadGitHubTags)] = LoadGitHubTags.Block,
62
+ // [nameof(JoinTags)] = JoinTags.Block,
63
+ // [nameof(BroadcastTags)] = BroadcastTags.Block,
64
+ // [nameof(StreamGitHubTags)] = StreamGitHubTags.Block,
65
+ // [nameof(GetTagAssets)] = GetTagAssets.Block,
66
+ // [nameof(TagBuffer)] = TagBuffer.Block,
67
+ // [nameof(AccumulateProcessedTags)] = AccumulateProcessedTags.Block,
68
+ // [nameof(SaveTags)] = SaveTags.Block,
65
69
} ;
66
- #endregion
70
+ // #endregion
67
71
68
72
public override void CreateBlocks ( )
69
73
{
70
- ReceiveRequest . CreateBlock ( ) ;
71
- BroadcastParameters . CreateBlock ( ReceiveRequest ) ;
72
- AcquireDbMainTag . CreateBlock ( BroadcastParameters ) ;
73
- AcquireDbNextTag . CreateBlock ( BroadcastParameters ) ;
74
- JoinDbTags . CreateBlock ( AcquireDbMainTag , AcquireDbNextTag ) ;
75
- LoadDbTags . CreateBlock ( JoinDbTags ) ;
76
- LoadGitHubTags . CreateBlock ( LoadDbTags ) ;
77
- JoinTags . CreateBlock ( LoadDbTags , LoadGitHubTags ) ;
78
- BroadcastTags . CreateBlock ( JoinTags ) ;
79
- StreamGitHubTags . CreateBlock ( BroadcastTags ) ;
80
- GetTagAssets . CreateBlock ( StreamGitHubTags ) ;
81
- TagBuffer . CreateBlock ( GetTagAssets ) ;
82
- AccumulateProcessedTags . CreateBlock ( TagBuffer ) ;
83
- SaveTags . CreateBlock ( AccumulateProcessedTags ) ;
74
+ SynchronizeTags . CreateBlock ( ) ;
75
+ //ReceiveRequest.CreateBlock();
76
+ //BroadcastParameters.CreateBlock(ReceiveRequest);
77
+ //AcquireDbMainTag.CreateBlock(BroadcastParameters);
78
+ //AcquireDbNextTag.CreateBlock(BroadcastParameters);
79
+ //JoinDbTags.CreateBlock(AcquireDbMainTag, AcquireDbNextTag);
80
+ //LoadDbTags.CreateBlock(JoinDbTags);
81
+ //LoadGitHubTags.CreateBlock(LoadDbTags);
82
+ //JoinTags.CreateBlock(LoadDbTags, LoadGitHubTags);
83
+ //BroadcastTags.CreateBlock(JoinTags);
84
+ //StreamGitHubTags.CreateBlock(BroadcastTags);
85
+ //GetTagAssets.CreateBlock(StreamGitHubTags);
86
+ //TagBuffer.CreateBlock(GetTagAssets);
87
+ //AccumulateProcessedTags.CreateBlock(TagBuffer);
88
+ //SaveTags.CreateBlock(AccumulateProcessedTags);
84
89
}
85
90
}
91
+
92
+ public class SynchronizeTagsBlock : ActionBlockBase < TagSyncRequestParameters , SyncContext >
93
+ {
94
+ private readonly IGitHubClientService _github ;
95
+ private readonly TagServices _tagServices ;
96
+
97
+ public SynchronizeTagsBlock ( PipelineSection < SyncContext > parent , CancellationTokenSource tokenSource , ILogger logger ,
98
+ TagServices tagServices ,
99
+ IGitHubClientService github )
100
+ : base ( parent , tokenSource , logger )
101
+ {
102
+ _tagServices = tagServices ;
103
+ _github = github ;
104
+ }
105
+
106
+ protected override async Task ActionAsync ( TagSyncRequestParameters input )
107
+ {
108
+ var getGithubTags = _github . GetAllTagsAsync ( ) ;
109
+ var dbMain = _tagServices . GetLatestTag ( false ) ;
110
+ var dbNext = _tagServices . GetLatestTag ( true ) ;
111
+
112
+ var githubTags = await getGithubTags ;
113
+ var ( gitHubMain , gitHubNext , _) = githubTags . GetLatestTags ( ) ;
114
+
115
+ var mergedMain = ( dbMain ?? gitHubMain with { InstallerDownloads = gitHubMain . InstallerDownloads } ) ! ;
116
+ var mergedNext = ( dbNext ?? gitHubNext with { InstallerDownloads = gitHubNext . InstallerDownloads } ) ! ;
117
+
118
+ var inserts = new List < TagGraph > ( ) ;
119
+ var updates = new List < TagGraph > ( ) ;
120
+
121
+ if ( dbMain is null )
122
+ {
123
+ inserts . Add ( mergedMain ) ;
124
+ }
125
+ else
126
+ {
127
+ updates . Add ( mergedMain ) ;
128
+ }
129
+
130
+ if ( dbNext is null )
131
+ {
132
+ inserts . Add ( mergedNext ) ;
133
+ }
134
+ else
135
+ {
136
+ updates . Add ( mergedNext ) ;
137
+ }
138
+
139
+ if ( inserts . Any ( ) )
140
+ {
141
+ _tagServices . Create ( inserts ) ;
142
+ }
143
+ if ( updates . Any ( ) )
144
+ {
145
+ _tagServices . Update ( updates ) ;
146
+ }
147
+ }
148
+ }
0 commit comments