11package org .testcontainers .redpanda ;
22
33import com .github .dockerjava .api .command .InspectContainerResponse ;
4+ import freemarker .template .Configuration ;
5+ import freemarker .template .Template ;
6+ import lombok .AllArgsConstructor ;
7+ import lombok .Cleanup ;
8+ import lombok .Data ;
9+ import lombok .SneakyThrows ;
410import org .testcontainers .containers .GenericContainer ;
511import org .testcontainers .containers .wait .strategy .Wait ;
612import org .testcontainers .images .builder .Transferable ;
713import org .testcontainers .utility .ComparableVersion ;
814import org .testcontainers .utility .DockerImageName ;
15+ import org .testcontainers .utility .MountableFile ;
16+
17+ import java .io .ByteArrayOutputStream ;
18+ import java .io .OutputStreamWriter ;
19+ import java .io .Writer ;
20+ import java .nio .charset .StandardCharsets ;
21+ import java .util .ArrayList ;
22+ import java .util .HashMap ;
23+ import java .util .HashSet ;
24+ import java .util .List ;
25+ import java .util .Map ;
26+ import java .util .Set ;
27+ import java .util .function .Supplier ;
28+ import java .util .stream .Collectors ;
929
1030/**
1131 * Testcontainers implementation for Redpanda.
1434 * <ul>
1535 * <li>Broker: 9092</li>
1636 * <li>Schema Registry: 8081</li>
37+ * <li>Proxy: 8082</li>
1738 * </ul>
1839 */
1940public class RedpandaContainer extends GenericContainer <RedpandaContainer > {
@@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {
3051
3152 private static final int REDPANDA_PORT = 9092 ;
3253
54+ private static final int REDPANDA_ADMIN_PORT = 9644 ;
55+
3356 private static final int SCHEMA_REGISTRY_PORT = 8081 ;
3457
35- private static final String STARTER_SCRIPT = "/testcontainers_start.sh" ;
58+ private static final int REST_PROXY_PORT = 8082 ;
59+
60+ private boolean enableAuthorization ;
61+
62+ private String authenticationMethod = "none" ;
63+
64+ private String schemaRegistryAuthenticationMethod = "none" ;
65+
66+ private final List <String > superusers = new ArrayList <>();
67+
68+ private final Set <Supplier <Listener >> listenersValueSupplier = new HashSet <>();
3669
3770 public RedpandaContainer (String image ) {
3871 this (DockerImageName .parse (image ));
@@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) {
4780 throw new IllegalArgumentException ("Redpanda version must be >= v22.2.1" );
4881 }
4982
50- withExposedPorts (REDPANDA_PORT , SCHEMA_REGISTRY_PORT );
83+ withExposedPorts (REDPANDA_PORT , REDPANDA_ADMIN_PORT , SCHEMA_REGISTRY_PORT , REST_PROXY_PORT );
5184 withCreateContainerCmdModifier (cmd -> {
52- cmd .withEntrypoint ("sh" );
85+ cmd .withEntrypoint ();
86+ cmd .withUser ("root:root" );
5387 });
54- waitingFor (Wait .forLogMessage (".*Started Kafka API server.*" , 1 ));
55- withCommand ("-c" , "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT );
88+ waitingFor (Wait .forLogMessage (".*Successfully started Redpanda!.*" , 1 ));
89+ withCopyFileToContainer (
90+ MountableFile .forClasspathResource ("testcontainers/entrypoint-tc.sh" , 0700 ),
91+ "/entrypoint-tc.sh"
92+ );
93+ withCommand ("/entrypoint-tc.sh" , "redpanda" , "start" , "--mode=dev-container" , "--smp=1" , "--memory=1G" );
94+ }
95+
96+ @ Override
97+ protected void configure () {
98+ this .listenersValueSupplier .stream ()
99+ .map (Supplier ::get )
100+ .map (Listener ::getAddress )
101+ .forEach (this ::withNetworkAliases );
56102 }
57103
104+ @ SneakyThrows
58105 @ Override
59106 protected void containerIsStarting (InspectContainerResponse containerInfo ) {
60107 super .containerIsStarting (containerInfo );
61108
62- String command = "#!/bin/bash\n " ;
63-
64- command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G " ;
65- command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 " ;
66- command +=
67- "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost () + ":" + getMappedPort (9092 );
109+ Configuration cfg = new Configuration (Configuration .DEFAULT_INCOMPATIBLE_IMPROVEMENTS );
110+ cfg .setClassLoaderForTemplateLoading (getClass ().getClassLoader (), "testcontainers" );
111+ cfg .setDefaultEncoding ("UTF-8" );
68112
69- copyFileToContainer (Transferable .of (command , 0777 ), STARTER_SCRIPT );
113+ copyFileToContainer (getBootstrapFile (cfg ), "/etc/redpanda/.bootstrap.yaml" );
114+ copyFileToContainer (getRedpandaFile (cfg ), "/etc/redpanda/redpanda.yaml" );
70115 }
71116
117+ /**
118+ * Returns the bootstrap servers address.
119+ * @return the bootstrap servers address
120+ */
72121 public String getBootstrapServers () {
73122 return String .format ("PLAINTEXT://%s:%s" , getHost (), getMappedPort (REDPANDA_PORT ));
74123 }
75124
125+ /**
126+ * Returns the schema registry address.
127+ * @return the schema registry address
128+ */
76129 public String getSchemaRegistryAddress () {
77130 return String .format ("http://%s:%s" , getHost (), getMappedPort (SCHEMA_REGISTRY_PORT ));
78131 }
132+
133+ /**
134+ * Returns the admin address.
135+ * @return the admin address
136+ */
137+ public String getAdminAddress () {
138+ return String .format ("http://%s:%s" , getHost (), getMappedPort (REDPANDA_ADMIN_PORT ));
139+ }
140+
141+ /**
142+ * Returns the rest proxy address.
143+ * @return the rest proxy address
144+ */
145+ public String getRestProxyAddress () {
146+ return String .format ("http://%s:%s" , getHost (), getMappedPort (REST_PROXY_PORT ));
147+ }
148+
149+ /**
150+ * Enables authorization.
151+ * @return this {@link RedpandaContainer} instance
152+ */
153+ public RedpandaContainer enableAuthorization () {
154+ this .enableAuthorization = true ;
155+ return this ;
156+ }
157+
158+ /**
159+ * Enables SASL.
160+ * @return this {@link RedpandaContainer} instance
161+ */
162+ public RedpandaContainer enableSasl () {
163+ this .authenticationMethod = "sasl" ;
164+ return this ;
165+ }
166+
167+ /**
168+ * Enables Http Basic Auth for Schema Registry.
169+ * @return this {@link RedpandaContainer} instance
170+ */
171+ public RedpandaContainer enableSchemaRegistryHttpBasicAuth () {
172+ this .schemaRegistryAuthenticationMethod = "http_basic" ;
173+ return this ;
174+ }
175+
176+ /**
177+ * Register username as a superuser.
178+ * @param username username to register as a superuser
179+ * @return this {@link RedpandaContainer} instance
180+ */
181+ public RedpandaContainer withSuperuser (String username ) {
182+ this .superusers .add (username );
183+ return this ;
184+ }
185+
186+ /**
187+ * Add a {@link Supplier} that will provide a listener with format {@code host:port}.
188+ * Host will be added as a network alias.
189+ * <p>
190+ * The listener will be added to the default listeners.
191+ * <p>
192+ * Default listeners:
193+ * <ul>
194+ * <li>0.0.0.0:9092</li>
195+ * <li>0.0.0.0:9093</li>
196+ * </ul>
197+ * <p>
198+ * Default advertised listeners:
199+ * <ul>
200+ * <li>{@code container.getHost():container.getMappedPort(9092)}</li>
201+ * <li>127.0.0.1:9093</li>
202+ * </ul>
203+ * @param listenerSupplier a supplier that will provide a listener
204+ * @return this {@link RedpandaContainer} instance
205+ */
206+ public RedpandaContainer withListener (Supplier <String > listenerSupplier ) {
207+ String [] parts = listenerSupplier .get ().split (":" );
208+ this .listenersValueSupplier .add (() -> new Listener (parts [0 ], Integer .parseInt (parts [1 ])));
209+ return this ;
210+ }
211+
212+ private Transferable getBootstrapFile (Configuration cfg ) {
213+ Map <String , Object > kafkaApi = new HashMap <>();
214+ kafkaApi .put ("enableAuthorization" , this .enableAuthorization );
215+ kafkaApi .put ("superusers" , this .superusers );
216+
217+ Map <String , Object > root = new HashMap <>();
218+ root .put ("kafkaApi" , kafkaApi );
219+
220+ String file = resolveTemplate (cfg , "bootstrap.yaml.ftl" , root );
221+
222+ return Transferable .of (file , 0700 );
223+ }
224+
225+ private Transferable getRedpandaFile (Configuration cfg ) {
226+ List <Map <String , Object >> listeners =
227+ this .listenersValueSupplier .stream ()
228+ .map (Supplier ::get )
229+ .map (listener -> {
230+ Map <String , Object > listenerMap = new HashMap <>();
231+ listenerMap .put ("address" , listener .getAddress ());
232+ listenerMap .put ("port" , listener .getPort ());
233+ return listenerMap ;
234+ })
235+ .collect (Collectors .toList ());
236+
237+ Map <String , Object > kafkaApi = new HashMap <>();
238+ kafkaApi .put ("authenticationMethod" , this .authenticationMethod );
239+ kafkaApi .put ("enableAuthorization" , this .enableAuthorization );
240+ kafkaApi .put ("advertisedHost" , getHost ());
241+ kafkaApi .put ("advertisedPort" , getMappedPort (9092 ));
242+ kafkaApi .put ("listeners" , listeners );
243+
244+ Map <String , Object > schemaRegistry = new HashMap <>();
245+ schemaRegistry .put ("authenticationMethod" , this .schemaRegistryAuthenticationMethod );
246+
247+ Map <String , Object > root = new HashMap <>();
248+ root .put ("kafkaApi" , kafkaApi );
249+ root .put ("schemaRegistry" , schemaRegistry );
250+
251+ String file = resolveTemplate (cfg , "redpanda.yaml.ftl" , root );
252+
253+ return Transferable .of (file , 0700 );
254+ }
255+
256+ @ SneakyThrows
257+ private String resolveTemplate (Configuration cfg , String template , Map <String , Object > data ) {
258+ Template temp = cfg .getTemplate (template );
259+
260+ @ Cleanup
261+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream ();
262+ @ Cleanup
263+ Writer out = new OutputStreamWriter (byteArrayOutputStream , StandardCharsets .UTF_8 );
264+ temp .process (data , out );
265+
266+ return new String (byteArrayOutputStream .toByteArray (), StandardCharsets .UTF_8 );
267+ }
268+
269+ @ Data
270+ @ AllArgsConstructor
271+ private static class Listener {
272+
273+ private String address ;
274+
275+ private int port ;
276+ }
79277}
0 commit comments