Writing into multiple files using multithreading and limiting the write count to 20000 in each file
I have a complicated situation here.
I am developing a multithreaded application. I create 5 threads, and I want them to write to 5 files simultaneously (one file per thread), limiting each file to 20000 records. The data comes from a database and is different for each of the 5 threads, so the correct data must go into each file. Each thread therefore needs access to its own file writer, so that it writes the correct data to the correct file, and needs its own counter for that file.
public class AuditMissingRelationshipGuidEntitiesToXml
{
// Logger for this class, obtained from ERDLoggerUtility under the runtime class name.
private Logger logger = ERDLoggerUtility.getLogger( this.getClass().getName());
// Connection holder and DAO for the relationship repository; both are created in setupDatabases().
private DbPartitionedConnections myDbPConn = null;
private RelationshipRepositoryDao myRRDao = null;
// Connection holder and DAO for master records; both are created in setupDatabases().
private MrdPartitionedConnections myMRDPartitionConn = null;
private MasterRecordDao myMRDRDao = null;
// Collection name exposed through getMyCollectionName()/setMyCollectionName().
// The command-line assignment in setCommandLineProperties() is commented out,
// so this stays null unless the setter is called.
private String myCollectionName = null;
// Output directory and output file name, taken from the 1st and 2nd command-line arguments.
private String myOutputDir = null;
private String myOutputFileName = null;
// NOTE(review): no visible code assigns this field (the only assignment is commented out);
// confirm whether it is still needed.
private EntityFileWriter myEntityFileWriter = null;
// Finder built in setupDatabases() on top of the data source below.
private NovusDocumentGuidCaseFinder novusDocGuidCaseFinder = null;
//private int entityFileWriterCount = 0;
// Collection names loaded by getCollections(). Static, so shared by every instance;
// main() submits one task per entry.
private static List <String> collections = new ArrayList<String>();
// JDBC driver class name applied to the data source in setupDatabases().
private static final String DBDRIVER = "oracle.jdbc.driver.OracleDriver";
// Data source configured from the wfdb.* properties in setupDatabases(); queried by getCollections().
private BasicDataSource ds;
// Size of the fixed thread pool created in main().
private static final int NTHREADS = 30;
/**
 * Creates the relationship-repository and master-record connection holders and DAOs,
 * configures the workflow data source from the ERD properties, and builds the Novus
 * document guid finder on top of that data source.
 *
 * @throws Exception if any of the underlying constructors or property lookups fail
 */
private void setupDatabases() throws Exception {
    final ErdProperties erdProps = ErdProperties.getProps();

    // Relationship repository access.
    myDbPConn = new RrdPartitionedConnections();
    myRRDao = new OracleRelationshipRepositoryDao(myDbPConn);

    // Master record access.
    myMRDPartitionConn = new MrdPartitionedConnections();
    myMRDRDao = new OracleMasterRecordDao(myMRDPartitionConn);

    // Workflow database data source, configured from the wfdb.* properties.
    this.ds = new BasicDataSource();
    this.ds.setDriverClassName(DBDRIVER);
    this.ds.setUrl(erdProps.getProperty("wfdb.uri"));
    this.ds.setUsername(erdProps.getProperty("wfdb.user"));
    this.ds.setPassword(erdProps.getProperty("wfdb.password"));

    // The finder receives whatever collection name is currently held by this instance.
    this.novusDocGuidCaseFinder = new NovusDocumentGuidCaseFinder(this.ds);
    this.novusDocGuidCaseFinder.setNovusCollection(this.myCollectionName);
}
/**
 * Validates the command-line arguments and stores the output directory and output file name.
 *
 * Exactly three non-empty arguments are required: output directory, output file name and
 * collection name. The third argument is validated but not stored; collection names are
 * loaded by getCollections() instead.
 *
 * @param theArgs the raw command-line arguments
 * @throws Exception if the argument array is null, does not hold exactly three entries,
 *                   or any entry is null or empty (usage is printed to stderr first)
 */
private void setCommandLineProperties(String[] theArgs) throws Exception {
    boolean argsValid = theArgs != null && theArgs.length == 3;
    for (int i = 0; argsValid && i < theArgs.length; i++) {
        argsValid = theArgs[i] != null && theArgs[i].length() > 0;
    }

    if (!argsValid) {
        System.err.println("You must provide 3 arguments! See below.");
        System.err.println("\t1st Argument) -- Output directory location. ex: \"/ct-data/RRDGuids\"");
        System.err.println("\t2nd Argument) -- Output file name. ex: \"rrd_seq_guid\"");
        System.err.println("\t3rd Argument) -- Collection name. ex: \"N_ARREST\"");
        throw new Exception("Invalid Arguments, see console output.");
    }

    // Labels below match the usage text: both values are OUTPUT locations, and the
    // file name is the 2nd argument (the previous messages said "1st" and "Input").
    this.myOutputDir = theArgs[0];
    System.out.println("setCommandLineProperties() -- 1st Argument) Output directory location = [" + this.myOutputDir + "]");

    this.myOutputFileName = theArgs[1];
    System.out.println("setCommandLineProperties() -- 2nd Argument) Output file name = [" + this.myOutputFileName + "]");
}
/**
 * Hands one relationship record to the supplied writer. Calls are serialized on this
 * audit instance.
 *
 * @param theRelSeq          relationship sequence value forwarded to the writer
 * @param theEntityGuid      entity guid forwarded to the writer
 * @param theRelGuid         relationship guid forwarded to the writer
 * @param myEntityFileWriter writer that receives the record (this parameter shadows the
 *                           field of the same name; the field is not used here)
 * @throws Exception whatever the writer throws
 */
private void addEntity(String theRelSeq, String theEntityGuid,
        String theRelGuid, EntityFileWriter myEntityFileWriter) throws Exception {
    synchronized (this) {
        myEntityFileWriter.addEntity(theRelSeq, theEntityGuid, theRelGuid);
    }
}
/**
 * Looks up the given entity guids through a master-record DAO and returns the ones that
 * were found.
 *
 * @param theRRDGuids    entity guids to look up
 * @param collectionName collection being processed; used for log output and as the lock key
 * @return the entity guids of every PersonEntity returned by the DAO; empty (never null)
 *         when the DAO returns null or nothing
 * @throws Exception whatever the DAO lookup throws
 */
private Set<String> locateEntityGuidsInMRD(List<String> theRRDGuids, String collectionName) throws Exception {
    Set<String> foundEntityGuids = new HashSet<String>();
    System.out.println("Searching MRD for entity guids (AKA: BASE_GUID)... for : " + collectionName);

    // Same per-collection lock (the interned name) that the worker's run() method takes.
    synchronized (collectionName.intern()) {
        // A DAO local to this call, so the field of the same type is not shadowed or shared.
        MasterRecordDao masterRecordDao = new OracleMasterRecordDao(this.myMRDPartitionConn);
        List<PersonEntity> personEntities = masterRecordDao.getByEntityGuid(theRRDGuids);

        int foundCount = (personEntities == null) ? 0 : personEntities.size();
        System.out.println("Found: " + foundCount + " entities in MRD for :" + collectionName);

        if (personEntities != null) {
            for (PersonEntity pe : personEntities) {
                foundEntityGuids.add(pe.getEntityGuid());
            }
        }
        // The former this.notify() call was removed: the monitor held here belongs to the
        // interned collection name, not to 'this', so notify() could only throw
        // IllegalMonitorStateException.
    }
    return foundEntityGuids;
}
/**
 * Streams every entity/document relationship of a collection and reports, in batches,
 * the relationships whose entity guid is not found by printNotFoundEntityGuids().
 *
 * Relationships are accumulated until a batch holds 20000 entries, then the batch is
 * checked and cleared; a final partial batch is checked after the iterator is exhausted.
 *
 * Reference: OracleRelationshipRepositoryDao.populateFrom()
 *   EntityDocRelationship.getPk()         = REL_SEQ     (1st column in prepared statement)
 *   EntityDocRelationship.getRelGuid()    = REL_GUID    (4th position in prepared statement)
 *   EntityDocRelationship.getEntityGuid() = BASE_GUID   (5th position in prepared statement)
 *   EntityDocRelationship.getDocGuid()    = TARGET_GUID (6th position in prepared statement)
 *
 * @param collectionName collection whose relationships are audited
 * @throws Exception whatever the DAO, the lookup or the file writer throws
 */
public void determineOrphanRelationships(String collectionName) throws Exception {
    final int batchSize = 20000;
    final long startTime = System.currentTimeMillis();
    System.out.println("Started on: " + new Timestamp(startTime).toString() + "::" + collectionName);

    // Same per-collection lock (the interned name) that the worker's run() method takes.
    synchronized (collectionName.intern()) {
        Map<EntityDocRelationship, String> entityDocRels = new HashMap<EntityDocRelationship, String>();
        EntityDocRelationshipIterator entityDocRelIter = myRRDao.getByCollection(collectionName);

        while (entityDocRelIter.hasNext()) {
            // Always consume the iterator. Previously a full batch was neither cleared nor
            // advanced past, so the loop could never finish once 20000 entries were held.
            EntityDocRelationship entityDocRel = (EntityDocRelationship) entityDocRelIter.next();
            entityDocRels.put(entityDocRel, entityDocRel.getEntityGuid());

            if (entityDocRels.size() >= batchSize) {
                // A writer created inside printNotFoundEntityGuids() cannot be handed back
                // through its parameter, so every batch starts with no open writer and a
                // zero count.
                printNotFoundEntityGuids(entityDocRels, collectionName, 0, null, false);
                entityDocRels.clear();
            }
            // The former this.wait() call was removed: this thread does not own the monitor
            // of 'this' here, so wait() could only throw IllegalMonitorStateException.
        }

        if (!entityDocRels.isEmpty()) {
            printNotFoundEntityGuids(entityDocRels, collectionName, 0, null, true);
        }

        // Reported for every run, including runs whose last batch was empty.
        final long endTime = System.currentTimeMillis();
        System.out.println("Ended on:" + endTime);
        System.out.println("Run time: " + (endTime - startTime));
    }
}
/**
 * Writes every relationship of the batch whose entity guid is not returned by
 * locateEntityGuidsInMRD() to an entity file, starting a new file whenever the current
 * one holds 20000 entries.
 *
 * Writer ownership: a writer passed in together with a non-zero count belongs to the
 * caller and is left open unless {@code closeWriter} is true or the per-file limit is
 * reached. A writer created inside this method cannot be handed back to the caller, so
 * it is always finished before returning.
 *
 * @param entityDocRels         batch of relationships mapped to their entity guid
 * @param collectionName        collection being processed (log output and file naming)
 * @param entityFileWriterCount entries already written to {@code myEntityFileWriter};
 *                              0 means no file is currently open
 * @param myEntityFileWriter    writer left open by the caller, or null
 * @param closeWriter           true to finish the open file once the batch is written
 * @return the number of entries in the caller's still-open writer, or 0 when no file is
 *         left open
 * @throws Exception whatever the lookup or the writer throws
 */
private int printNotFoundEntityGuids(Map<EntityDocRelationship, String> entityDocRels, String collectionName,
        int entityFileWriterCount, EntityFileWriter myEntityFileWriter, boolean closeWriter) throws Exception {
    // Per-file limit stated in the requirements (the previous threshold of 10 never had
    // any effect because the close step looked at an unassigned field).
    final int maxEntitiesPerFile = 20000;

    // Do work with the distinct set of entity guids.
    Set<String> foundEntityGuids = locateEntityGuidsInMRD(new ArrayList<String>(entityDocRels.values()), collectionName);
    System.out.println(foundEntityGuids.size());
    System.out.println("Printing not found entity guid list...");

    // A zero count means "no open file", so any writer passed with it is ignored.
    EntityFileWriter writer = (entityFileWriterCount == 0) ? null : myEntityFileWriter;
    boolean callerOwnsWriter = (writer != null);
    int written = callerOwnsWriter ? entityFileWriterCount : 0;

    for (Map.Entry<EntityDocRelationship, String> entry : entityDocRels.entrySet()) {
        if (foundEntityGuids.contains(entry.getValue())) {
            continue;
        }
        synchronized (this) {
            if (writer == null) {
                System.out.println("file for :" + collectionName);
                writer = new EntityFileWriter(new File(myOutputDir), myOutputFileName,
                        collectionName, System.currentTimeMillis());
                callerOwnsWriter = false;
                written = 0;
            }
            addEntity(String.valueOf(entry.getKey().getPk()), entry.getValue(), entry.getKey().getRelGuid(), writer);
            written++;
            System.out.println(collectionName + " :: Relationship pk: " + entry.getKey().getPk() + " is orphan to Entity: " + entry.getValue());

            if (written >= maxEntitiesPerFile) {
                // File is full: finish it so the next orphan starts a new file.
                finishEntityFile(writer);
                writer = null;
                written = 0;
            }
        }
    }
    System.out.println("Finished printing.");

    // Finish the file when asked to, or when the writer was created here and would
    // otherwise be lost (and never finished) once this method returns.
    if (writer != null && (closeWriter || !callerOwnsWriter)) {
        synchronized (this) {
            finishEntityFile(writer);
        }
        written = 0;
    }
    return written;
}

/** Ends the document of the given writer, logging before and after. */
private void finishEntityFile(EntityFileWriter writer) throws Exception {
    System.out.println("Closing file writer...");
    writer.endDocument();
    System.out.println("File writer closed.");
}
/**
 * Parses every entity file of the given collection found in the output directory and
 * writes a Novus delete file for the relationship guids each one contains.
 *
 * Only files whose name starts with "rrd_seq_guid_" + collectionName are processed.
 * NOTE(review): that prefix is hard-coded rather than derived from the configured output
 * file name -- confirm it matches the names EntityFileWriter produces.
 *
 * Deleting the relationship sequence guids from the repository and validating the guids
 * against Novus were both disabled in the original code and are not performed here.
 *
 * @param collectionName collection whose entity files are processed
 * @throws Exception if the output directory cannot be listed, or parsing fails
 */
public void deleteRelationships(final String collectionName) throws Exception {
    SAXParserFactory factory = SAXParserFactory.newInstance();
    SAXParser saxParser = factory.newSAXParser();

    File outputDir = new File(this.myOutputDir);
    FilenameFilter filter = new FilenameFilter() {
        public boolean accept(File dir, String name) {
            return name.startsWith("rrd_seq_guid_" + collectionName);
        }
    };

    String[] files = outputDir.list(filter);
    if (files == null) {
        // File.list returns null when the path is not a directory or an I/O error occurs;
        // fail with a clear message instead of a NullPointerException in the loop below.
        throw new java.io.IOException("Cannot list output directory: " + outputDir);
    }

    for (String file : files) {
        logger.info("Getting relationships from file: " + file + "...");
        EntityFileHandler parsedFile = new EntityFileHandler();
        // Resolve the name against the directory; plain string concatenation dropped the
        // separator whenever the configured directory had no trailing slash.
        saxParser.parse(new File(outputDir, file), parsedFile);

        Map<String, String> relSeqsGuids = new HashMap<String, String>();
        for (EntityFileHandler.EntityFile entityFile : parsedFile.getDocuments()) {
            relSeqsGuids.put(entityFile.getRelSeq(), entityFile.getRelGuid());
        }

        // Every parsed relationship guid is sent; no Novus validation is applied.
        List<String> validNovusRelGuids = new ArrayList<String>(relSeqsGuids.values());

        logger.info("Sending delete files to Novus...");
        sendDeleteFileToNovus(file, validNovusRelGuids, collectionName);
        logger.info("Delete files sent.");
    }
}
/**
* This method will send delete files to Novus based on the list
* of relationship Guids passed in.
*
* Example delete file:
*
* <deletes>
* <collection>
* <name>...</name>
* <guid></guid>
* .
* . N times
* .
* <guid></guid>
* </collection>
* </deletes>
*
* @param theRelGuids - ERD Relationship Guids to send a delete file for.
* @throws Exception
*/
public void sendDeleteFileToNovus(String theFile, List<String> theRelGuids, String collectionName) {
logger.info("Sending delete files to Novus...");
try{
Document document = new DocumentImpl();
// Create the Delete Element (aka: root): <Deletes>
Element deleteElement = document.createElement("deletes");
/*
* Create the Collection Element <Collection> and then add it to the Deletes Element.
*
* <deletes>
* <collection></collection>
* </deletes>
*/
Element collectionElement = document.createElement("collection");
deleteElement.appendChild(collectionElement);
/*
* Create the Name Element <Name> with the Collection name
* and then add it to the Collection Element.
*
* <deletes>
* <collection>
* <name>...</Name>
* </collection>
* </deletes>
*/
Element nameElement = document.createElementNS(null, "name");
nameElement.appendChild(document.createTextNode(collectionName));
collectionElement.appendChild(nameElement);
if (theRelGuids != null && theRelGuids.size() > 0) {
/*
* For each Relationship Guid add a Guid Element to the Collection
* Element with the Relationship Guid.
*
* <deletes>
* <collection>
* <name>...</name>
* <guid>...</guid>
* </collection>
* </deletes>
*/
for(String relGuid : theRelGuids){
Element relGuidElement = document.createElementNS(null, "guid");
relGuidElement.appendChild(document.createTextNode(relGuid));
collectionElement.appendChild(relGuidElement);
}
}
logger.info("Add delete element text content: " + deleteElement.getTextContent());
document.appendChild(deleteElement);
String deleteFile =
File.separator + "ct-data" +
File.separator + "erd" +
File.separator + "wip" +
File.separator + "relAtishDeletes" +
File.separator + theFile;
logger.info("Creating Novus delete file: " + deleteFile + "...");
FileOutputStream outStream = new FileOutputStream(deleteFile);
OutputFormat of = new OutputFormat("XML","ISO-8859-1",true);
of.setIndent(1);
of.setIndenting(true);
XMLSerializer serializer = new XMLSerializer(outStream, of);
serializer.asDOMSerializer();
serializer.serialize(document.getDocumentElement());
outStream.close();
logger.info("Delete file: " + deleteFile + " created.");
}
ca开发者_开发知识库tch(Exception e){
e.printStackTrace();
}
/*MatchManageCommand mmc = new MatchManageCommand();
String publishXML = mmc.getPublishXml(-1, this.myCollectionName, "-1", theFile);
mmc.ms.putMessageOnQueue(publishXML, 2);*/
}
/**
 * Loads the Novus collection names to process from erd_workflow.collection_info and
 * appends them to the static collections list.
 *
 * Failures are reported on stderr and not rethrown; the list then holds whatever was
 * read before the failure.
 */
public void getCollections()
{
    Connection connection = null;
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        connection = ds.getConnection();
        String query = "select novus_collection from erd_workflow.collection_info " +
            "where Novus_collection in ('N-SALE', 'N-UCC00', 'N-UCC01', 'N-UCC02', 'N-UCC03') order by novus_collection asc";
        statement = connection.createStatement();
        resultSet = statement.executeQuery(query);
        while (resultSet.next()) {
            // Store the Novus collection name.
            collections.add(resultSet.getString(1));
        }
    }
    catch (Exception e) {
        e.printStackTrace();
    }
    finally {
        // Close each resource on its own: the connection must be released even when the
        // query never produced a result set or an earlier close fails.
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            }
        }
    }
}
/**
 * Entry point: validates the arguments, sets up the databases, loads the collection
 * names and audits every collection on a fixed-size thread pool, one task per collection.
 *
 * @param args output directory, output file name, collection name
 * @throws Exception if argument validation or database setup fails
 */
public static void main(String[] args) throws Exception{
    AuditMissingRelationshipGuidEntitiesToXml audit = new AuditMissingRelationshipGuidEntitiesToXml();
    ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
    try {
        // Validate the arguments first so that a bad invocation fails before any
        // database connection is opened.
        audit.setCommandLineProperties(args);
        audit.setupDatabases();
        audit.getCollections();

        for (String collection : collections) {
            executor.execute(audit.new RunThreads(audit, collection));
        }

        // shutdown() only stops new submissions and returns immediately, so wait for the
        // submitted tasks before reporting completion.
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
        System.out.println("Finished all threads");
    }catch(InterruptedException e) {
        System.out.println("Interrupted :");
        e.printStackTrace();
        // Restore the interrupt flag for whoever runs this thread.
        Thread.currentThread().interrupt();
    }catch(RejectedExecutionException e){
        e.printStackTrace();
    }finally{
        // No effect when already shut down; on a failure path it lets idle pool threads
        // exit so they cannot keep the JVM alive.
        executor.shutdown();
    }
}
/**
 * @return the collection name currently held by this instance; null until
 *         setMyCollectionName has been called with a non-null value
 */
public String getMyCollectionName() {
    final String current = this.myCollectionName;
    return current;
}
/**
 * Replaces the collection name held by this instance.
 *
 * @param myCollectionName the new collection name; stored as given, without validation
 */
public void setMyCollectionName(String myCollectionName) {
    final String replacement = myCollectionName;
    this.myCollectionName = replacement;
}
/**
 * Task that audits a single collection: it determines the orphan relationships of the
 * collection and then produces the delete files for it.
 */
public class RunThreads implements Runnable{
    // Audit instance whose methods do the actual work.
    AuditMissingRelationshipGuidEntitiesToXml auditThread;
    // Collection handled by this task.
    private String myCollection;
    // Random value in [0, 20000) picked at construction; run() does not read it.
    private int sleepTime;
    private Random generator = new Random();

    /**
     * @param audit      audit instance to run against
     * @param collection collection name this task processes
     */
    public RunThreads(AuditMissingRelationshipGuidEntitiesToXml audit, String collection){
        this.auditThread = audit;
        this.myCollection = collection;
        this.sleepTime = this.generator.nextInt(20000);
    }

    /**
     * Runs both audit steps while holding the monitor of the interned collection name, so
     * tasks for the same collection name run one at a time. Any exception raised by the
     * audit steps is printed to stderr and ends the task.
     */
    public void run() {
        final String collectionLock = this.myCollection.intern();
        synchronized (collectionLock) {
            try {
                System.out.println("Collection started : "+ this.myCollection);
                auditThread.determineOrphanRelationships(myCollection);
                auditThread.deleteRelationships(myCollection);
            } catch (Exception failure) {
                failure.printStackTrace();
            }
        }
    }
}
精彩评论