
Nutch source code reading (1) - Crawl

 

org.apache.nutch.crawl.Crawl implements one complete crawl (and, given the -solr parameter, indexing) run, so it is the natural place to start reading.

       

  /* Perform complete crawling and indexing (to Solr) given a set of root urls and the -solr
     parameter respectively. More information and Usage parameters can be found below. */
  public static void main(String args[]) throws Exception {
    Configuration conf = NutchConfiguration.create();
    int res = ToolRunner.run(conf, new Crawl(), args);
    System.exit(res);
  }
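main() follows Hadoop's standard Tool/ToolRunner pattern: ToolRunner.run() first extracts generic Hadoop options (such as -D key=value configuration overrides) into the Configuration, then passes the remaining arguments on to Crawl.run(). Below is a minimal sketch of the same pattern outside Nutch; the class name MyTool is made up for illustration.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  // made-up minimal Tool: by the time run() is called, generic Hadoop
  // options have already been applied to getConf()
  public class MyTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      System.out.println("remaining args: " + args.length);
      return 0;
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(new Configuration(), new MyTool(), args));
    }
  }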

NutchConfiguration.create() lives in org.apache.nutch.util.NutchConfiguration; it builds a Hadoop Configuration and registers the standard Nutch resource files:

 

  /**
   * Add the standard Nutch resources to {@link Configuration}.
   * 
   * @param conf               Configuration object to which
   *                           configuration is to be added.
   */
  private static Configuration addNutchResources(Configuration conf) {
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    return conf;
  }

At initialization, nutch-default.xml and nutch-site.xml are loaded, in that order.
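Because a resource added later overrides values from resources added earlier, settings in nutch-site.xml take precedence over the shipped defaults. A standalone sketch to illustrate the override order (not Nutch code; ConfOrderDemo is a made-up name, and fetcher.threads.fetch is simply the property Crawl.run() reads later):

  import org.apache.hadoop.conf.Configuration;

  public class ConfOrderDemo {
    public static void main(String[] args) {
      Configuration conf = new Configuration(false); // skip Hadoop's own defaults
      conf.addResource("nutch-default.xml"); // shipped defaults
      conf.addResource("nutch-site.xml");    // site overrides: added last, wins
      // prints the site value if nutch-site.xml sets fetcher.threads.fetch,
      // else the default value (or 10 if neither file is on the classpath)
      System.out.println(conf.getInt("fetcher.threads.fetch", 10));
    }
  }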

 

 

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 1) {
      System.out.println
      ("Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]");
      return -1;
    }
    Path rootUrlDir = null;
    Path dir = new Path("crawl-" + getDate());
    int threads = getConf().getInt("fetcher.threads.fetch", 10);
    int depth = 5;
    long topN = Long.MAX_VALUE;
    String solrUrl = null;
    // parse the command-line arguments
    for (int i = 0; i < args.length; i++) {
      if ("-dir".equals(args[i])) {
        dir = new Path(args[i+1]);
        i++;
      } else if ("-threads".equals(args[i])) {
        threads = Integer.parseInt(args[i+1]);
        i++;
      } else if ("-depth".equals(args[i])) {
        depth = Integer.parseInt(args[i+1]);
        i++;
      } else if ("-topN".equals(args[i])) {
        topN = Integer.parseInt(args[i+1]);
        i++;
      } else if ("-solr".equals(args[i])) {
        solrUrl = args[i + 1];
        i++;
      } else if (args[i] != null) {
        rootUrlDir = new Path(args[i]);
      }
    }
    
    JobConf job = new NutchJob(getConf());
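    // NutchJob is Nutch's JobConf subclass, used to configure its MapReduce jobs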

    if (solrUrl == null) {
      LOG.warn("solrUrl is not set, indexing will be skipped...");
    }

    FileSystem fs = FileSystem.get(job);

    if (LOG.isInfoEnabled()) {
      LOG.info("crawl started in: " + dir);
      LOG.info("rootUrlDir = " + rootUrlDir);
      LOG.info("threads = " + threads);
      LOG.info("depth = " + depth);      
      LOG.info("solrUrl=" + solrUrl);
      if (topN != Long.MAX_VALUE)
        LOG.info("topN = " + topN);
    }
    // directories that hold the data produced at each stage of the crawl
    Path crawlDb = new Path(dir + "/crawldb");
    Path linkDb = new Path(dir + "/linkdb");
    Path segments = new Path(dir + "/segments");
    Path indexes = new Path(dir + "/indexes");
    Path index = new Path(dir + "/index");
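    // note: indexes and index are not referenced again in this method;
    // they are leftovers from the pre-Solr (Lucene) indexing flow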
    // create a local temp path and instantiate the tool that drives each stage
    Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDate());
    Injector injector = new Injector(getConf());
    Generator generator = new Generator(getConf());
    Fetcher fetcher = new Fetcher(getConf());
    ParseSegment parseSegment = new ParseSegment(getConf());
    CrawlDb crawlDbTool = new CrawlDb(getConf());
    LinkDb linkDbTool = new LinkDb(getConf());
      
    // initialize crawlDb: inject the seed URLs from rootUrlDir
    injector.inject(crawlDb, rootUrlDir);
    int i;
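    // each pass through this loop is one level of "depth":
    // generate a fetch list, fetch it, optionally parse it,
    // and merge the results back into crawlDb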
    for (i = 0; i < depth; i++) {             // generate a new segment (fetch list)
      Path[] segs = generator.generate(crawlDb, segments, -1, topN,
          System.currentTimeMillis());
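      // generate returns the new segment path(s), or null when no URLs are due for fetching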
      if (segs == null) {
        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
        break;
      }
      fetcher.fetch(segs[0], threads);  // fetch the new segment
      if (!Fetcher.isParsing(job)) {
        parseSegment.parse(segs[0]);    // parse it, unless the fetcher parses inline
      }
      crawlDbTool.update(crawlDb, segs, true, true); // merge fetch results back into crawlDb
    }
    if (i > 0) {
      linkDbTool.invert(linkDb, segments, true, true, false); // invert links: build linkDb of incoming anchors

      if (solrUrl != null) {
        // index into Solr, then remove duplicate documents
        FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));
        SolrIndexer indexer = new SolrIndexer(getConf());
        indexer.indexSolr(solrUrl, crawlDb, linkDb, 
          Arrays.asList(HadoopFSUtil.getPaths(fstats)));
        SolrDeleteDuplicates dedup = new SolrDeleteDuplicates();
        dedup.setConf(getConf());
        dedup.dedup(solrUrl);
      }
      
    } else {
      LOG.warn("No URLs to fetch - check your seed list and URL filters.");
    }
    if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); }
    return 0;
  }
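To summarize the whole flow: inject runs once to seed crawlDb; the loop then repeats generate, fetch, (parse), and updatedb once per depth level; after the loop, invertlinks builds linkDb before the segments are indexed into Solr and deduplicated. Assuming the standard bin/nutch wrapper script (which in Nutch 1.x maps the crawl command to this class), a typical invocation matching the Usage string above would be:

  bin/nutch crawl urls -solr http://localhost:8983/solr/ -depth 3 -topN 1000

where urls is a directory containing plain-text files of seed URLs.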

 
