// Where to write: files are created under /test/ and named by TimesFileNameFormat.
FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
// How to write: tuple fields are joined with a single space.
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
// When to flush: sync the filesystem after every 1000 tuples.
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// When to start a new file: CountStrRotationPolicy (below) counts tuples and requests a
// rotation every 10 tuples. A size-based alternative would be
// new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB).
FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
// What to do on rotation: NewFileAction (below) only writes a log line.
RotationAction action = new NewFileAction();

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://127.0.0.1:9000")
        .withRecordFormat(format)
        .withFileNameFormat(fileNameFormat)
        .withSyncPolicy(syncPolicy)
        .withRotationPolicy(rotationPolicy)
        .addRotationAction(action);
然后分别来看各个策略的类。
FileRotationPolicy
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Count-based {@link FileRotationPolicy}: requests a file rotation once a fixed number of
 * tuples (10 by default) has been marked since the last rotation. The name of the next file
 * is not decided here; it comes from the bolt's file name format (TimesFileNameFormat in this
 * example).
 *
 * <p>This class is NOT thread-safe: the tuple counter is a plain {@code int} that is updated
 * without synchronization. Do not share one instance between threads; use {@link #copy()} to
 * obtain an independent instance.
 */
public class CountStrRotationPolicy implements FileRotationPolicy {
    /** Threshold used by the no-argument constructor. */
    private static final int DEFAULT_THRESHOLD = 10;

    /** Number of marked tuples that triggers a rotation; always positive. */
    private final int threshold;

    /** Tuples marked since the last rotation or reset. */
    private int count = 0;

    /** Creates a policy that rotates after every 10 tuples. */
    public CountStrRotationPolicy() {
        this(DEFAULT_THRESHOLD);
    }

    /**
     * Creates a policy that rotates after every {@code threshold} tuples.
     *
     * @param threshold number of tuples per file; must be positive
     * @throws IllegalArgumentException if {@code threshold} is zero or negative
     */
    public CountStrRotationPolicy(int threshold) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive, got " + threshold);
        }
        this.threshold = threshold;
    }

    /**
     * Called for every tuple the HdfsBolt executes.
     *
     * @param tuple  the tuple executed (not inspected by this policy)
     * @param offset current offset of the file being written (not inspected by this policy)
     * @return true if a file rotation should be performed
     */
    @Override
    public boolean mark(Tuple tuple, long offset) {
        count++;
        if (count < threshold) {
            return false;
        }
        System.out.print("num :" + count + " ");
        // Clear the counter here as well as in reset(), so counting restarts even if the
        // caller never invokes reset().
        count = 0;
        return true;
    }

    /**
     * Called after the HdfsBolt rotates a file; restarts the tuple count for the new file.
     */
    @Override
    public void reset() {
        count = 0;
    }

    /**
     * Returns a fresh policy with the same threshold and a counter starting at zero.
     */
    @Override
    public FileRotationPolicy copy() {
        return new CountStrRotationPolicy(threshold);
    }
}
FileNameFormat
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
/**
 * Decides the name of each newly opened output file.
 *
 * <p>The name is a running number kept by this instance followed by a fixed ".txt" extension
 * ("1.txt", "2.txt", ...). The number is incremented on every {@link #getName} call; the
 * {@code rotation} and {@code timeStamp} arguments are ignored.
 *
 * <p>NOTE(review): the counter lives only in this object, so numbering presumably restarts at
 * 1 whenever a new instance is created — confirm that existing files cannot be clobbered.
 */
public class TimesFileNameFormat implements FileNameFormat {
    /** Extension appended to every generated file name. */
    private static final String EXTENSION = ".txt";

    /** Target directory; defaults to "/storm" unless overridden via {@link #withPath}. */
    private String path = "/storm";

    /** Number of file names handed out so far. */
    private long times = 0L;

    /**
     * Sets the directory that {@link #getPath()} reports.
     *
     * @param path directory for the output files
     * @return this instance, for call chaining
     */
    public TimesFileNameFormat withPath(String path) {
        this.path = path;
        return this;
    }

    /** No preparation is needed; both arguments are ignored. */
    @Override
    public void prepare(Map conf, TopologyContext topologyContext) {
    }

    /**
     * Returns the next file name.
     *
     * @param rotation  ignored
     * @param timeStamp ignored
     * @return the incremented counter followed by ".txt", e.g. "1.txt" on the first call
     */
    @Override
    public String getName(long rotation, long timeStamp) {
        times++;
        return times + EXTENSION;
    }

    /**
     * @return the directory configured for the output files
     */
    @Override
    public String getPath() {
        return this.path;
    }
}
RotationAction
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
/**
 * Rotation hook invoked when the bolt switches to a new output file.
 * This implementation only writes a log entry; it does not touch the file system.
 */
public class NewFileAction implements RotationAction {

    private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);

    /**
     * Logs that the output file has changed.
     *
     * @param fileSystem file system handle (unused)
     * @param filePath   path passed in by the bolt (unused)
     * @throws IOException declared by the interface; never thrown here
     */
    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        LOG.info("Hdfs change the written file!!");
    }
}