Skip to content

Commit

Permalink
Enhance file data source and flow rule manager
Browse files Browse the repository at this point in the history
- Some refinement for file data source
- Add resource name checking in FlowRuleManager to avoid NPE

Signed-off-by: Eric Zhao <sczyh16@gmail.com>
  • Loading branch information
sczyh30 committed Sep 5, 2018
1 parent 175452a commit 50a5610
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ private static Map<String, List<FlowRule>> loadFlowConf(List<FlowRule> list) {
}

for (FlowRule rule : list) {
if (!isValid(rule)) {
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(FlowRule.LIMIT_APP_DEFAULT);
}
Expand Down Expand Up @@ -199,4 +202,7 @@ public void configLoad(List<FlowRule> conf) {

}

private static boolean isValid(FlowRule rule) {
return rule != null && !StringUtil.isBlank(rule.getResource());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
/**
* A {@link ReadableDataSource} automatically fetches the backend data.
*
* @param <S>
* source data type
* @param <T>
* target data type
* @param <S> source data type
* @param <T> target data type
* @author Carpenter Lee
*/
public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> {
Expand All @@ -52,12 +50,12 @@ public AutoRefreshDataSource(Converter<S, T> configParser, final long recommendR

private void startTimerService() {
service = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!refresh()) {
if (!isModified()) {
return;
}
T newValue = loadConfig();
Expand All @@ -77,7 +75,7 @@ public void close() throws Exception {
}
}

protected boolean refresh() {
protected boolean isModified() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* <p>
* A {@link ReadableDataSource} based on file. This class will automatically
* fetches the backend file every refresh period.
* fetches the backend file every isModified period.
* </p>
* <p>
* Limitations: Default read buffer size is 1 MB. If file size is greater than
Expand All @@ -46,16 +46,15 @@ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String,
private byte[] buf;
private final Charset charset;
private final File file;

private long lastModified = 0L;

/**
* Create a file based {@link ReadableDataSource} whose read buffer size is
* 1MB, charset is UTF8, and read interval is 3 seconds.
*
* @param file
* the file to read
* @param configParser
* the config decoder (parser)
* @param file the file to read
* @param configParser the config decoder (parser)
*/
public FileRefreshableDataSource(File file, Converter<String, T> configParser) throws FileNotFoundException {
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
Expand All @@ -66,30 +65,31 @@ public FileRefreshableDataSource(String fileName, Converter<String, T> configPar
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, int bufSize)
throws FileNotFoundException {
throws FileNotFoundException {
this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET);
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, Charset charset)
throws FileNotFoundException {
throws FileNotFoundException {
this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset);
}

public FileRefreshableDataSource(File file, Converter<String, T> configParser, long recommendRefreshMs, int bufSize,
Charset charset) throws FileNotFoundException {
Charset charset) throws FileNotFoundException {
super(configParser, recommendRefreshMs);
if (bufSize <= 0 || bufSize > MAX_SIZE) {
throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get");
}
if (file == null) {
throw new IllegalArgumentException("file can't be null");
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("File can't be null or a directory");
}
if (charset == null) {
throw new IllegalArgumentException("charset can't be null");
}
this.buf = new byte[bufSize];
this.file = file;
this.charset = charset;
// If the file does not exist, the last modified will be 0.
this.lastModified = file.lastModified();
firstLoad();
}
Expand All @@ -105,13 +105,17 @@ private void firstLoad() {

@Override
public String readSource() throws Exception {
if (!file.exists()) {
// Will throw FileNotFoundException later.
RecordLog.warn(String.format("[FileRefreshableDataSource] File does not exist: %s", file.getAbsolutePath()));
}
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(file);
FileChannel channel = inputStream.getChannel();
if (channel.size() > buf.length) {
throw new IllegalStateException(file.getAbsolutePath() + " file size=" + channel.size()
+ ", is bigger than bufSize=" + buf.length + ". Can't read");
+ ", is bigger than bufSize=" + buf.length + ". Can't read");
}
int len = inputStream.read(buf);
return new String(buf, 0, len, charset);
Expand All @@ -126,8 +130,8 @@ public String readSource() throws Exception {
}

@Override
protected boolean refresh() {
long curLastModified = new File(file.getAbsolutePath()).lastModified();
protected boolean isModified() {
long curLastModified = file.lastModified();
if (curLastModified != this.lastModified) {
this.lastModified = curLastModified;
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,63 @@

import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.Charset;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.alibaba.csp.sentinel.log.RecordLog;

/**
* A {@link WritableDataSource} based on file.
*
* @param <T>
* data type
* @param <T> data type
* @author Eric Zhao
* @since 0.2.0
*/
public class FileWritableDataSource<T> implements WritableDataSource<T> {

private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

private final Converter<T, String> configEncoder;
private File file;
private final File file;
private final Charset charset;

private final Lock lock = new ReentrantLock(true);

public FileWritableDataSource(String filePath, Converter<T, String> configEncoder) {
this(new File(filePath), configEncoder);
}

public FileWritableDataSource(File file, Converter<T, String> configEncoder) {
this(file, configEncoder, DEFAULT_CHARSET);
}

public FileWritableDataSource(File file, Converter<T, String> configEncoder, Charset charset) {
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("Bad file");
}
if (configEncoder == null) {
throw new IllegalArgumentException("Config encoder cannot be null");
}
if (charset == null) {
throw new IllegalArgumentException("Charset cannot be null");
}
this.configEncoder = configEncoder;
this.file = file;
this.charset = charset;
}

@Override
public void write(T value) throws Exception {
if (configEncoder == null) {
throw new NullPointerException("configEncoder is null Can't write");
}
synchronized (file) {
lock.lock();
try {
String convertResult = configEncoder.convert(value);
FileOutputStream outputStream = null;
try {
outputStream = new FileOutputStream(file);
byte[] bytesArray = convertResult.getBytes();
byte[] bytesArray = convertResult.getBytes(charset);

RecordLog.info(String.format("[FileWritableDataSource] Writing to file %s: %s", file.toString(), convertResult));
outputStream.write(bytesArray);
outputStream.flush();
} finally {
Expand All @@ -68,6 +85,8 @@ public void write(T value) throws Exception {
}
}
}
} finally {
lock.unlock();
}
}

Expand Down

0 comments on commit 50a5610

Please sign in to comment.