Skip to content

Commit

Permalink
PDI-1122 : Parallel CSV input running on the master of a clustered execution only processes a portion of the input file
Browse files Browse the repository at this point in the history

git-svn-id: svn://source.pentaho.org/svnkettleroot/Kettle/trunk@7614 5fb7f6ec-07c1-534a-b4ca-9155e429e800
  • Loading branch information
mcasters committed May 5, 2008
1 parent 53e8f85 commit 0752075
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 27 deletions.
3 changes: 3 additions & 0 deletions src-core/org/pentaho/di/core/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ public class Const

/** The slave transformation number */
public static final String INTERNAL_VARIABLE_STEP_UNIQUE_NUMBER = INTERNAL_VARIABLE_PREFIX+".Step.Unique.Number";

/** Is this transformation running clustered, on the master? */
public static final String INTERNAL_VARIABLE_CLUSTER_MASTER = INTERNAL_VARIABLE_PREFIX+".Cluster.Master";

/** The size of the cluster : number of slaves */
public static final String INTERNAL_VARIABLE_STEP_UNIQUE_COUNT = INTERNAL_VARIABLE_PREFIX+".Step.Unique.Count";
Expand Down
5 changes: 4 additions & 1 deletion src/org/pentaho/di/trans/Trans.java
Original file line number Diff line number Diff line change
Expand Up @@ -2048,7 +2048,8 @@ public static final void executeClustered(final TransSplitter transSplitter, fin
TransConfiguration transConfiguration = new TransConfiguration(master, executionConfiguration);
Map<String, String> variables = transConfiguration.getTransExecutionConfiguration().getVariables();
variables.put(Const.INTERNAL_VARIABLE_CLUSTER_SIZE, Integer.toString(slaves.length));

variables.put(Const.INTERNAL_VARIABLE_CLUSTER_MASTER, "Y");

String masterReply = masterServer.sendXML(transConfiguration.getXML(), AddTransServlet.CONTEXT_PATH+"/?xml=Y");
WebResult webResult = WebResult.fromXMLString(masterReply);
if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK))
Expand Down Expand Up @@ -2081,6 +2082,8 @@ public void run() {
variables.put(Const.INTERNAL_VARIABLE_SLAVE_SERVER_NUMBER, Integer.toString(index));
variables.put(Const.INTERNAL_VARIABLE_SLAVE_SERVER_NAME, slaves[index].getName());
variables.put(Const.INTERNAL_VARIABLE_CLUSTER_SIZE, Integer.toString(slaves.length));
variables.put(Const.INTERNAL_VARIABLE_CLUSTER_MASTER, "N");

String slaveReply = slaves[index].sendXML(transConfiguration.getXML(), AddTransServlet.CONTEXT_PATH+"/?xml=Y");
WebResult webResult = WebResult.fromXMLString(slaveReply);
if (!webResult.getResult().equalsIgnoreCase(WebResult.STRING_OK))
Expand Down
4 changes: 3 additions & 1 deletion src/org/pentaho/di/trans/step/BaseStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,9 @@ public boolean init(StepMetaInterface smi, StepDataInterface sdi)

String slaveNr = transMeta.getVariable(Const.INTERNAL_VARIABLE_SLAVE_SERVER_NUMBER);
String clusterSize = transMeta.getVariable(Const.INTERNAL_VARIABLE_CLUSTER_SIZE);
if (!Const.isEmpty(slaveNr) && !Const.isEmpty(clusterSize))
boolean master = "Y".equalsIgnoreCase(transMeta.getVariable(Const.INTERNAL_VARIABLE_CLUSTER_MASTER));

if (!Const.isEmpty(slaveNr) && !Const.isEmpty(clusterSize) && !master)
{
this.slaveNr = Integer.parseInt(slaveNr);
this.clusterSize = Integer.parseInt(clusterSize);
Expand Down
83 changes: 70 additions & 13 deletions src/org/pentaho/di/www/Carte.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@

public class Carte
{
private String hostname;
private int port;
private WebServer webServer;

/**
 * Builds a Carte instance: remembers the host and port, registers the generated
 * test transformation in a fresh transformation map, and constructs the
 * {@link WebServer} over that map and an empty job map.
 *
 * @param hostname the hostname stored on this instance and handed to the web server
 * @param port the port stored on this instance and handed to the web server
 * @param join passed through unchanged to the {@link WebServer} constructor
 * @throws Exception if creating the test transformation or the web server fails
 */
public Carte(String hostname, int port, boolean join) throws Exception {
    this.hostname = hostname;
    this.port = port;

    TransformationMap transformations = new TransformationMap();
    JobMap jobs = new JobMap();

    // Register the built-in test transformation under its own name.
    Trans testTrans = generateTestTransformation();
    TransConfiguration testConfiguration =
        new TransConfiguration(testTrans.getTransMeta(), new TransExecutionConfiguration());
    transformations.addTransformation(testTrans.getName(), testTrans, testConfiguration);

    this.webServer = new WebServer(transformations, jobs, hostname, port, join);
}

public static void main(String[] args) throws Exception
{
if (args.length<2 || (Const.isEmpty(args[0]) && Const.isEmpty(args[1])) )
Expand All @@ -42,32 +59,30 @@ public static void main(String[] args) throws Exception
System.exit(1);
}

runCarte(args[0], args[1]);
}

/**
 * Initializes the environment, resolves the host and port from the two textual
 * arguments, and creates a {@link Carte} instance with {@code join} set to true.
 * The port defaults to {@code WebServer.PORT}; it is replaced only when
 * {@code portArgument} is non-empty and parses as an integer. A parse failure is
 * reported on standard output and the default port is kept.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers, so pre-change
 * lines (those using {@code args[...]} and {@code new WebServer(...)}) appear next to
 * their replacements.
 *
 * @param hostnameArgument the hostname to use
 * @param portArgument the port as text; may be empty
 * @throws Exception propagated from initialization or from the Carte constructor
 */
public static void runCarte(String hostnameArgument, String portArgument) throws Exception {
init();

TransformationMap transformationMap = new TransformationMap(Thread.currentThread().getName());
JobMap jobMap = new JobMap();

Trans trans = generateTestTransformation();
transformationMap.addTransformation(trans.getName(), trans, new TransConfiguration(trans.getTransMeta(), new TransExecutionConfiguration()));

String hostname = args[0];
String hostname = hostnameArgument;
// Start from the default port; only a successfully parsed argument overrides it.
int port = WebServer.PORT;
if (args.length>=2)
if (!Const.isEmpty(portArgument))
{
try
{
port = Integer.parseInt(args[1]);
port = Integer.parseInt(portArgument);
}
catch(Exception e)
{
// NOTE(review): the first message argument is the hostname, not the port text
// that failed to parse — confirm against the "Carte.Error.CanNotPartPort" template.
System.out.println(Messages.getString("Carte.Error.CanNotPartPort",args[0],""+port));
System.out.println(Messages.getString("Carte.Error.CanNotPartPort", hostnameArgument, ""+port));

}
}
new WebServer(transformationMap, jobMap, hostname, port);
}
new Carte(hostname, port, true);
}

private static void init() throws Exception
private static void init() throws Exception
{
EnvUtil.environmentInit();
LogWriter.getInstance( LogWriter.LOG_LEVEL_BASIC );
Expand Down Expand Up @@ -123,4 +138,46 @@ public static Trans generateTestTransformation()
return new Trans(transMeta);

}

/**
* Returns the web server held by this instance: the one built in the
* constructor, or whatever was last passed to setWebServer.
*
* @return the webServer
*/
public WebServer getWebServer() {
return webServer;
}

/**
* Replaces the stored web server reference. Only the field is assigned here.
*
* @param webServer the webServer to set
*/
public void setWebServer(WebServer webServer) {
this.webServer = webServer;
}

/**
* Returns the port stored on this instance: the constructor argument, or the
* value last passed to setPort.
*
* @return the port
*/
public int getPort() {
return port;
}

/**
* Updates the stored port value. Only the field is assigned here; the web
* server is not touched by this method.
*
* @param port the port to set
*/
public void setPort(int port) {
this.port = port;
}

/**
* Returns the hostname stored on this instance: the constructor argument, or
* the value last passed to setHostname.
*
* @return the hostname
*/
public String getHostname() {
return hostname;
}

/**
* Updates the stored hostname value. Only the field is assigned here; the web
* server is not touched by this method.
*
* @param hostname the hostname to set
*/
public void setHostname(String hostname) {
this.hostname = hostname;
}
}
21 changes: 9 additions & 12 deletions src/org/pentaho/di/www/TransformationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ public class TransformationMap
private Map<String, TransConfiguration> configurationMap;
private Map<String, Appender> loggingMap;

private String parentThreadName;

private Map<String, Integer> serverSocketPorts;

/**
* @param parentThreadName
* @deprecated The parent thread name is no longer used.
*/
public TransformationMap(String parentThreadName)
{
this.parentThreadName = parentThreadName;

this();
}

public TransformationMap()
{
transformationMap = new Hashtable<String, Trans>();
configurationMap = new Hashtable<String, TransConfiguration>();
loggingMap = new Hashtable<String, Appender>();
Expand Down Expand Up @@ -100,14 +105,6 @@ public String[] getTransformationNames()
return keySet.toArray(new String[keySet.size()]);
}

/**
* @return the parentThreadName
*/
public String getParentThreadName()
{
return parentThreadName;
}

/**
* @return the configurationMap
*/
Expand Down
101 changes: 101 additions & 0 deletions testfiles/customers-100.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
id;name;firstname;zip;city;birthdate;street;housenr;stateCode;state
1;jwcdf-name;fsj-firstname; 13520;oem-city;1954/02/07;amrb-street; 145;AK;ALASKA
2;flhxu-name;tum-firstname; 17520;buo-city;1966/04/24;wfyz-street; 96;GA;GEORGIA
3;xthfg-name;gfe-firstname; 12560;vtz-city;1990/01/11;doxx-street; 46;NJ;NEW JERSEY
4;ulzrz-name;bnl-firstname; 11620;prz-city;1966/08/02;bxqn-street; 104;NY;NEW YORK
5;oxhyr-name;onx-firstname; 15180;bpn-city;1970/11/14;pksn-street; 133;IN;INDIANA
6;fiqjz-name;sce-firstname; 16020;fnn-city;1954/09/24;wbhg-street; 35;MD;MARYLAND
7;tkiat-name;xti-firstname; 12720;stt-city;1966/08/11;tvnf-street; 21;PA;PENNSYLVANIA
8;kljcz-name;uqd-firstname; 13340;ntt-city;1987/01/15;jyje-street; 10;PW;PALAU
9;pgunz-name;hcm-firstname; 16680;gxh-city;1970/11/08;shbe-street; 184;NC;NORTH CAROLINA
10;oyjha-name;uhj-firstname; 18880;uyg-city;1966/04/10;bjgw-street; 176;AR;ARKANSAS
11;igxbd-name;uph-firstname; 13480;ndh-city;1962/12/03;jdcd-street; 151;NH;NEW HAMPSHIRE
12;vnaov-name;wha-firstname; 13120;egm-city;1954/03/28;hpep-street; 20;CA;CALIFORNIA
13;dauuz-name;hwg-firstname; 13740;khn-city;1958/05/15;etqx-street; 5;OK;OKLAHOMA
14;gkuuo-name;kkb-firstname; 13560;xdt-city;1962/04/07;sdoj-street; 35;MT;MONTANA
15;wdhze-name;jjk-firstname; 16900;due-city;1970/07/17;pmmu-street; 174;AS;AMERICAN SAMOA
16;ncayz-name;ynb-firstname; 15720;lxj-city;1974/04/27;mdtb-street; 109;MA;MASSACHUSETTS
17;rdjin-name;hhu-firstname; 14480;lpc-city;1958/11/16;wxik-street; 145;KY;KENTUCKY
18;nxzij-name;bdl-firstname; 10740;avx-city;1958/02/20;nybz-street; 138;WI;WISCONSIN
19;xgrzc-name;dxw-firstname; 18900;vpq-city;1990/11/16;wzjh-street; 58;ME;MAINE
20;ehgrn-name;vbe-firstname; 17500;cik-city;1978/05/21;ucnw-street; 135;MD;MARYLAND
21;gctjx-name;upx-firstname; 11960;yqr-city;1958/03/03;rlko-street; 141;TN;TENNESSEE
22;ptzmg-name;hva-firstname; 15740;gux-city;1978/05/04;pugy-street; 122;VI;VIRGIN ISLANDS
23;eyeti-name;gnw-firstname; 17420;eko-city;1962/10/26;ylph-street; 61;NC;NORTH CAROLINA
24;wccwo-name;zpj-firstname; 16600;uim-city;1962/09/29;ygih-street; 26;WA;WASHINGTON
25;bwkoe-name;ayl-firstname; 18660;rtw-city;1978/07/16;mzww-street; 179;CA;CALIFORNIA
26;rezku-name;zio-firstname; 19080;nvt-city;1982/07/14;wwkd-street; 91;CA;CALIFORNIA
27;mjlsk-name;ecx-firstname; 10800;yxu-city;1950/12/11;vttb-street; 195;MO;MISSOURI
28;wdjsi-name;aoq-firstname; 13660;smo-city;1954/02/01;kako-street; 7;NV;NEVADA
29;mwfnd-name;nyb-firstname; 19760;bbu-city;1986/09/23;apdi-street; 91;MS;MISSISSIPPI
30;vtuoz-name;jhh-firstname; 17620;vad-city;1982/05/05;kzup-street; 79;GA;GEORGIA
31;rhhxk-name;ndr-firstname; 16760;fub-city;1978/11/12;regd-street; 55;OK;OKLAHOMA
32;lpstk-name;mqz-firstname; 18940;tnr-city;1982/09/16;cdhf-street; 4;SD;SOUTH DAKOTA
33;ldhyr-name;yts-firstname; 12000;auk-city;1986/11/14;abph-street; 147;IN;INDIANA
34;cjdml-name;iti-firstname; 16900;wkq-city;1970/06/05;npow-street; 96;NH;NEW HAMPSHIRE
35;cpenz-name;sbi-firstname; 16380;ssl-city;1962/08/19;kilz-street; 44;MS;MISSISSIPPI
36;rxtbg-name;anr-firstname; 14720;bqc-city;1958/08/10;pudg-street; 140;NV;NEVADA
37;udblf-name;raa-firstname; 11500;wli-city;1978/12/13;xomd-street; 41;PW;PALAU
38;vvyce-name;gep-firstname; 13740;gtd-city;1982/05/23;kwbv-street; 123;undefined;undefined
39;kwfnz-name;ucu-firstname; 10580;sns-city;1978/08/18;nnun-street; 20;OK;OKLAHOMA
40;zxydx-name;tml-firstname; 14680;jda-city;1974/05/29;wfjn-street; 157;DC;DISTRICT OF COLUMBIA
41;bfscx-name;jnl-firstname; 16920;yyg-city;1970/11/30;cgfh-street; 178;CO;COLORADO
42;qitur-name;yra-firstname; 15560;ijp-city;1978/01/30;fonc-street; 155;AK;ALASKA
43;msixi-name;ynb-firstname; 12720;ksl-city;1958/07/17;zpjw-street; 46;VI;VIRGIN ISLANDS
44;wzkjq-name;rgh-firstname; 19000;hkm-city;1974/08/12;yixf-street; 134;CA;CALIFORNIA
45;dqfmf-name;yxr-firstname; 13840;vie-city;1962/10/23;stvx-street; 39;TX;TEXAS
46;biluz-name;uqe-firstname; 17760;wkq-city;1962/07/27;embn-street; 183;PW;PALAU
47;wahfx-name;zwd-firstname; 13240;vic-city;1974/03/27;axpw-street; 131;UT;UTAH
48;denwt-name;bta-firstname; 17300;hhj-city;1986/12/20;orwy-street; 11;WV;WEST VIRGINIA
49;akdmy-name;ybz-firstname; 14560;wtx-city;1962/11/08;nwba-street; 123;MP;NORTHERN MARIANA ISLANDS
50;hqafg-name;nht-firstname; 16080;gfu-city;1951/01/12;spsq-street; 45;LA;LOUISIANA
51;zhmbl-name;lnw-firstname; 17460;hse-city;1986/12/21;scis-street; 97;GA;GEORGIA
52;snwnj-name;jyy-firstname; 16400;hsz-city;1966/02/15;imhl-street; 42;NC;NORTH CAROLINA
53;fuyla-name;mmp-firstname; 11840;hgu-city;1986/08/16;ixiz-street; 145;NC;NORTH CAROLINA
54;yvfqz-name;prz-firstname; 11260;wjl-city;1982/05/06;fbzd-street; 97;MO;MISSOURI
55;usbgq-name;vhd-firstname; 14080;dsb-city;1958/04/01;ggoc-street; 54;KS;KANSAS
56;yaeni-name;zpy-firstname; 19100;sen-city;1954/12/10;sbsw-street; 158;HI;HAWAII
57;fgxvr-name;vzi-firstname; 17520;lcf-city;1958/11/01;nbdv-street; 10;GU;GUAM
58;tqpbq-name;rwr-firstname; 19140;zpd-city;1978/08/23;npvb-street; 190;DC;DISTRICT OF COLUMBIA
59;ieigg-name;ayq-firstname; 12960;ljc-city;1962/07/05;dnjz-street; 163;FL;FLORIDA
60;rfvzu-name;edm-firstname; 13340;kvz-city;1954/12/08;eijd-street; 4;RI;RHODE ISLAND
61;pduwm-name;gqb-firstname; 14240;cyr-city;1954/07/03;ndux-street; 13;SD;SOUTH DAKOTA
62;yyixf-name;yzt-firstname; 18020;lwx-city;1974/01/29;iede-street; 120;NV;NEVADA
63;dkszq-name;ytd-firstname; 14700;zwh-city;1979/01/11;nbjz-street; 65;AS;AMERICAN SAMOA
64;slkzv-name;zbg-firstname; 19880;oee-city;1978/11/01;sphg-street; 119;OK;OKLAHOMA
65;nvxim-name;phc-firstname; 19220;vgg-city;1991/01/24;juok-street; 106;FM;FEDERATED STATES OF MICRONESIA
66;piyfg-name;xtn-firstname; 13760;nde-city;1954/07/22;vfrv-street; 11;NY;NEW YORK
67;jnusz-name;mjw-firstname; 12640;nwb-city;1986/08/23;kcsa-street; 138;VA;VIRGINIA
68;jnypj-name;ioq-firstname; 17000;zqy-city;1986/01/09;croe-street; 119;PW;PALAU
69;uohts-name;btx-firstname; 13480;dal-city;1990/10/22;llyw-street; 150;WA;WASHINGTON
70;aavpj-name;pvw-firstname; 13780;lai-city;1954/09/23;nygu-street; 171;FL;FLORIDA
71;nbjcj-name;rsf-firstname; 12000;kjl-city;1986/06/30;ijsb-street; 123;ID;IDAHO
72;syjxh-name;gkq-firstname; 19960;rmd-city;1978/10/26;qmyp-street; 161;MN;MINNESOTA
73;vkojz-name;ryo-firstname; 14300;bmz-city;1954/09/11;gcpj-street; 71;ND;NORTH DAKOTA
74;pqzfw-name;kld-firstname; 16400;qvq-city;1962/09/09;dhbv-street; 92;ND;NORTH DAKOTA
75;owvjk-name;fez-firstname; 19740;ldb-city;1978/06/14;kabf-street; 87;VA;VIRGINIA
76;qsfih-name;ixe-firstname; 16860;qvr-city;1987/01/07;qean-street; 159;CO;COLORADO
77;slixq-name;gmb-firstname; 19980;ftt-city;1982/06/22;xinx-street; 111;VT;VERMONT
78;eegsa-name;xlc-firstname; 12680;byk-city;1954/04/23;beul-street; 56;MD;MARYLAND
79;phevp-name;ihs-firstname; 16120;adc-city;1978/04/25;voig-street; 98;NM;NEW MEXICO
80;njfoe-name;tag-firstname; 16580;tnr-city;1966/12/04;dhky-street; 108;LA;LOUISIANA
81;bdncx-name;hcd-firstname; 11260;xcl-city;1970/07/02;jvlp-street; 49;GA;GEORGIA
82;ikedo-name;tks-firstname; 17460;odl-city;1958/08/25;iaaq-street; 8;GU;GUAM
83;iafxy-name;vur-firstname; 11480;hgt-city;1962/08/03;hmec-street; 164;TX;TEXAS
84;lafhf-name;ssz-firstname; 19560;wwp-city;1951/01/25;mxmq-street; 96;IN;INDIANA
85;okyny-name;hbu-firstname; 16800;yok-city;1978/03/28;ipjz-street; 135;NV;NEVADA
86;hznby-name;fwy-firstname; 13680;wbi-city;1970/07/25;mxui-street; 170;CT;CONNECTICUT
87;ztpoa-name;rzk-firstname; 18500;qum-city;1970/07/26;blqr-street; 152;ME;MAINE
88;gitxz-name;axt-firstname; 11800;fck-city;1974/01/12;tmjw-street; 189;SD;SOUTH DAKOTA
89;ziomm-name;mcv-firstname; 12940;iwq-city;1950/10/22;hqgj-street; 140;DC;DISTRICT OF COLUMBIA
90;otncg-name;tuy-firstname; 16540;ulk-city;1971/01/24;yuia-street; 166;TX;TEXAS
91;cnabb-name;hoq-firstname; 16300;tuw-city;1962/06/17;ujvv-street; 61;ME;MAINE
92;ucogf-name;ggc-firstname; 14500;fsj-city;1978/02/08;asfi-street; 53;WV;WEST VIRGINIA
93;lbpmf-name;sdt-firstname; 10780;ewj-city;1978/03/08;hxsp-street; 102;NV;NEVADA
94;tieqq-name;uyu-firstname; 17740;wea-city;1966/10/31;abpl-street; 187;MO;MISSOURI
95;fsgwf-name;vjd-firstname; 12460;ads-city;1970/11/29;yeou-street; 10;MA;MASSACHUSETTS
96;reeba-name;kzs-firstname; 13100;zhc-city;1966/07/08;abmv-street; 88;FL;FLORIDA
97;shybc-name;gcp-firstname; 10660;ahg-city;1950/12/15;hrqy-street; 174;KS;KANSAS
98;phszr-name;sst-firstname; 13080;ydd-city;1954/09/23;quqn-street; 2;RI;RHODE ISLAND
99;jteco-name;fxc-firstname; 19760;agr-city;1986/05/06;dzxc-street; 108;MD;MARYLAND
100;qvaar-name;icx-firstname; 16120;boc-city;1978/08/04;bfzf-street; 12;NM;NEW MEXICO

0 comments on commit 0752075

Please sign in to comment.