Skip to content

Commit

Permalink
Refactor binlog to use nio file API
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshilliard committed Jun 21, 2020
1 parent 59b3ea4 commit eb66e96
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 117 deletions.
177 changes: 95 additions & 82 deletions modules/binlog/src/main/java/org/jpos/binlog/BinLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@

import org.jpos.iso.ISOUtil;

import java.io.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.*;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.*;

/**
* General purpose binary log
Expand Down Expand Up @@ -58,7 +66,7 @@
*/
public abstract class BinLog implements AutoCloseable {
private static final int FILE_MAGIC = 0xFC;
private static final int VERSION = 0x0001;
private static final short VERSION = 0x0001;
private static final int RESERVED_LEN = 232;
private static final int MAX_CREATE_ATTEMPTS = 100;
protected static final int STATUS_OFFSET = Integer.BYTES + Short.BYTES;
Expand All @@ -69,12 +77,12 @@ public abstract class BinLog implements AutoCloseable {
private static final long CREATE_DELAY = 100L;
protected static final long FIRST_EVENT_OFFSET = TAIL_OFFSET + RESERVED_LEN + Long.BYTES;
private static SecureRandom numberGenerator = new SecureRandom();
private static Pattern filePattern = Pattern.compile("^[\\d]{8}.dat$");
private String mode;
private static String filePattern = "^[\\d]{8}.dat$";
private OpenOption[] mode;
private static Map<String,Object> mutexs = Collections.synchronizedMap(new HashMap<>());
protected File dir;
protected Path dir;
protected int fileNumber;
protected RandomAccessFile raf;
protected FileChannel raf;
protected final Object mutex;

/**
Expand All @@ -84,15 +92,19 @@ public abstract class BinLog implements AutoCloseable {
* @param create create directory if not exists
* @throws IOException on error
*/
protected BinLog(File dir, boolean create) throws IOException {
mutexs.putIfAbsent(dir.getAbsolutePath(), new Object());
mutex = mutexs.get(dir.getAbsolutePath());
if ((dir.exists() && !dir.isDirectory())|| (!dir.exists() && !create))
protected BinLog(Path dir, boolean create) throws IOException {
mutexs.putIfAbsent(dir.toAbsolutePath().toString(), new Object());
mutex = mutexs.get(dir.toAbsolutePath().toString());
if ((Files.exists(dir) && !Files.isDirectory(dir))|| (!Files.exists(dir) && !create))
throw new IOException ("Invalid directory '" + dir.toString() + "'");
else
dir.mkdirs();
Files.createDirectories(dir);
this.dir = dir;
mode = create ? "rw" : "r";
if (create) {
mode = new OpenOption[]{READ, WRITE, CREATE};
} else {
mode = new OpenOption[]{READ};
}
}

/**
Expand All @@ -102,17 +114,17 @@ protected BinLog(File dir, boolean create) throws IOException {
* @return a RandomAccessFile
* @throws IOException on error
*/
protected RandomAccessFile openOrCreateFile(File dir, int fileNumber) throws IOException {
File file = new File (dir, toFileName(fileNumber));
for (int i=0; !file.exists() && i<MAX_CREATE_ATTEMPTS; i++) {
File tempFile = new File(file.getPath() + "-" + Integer.toString(numberGenerator.nextInt(), 16) + ".tmp");
try (RandomAccessFile r = new RandomAccessFile(tempFile, "rwd")) {
protected FileChannel openOrCreateFile(Path dir, int fileNumber) throws IOException {
Path file = dir.resolve(toFileName(fileNumber));
for (int i=0; !Files.exists(file) && i<MAX_CREATE_ATTEMPTS; i++) {
Path tempFile = Paths.get(file.toString() + "-" + Integer.toString(numberGenerator.nextInt(), 16) + ".tmp");
try (FileChannel r = FileChannel.open(tempFile, READ, WRITE, CREATE, SYNC)) {
writeHeader (r, fileNumber);
}
if (tempFile.renameTo(file)) {
break;
} else {
tempFile.delete();
try {
Files.move(tempFile, file, ATOMIC_MOVE);
} catch (Exception e) {
Files.delete(tempFile);
}
ISOUtil.sleep (ThreadLocalRandom.current().nextLong(1L, CREATE_DELAY));
}
Expand All @@ -130,8 +142,8 @@ protected RandomAccessFile openOrCreateFile(File dir, int fileNumber) throws IOE
protected boolean checkCutover (boolean findLast) throws IOException {
boolean changed = false;
while (readStatus() == Status.CLOSED) {
raf.seek(NEXT_LOG_INDEX_OFFSET);
int next = raf.readInt();
MappedByteBuffer index = raf.map(FileChannel.MapMode.READ_ONLY, NEXT_LOG_INDEX_OFFSET, 4);
int next = index.getInt();
if (next <= fileNumber)
throw new IOException ("Circular reference in BinLog index " + fileNumber + "/" + next);
synchronized (mutex) {
Expand All @@ -154,7 +166,7 @@ protected boolean checkCutover (boolean findLast) throws IOException {
* @return RandomAccessFile opened in the default mode
* @throws IOException on error
*/
protected RandomAccessFile open (File dir, int fileNumber) throws IOException {
protected FileChannel open (Path dir, int fileNumber) throws IOException {
return open (dir, fileNumber, mode);
}

Expand All @@ -167,21 +179,19 @@ protected RandomAccessFile open (File dir, int fileNumber) throws IOException {
* @return RandomAccessFile opened in the specified mode
* @throws IOException on error
*/
private RandomAccessFile open (File dir, int fileNumber, String mode) throws IOException {
File file = new File (dir, toFileName(fileNumber));
RandomAccessFile raf = null;
if (!file.canRead()) {
switch (mode) {
case "r":
throw new EOFException(file + " not available");
case "rw":
raf = new RandomAccessFile(file, mode);
writeHeader(raf, fileNumber);
break;
private FileChannel open (Path dir, int fileNumber, OpenOption[] mode) throws IOException {
Path file = dir.resolve(toFileName(fileNumber));
FileChannel raf = null;
if (!Files.isReadable(file)) {
if (Arrays.stream(mode).anyMatch(WRITE::equals)) {
raf = FileChannel.open(file, mode);
writeHeader(raf, fileNumber);
} else {
throw new EOFException(file + " not available");
}
}
if (raf == null)
raf = new RandomAccessFile(file, mode);
raf = FileChannel.open(file, mode);
verifyHeader(file, raf);
return raf;
}
Expand All @@ -207,129 +217,132 @@ public int getFileNumber() {
* @throws IOException on error
*/
public boolean isClosed() throws IOException {
raf.seek(STATUS_OFFSET);
return Status.valueOf(raf.readShort()) == Status.CLOSED;
MappedByteBuffer status = raf.map(FileChannel.MapMode.READ_ONLY, STATUS_OFFSET, 2);
return Status.valueOf(status.getShort()) == Status.CLOSED;
}

protected long readTailOffset(RandomAccessFile raf) throws IOException {
protected long readTailOffset(FileChannel raf) throws IOException {
synchronized(mutex) {
raf.seek(TAIL_OFFSET);
return raf.readLong();
MappedByteBuffer tail = raf.map(FileChannel.MapMode.READ_ONLY, TAIL_OFFSET, 8);
return tail.getLong();
}
}

protected Status readStatus() throws IOException {
raf.seek(STATUS_OFFSET);
return Status.valueOf(raf.readShort());
MappedByteBuffer status = raf.map(FileChannel.MapMode.READ_ONLY, STATUS_OFFSET, 2);
return Status.valueOf(status.getShort());

}
protected void writeTailOffset(long pos) throws IOException {
synchronized(mutex) {
long currentTailOffset = readTailOffset(raf);
if (currentTailOffset > pos)
throw new IOException ("Invalid tailoffset " + fileNumber + "/" + pos + "/" + currentTailOffset);
raf.seek(TAIL_OFFSET);
raf.writeLong(pos);
MappedByteBuffer tail = raf.map(FileChannel.MapMode.READ_WRITE, TAIL_OFFSET, 8);
tail.putLong(pos);
}
}

protected int getFileNumber(String s) {
return s != null ? Integer.parseInt(s.substring(0,8)) : 0;
}

protected String getLastClosed (File dir) throws IOException {
protected String getLastClosed (Path dir) throws IOException {
for (String s : getFilesReversed(dir)) {
if (isClosed(new File(dir, s)))
if (isClosed(dir.resolve(s)))
return s;
}
return null;
}

protected String getFirst (File dir) {
return Arrays.stream(dir.list())
.filter(filePattern.asPredicate())
protected String getFirst (Path dir) throws IOException {
return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false)
.map(Objects::toString)
.sorted(String::compareTo)
.findFirst()
.orElse(null);
}

private void verifyHeader(File file, RandomAccessFile raf) throws IOException {
private void verifyHeader(Path file, FileChannel raf) throws IOException {
synchronized(mutex) {
if (raf.length() < TAIL_OFFSET + Long.BYTES)
throw new IOException ("Invalid jPOS BinLog file " + fileNumber + " (" + file.toString() + ": " + raf.length() + "/" + TAIL_OFFSET + Long.BYTES + ")");
raf.seek(0);
int magic = raf.readInt();
if (raf.size() < TAIL_OFFSET + Long.BYTES)
throw new IOException ("Invalid jPOS BinLog file " + fileNumber + " (" + file.toString() + ": " + raf.size() + "/" + TAIL_OFFSET + Long.BYTES + ")");
MappedByteBuffer magicbuf = raf.map(FileChannel.MapMode.READ_ONLY, 0, 4);
int magic = magicbuf.getInt();
if (!(FILE_MAGIC == magic))
throw new IOException ("Invalid jPOS BinLog version " + fileNumber);
long pos = readTailOffset(raf);
if (pos < TAIL_OFFSET + Long.BYTES)
throw new IOException ("Invalid jPOS BinLog header " + fileNumber + "/" + file.toString());
long rafLength = raf.length();
long rafLength = raf.size();
if (pos > rafLength)
throw new IOException ("Truncated jPOS BinLog file " + fileNumber + " (" + pos + "/" + rafLength + ")");
}
}

private void lock (long timeout) throws IOException, InterruptedException {
long end = System.currentTimeMillis() + timeout;
FileLock lock = raf.getChannel().lock();
FileLock lock = raf.tryLock();
while (System.currentTimeMillis() < end) {
Thread.sleep (10);
}
lock.release();
}

private List<String> getFiles(File dir) {
return Arrays.stream(dir.list())
.filter(filePattern.asPredicate())
private List<String> getFiles(Path dir) throws IOException {
return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false)
.map(Objects::toString)
.sorted(String::compareTo)
.collect(Collectors.toList());
}
private List<String> getFilesReversed(File dir) {
return Arrays.stream(dir.list())
.filter(filePattern.asPredicate())
private List<String> getFilesReversed(Path dir) throws IOException {
return StreamSupport.stream(Files.newDirectoryStream(dir, filePattern).spliterator(), false)
.map(Objects::toString)
.sorted((s1, s2) -> -s1.compareTo(s2))
.collect(Collectors.toList());
}

private boolean isClosed (File f) throws IOException {
if (f.exists()) {
try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
raf.seek(STATUS_OFFSET);
return Status.valueOf(raf.readShort()) == Status.CLOSED;
private boolean isClosed (Path f) throws IOException {
if (Files.exists(f)) {
try (FileChannel raf = FileChannel.open(f, READ)) {
MappedByteBuffer status = raf.map(FileChannel.MapMode.READ_ONLY, STATUS_OFFSET, 2);
return Status.valueOf(status.getShort()) == Status.CLOSED;
}
}
return false;
}

private void writeHeader (RandomAccessFile r, int i) throws IOException {
r.seek(0);
r.writeInt (FILE_MAGIC);
r.writeShort (VERSION);
r.writeShort (Status.OPEN.intValue());
r.writeLong(FIRST_EVENT_OFFSET);
r.writeInt(i); // this Log Number
r.writeInt(0); // next Log Number
r.write (new byte[RESERVED_LEN]);
private void writeHeader (FileChannel r, int i) throws IOException {
MappedByteBuffer header = r.map(FileChannel.MapMode.READ_WRITE, 0, 256);
header.putInt(FILE_MAGIC);
header.putShort(VERSION);
header.putShort(Status.OPEN.val);
header.putLong(FIRST_EVENT_OFFSET);
header.putInt(i); // this Log Number
header.putInt(0); // next Log Number
header.put(new byte[RESERVED_LEN]);
}

public enum Status {
OPEN(0),
CLOSED(1);
OPEN((short)0),
CLOSED((short)1);

private int val;
private short val;
private static Map<Integer,Status> map = new HashMap<>();
static {
for (Status s : Status.values()) {
map.put (s.intValue(), s);
}
}
Status (int val) {
Status (short val) {
this.val = val;
}
public int intValue() {
return val;
}
public short shortValue() {
return val;
}
public static Status valueOf (int i) {
return map.get(i);
}
Expand Down
Loading

0 comments on commit eb66e96

Please sign in to comment.