# Copyright (c) Facebook, Inc. and its affiliates.
# VLPart: Going denser with open-vocabulary part segmentation
# Written by Peize Sun and Shoufa Chen
from detectron2.layers import ShapeSpec
from detectron2.modeling.box_regression import Box2BoxTransform
from detectron2.modeling.matcher import Matcher
from detectron2.modeling.poolers import ROIPooler
from detectron2.modeling.roi_heads.box_head import build_box_head
from detectron2.modeling.roi_heads.cascade_rcnn import CascadeROIHeads, _ScaleGradient
from detectron2.modeling.roi_heads.fast_rcnn import fast_rcnn_inference
from detectron2.structures import Boxes, Instances
from .vlpart_fast_rcnn import VLMFastRCNNOutputLayers
def build_vlpart_roi_heads(cfg, input_shape):
    """Build the ROI heads used by VLPart.

    Args:
        cfg: config node; forwarded unchanged to ``CascadeVLMROIHeads``.
        input_shape: per-feature shape description; forwarded unchanged to
            ``CascadeVLMROIHeads``.

    Returns:
        A ``CascadeVLMROIHeads`` instance constructed from ``cfg`` and
        ``input_shape``.
    """
    return CascadeVLMROIHeads(cfg, input_shape)
class CascadeVLMROIHeads(CascadeROIHeads):
    """Cascade ROI heads whose box predictors also consume a text embedding.

    Each cascade stage pairs a box head with a ``VLMFastRCNNOutputLayers``
    predictor; the predictor is called with the pooled box features and the
    ``text_embed`` passed to :meth:`forward`. Only inference is supported.

    NOTE(review): attributes such as ``box_in_features``, ``box_pooler``,
    ``box_head``, ``box_predictor`` and ``num_cascade_stages`` are not set in
    this class; they are presumably populated by the parent constructor from
    the dict returned by :meth:`_init_box_head` -- confirm against detectron2.
    """

    @classmethod
    def _init_box_head(cls, cfg, input_shape):
        """Create the pooler and the per-stage heads, predictors and matchers.

        Args:
            cfg: config node. Reads ``MODEL.ROI_HEADS.IN_FEATURES``,
                ``MODEL.ROI_HEADS.IOU_THRESHOLDS``, the
                ``MODEL.ROI_BOX_HEAD.POOLER_*`` options,
                ``MODEL.ROI_BOX_HEAD.CLS_AGNOSTIC_BBOX_REG`` and
                ``MODEL.ROI_BOX_CASCADE_HEAD.{BBOX_REG_WEIGHTS, IOUS}``.
            input_shape: mapping from feature name to an object exposing
                ``stride`` and ``channels``.

        Returns:
            dict with keys ``box_in_features``, ``box_pooler``, ``box_heads``,
            ``box_predictors`` and ``proposal_matchers``; the three lists hold
            one entry per cascade stage.

        Raises:
            AssertionError: if the cascade config lists differ in length, if
                class-agnostic box regression is disabled, if the first
                cascade IoU differs from ``ROI_HEADS.IOU_THRESHOLDS[0]``, or
                if the input features do not all share one channel count.
        """
        # fmt: off
        in_features              = cfg.MODEL.ROI_HEADS.IN_FEATURES
        pooler_resolution        = cfg.MODEL.ROI_BOX_HEAD.POOLER_RESOLUTION
        pooler_scales            = tuple(1.0 / input_shape[k].stride for k in in_features)
        sampling_ratio           = cfg.MODEL.ROI_BOX_HEAD.POOLER_SAMPLING_RATIO
        pooler_type              = cfg.MODEL.ROI_BOX_HEAD.POOLER_TYPE
        cascade_bbox_reg_weights = cfg.MODEL.ROI_BOX_CASCADE_HEAD.BBOX_REG_WEIGHTS
        cascade_ious             = cfg.MODEL.ROI_BOX_CASCADE_HEAD.IOUS
        assert len(cascade_bbox_reg_weights) == len(cascade_ious)
        assert cfg.MODEL.ROI_BOX_HEAD.CLS_AGNOSTIC_BBOX_REG, \
            "CascadeROIHeads only support class-agnostic regression now!"
        assert cascade_ious[0] == cfg.MODEL.ROI_HEADS.IOU_THRESHOLDS[0]
        # fmt: on

        # If StandardROIHeads is applied on multiple feature maps (as in FPN),
        # then we share the same predictors and therefore the channel counts
        # must be the same.
        in_channels = [input_shape[f].channels for f in in_features]
        # Check all channel counts are equal
        assert len(set(in_channels)) == 1, in_channels
        in_channels = in_channels[0]

        box_pooler = ROIPooler(
            output_size=pooler_resolution,
            scales=pooler_scales,
            sampling_ratio=sampling_ratio,
            pooler_type=pooler_type,
        )
        pooled_shape = ShapeSpec(
            channels=in_channels, height=pooler_resolution, width=pooler_resolution
        )

        # One (head, predictor, matcher) triple per cascade stage.
        box_heads, box_predictors, proposal_matchers = [], [], []
        for match_iou, bbox_reg_weights in zip(cascade_ious, cascade_bbox_reg_weights):
            box_head = build_box_head(cfg, pooled_shape)
            box_heads.append(box_head)
            box_predictors.append(
                VLMFastRCNNOutputLayers(
                    box_head.output_shape,
                    box2box_transform=Box2BoxTransform(weights=bbox_reg_weights),
                )
            )
            proposal_matchers.append(
                Matcher([match_iou], [0, 1], allow_low_quality_matches=False)
            )
        return {
            "box_in_features": in_features,
            "box_pooler": box_pooler,
            "box_heads": box_heads,
            "box_predictors": box_predictors,
            "proposal_matchers": proposal_matchers,
        }

    def forward(self, images, features, proposals, text_embed):
        """Run the cascade box heads in inference mode.

        Args:
            images: unused; deleted immediately.
            features: mapping from feature name to feature map, indexed by
                ``self.box_in_features``.
            proposals: per-image proposals exposing ``image_size`` and
                ``proposal_boxes``.
            text_embed: text embedding forwarded to every stage's predictor.

        Returns:
            tuple ``(pred_instances, {})``; the second element is always an
            empty dict.

        Raises:
            AssertionError: if the module is in training mode.
        """
        del images
        assert not self.training, 'only support inference now'
        pred_instances = self._forward_box(features, proposals, text_embed=text_embed)
        # forward_with_given_boxes is inherited (not defined in this file);
        # presumably it runs the remaining heads on the predicted boxes.
        pred_instances = self.forward_with_given_boxes(features, pred_instances)
        return pred_instances, {}

    def _forward_box(self, features, proposals, text_embed):
        """Run every cascade stage and produce the final detections.

        Stage ``k > 0`` uses the boxes predicted by stage ``k - 1`` as its
        proposals. Final scores are the mean of the per-stage class
        probabilities; final boxes come from the last stage.

        Args:
            features: mapping from feature name to feature map.
            proposals: per-image proposals exposing ``image_size`` and
                ``proposal_boxes``.
            text_embed: text embedding forwarded to every stage's predictor.

        Returns:
            The first element of the ``fast_rcnn_inference`` result (the
            per-image predicted instances).

        Raises:
            AssertionError: if the module is in training mode.
        """
        # Fail fast: training is unsupported. Checking up front replaces a
        # training-only branch that referenced undefined names (``ann_type``,
        # ``targets``) and would have raised NameError before this assertion.
        assert not self.training, 'only support inference now'

        features = [features[f] for f in self.box_in_features]
        head_outputs = []  # (predictor, predictions, proposals)
        prev_pred_boxes = None
        image_sizes = [x.image_size for x in proposals]
        for k in range(self.num_cascade_stages):
            if k > 0:
                # Later stages refine the boxes predicted by the previous one.
                proposals = self._create_proposals_from_boxes(
                    prev_pred_boxes, image_sizes
                )
            predictions = self._run_stage(features, proposals, k, text_embed)
            prev_pred_boxes = self.box_predictor[k].predict_boxes(
                (predictions[0], predictions[1]), proposals
            )
            head_outputs.append((self.box_predictor[k], predictions, proposals))

        # Each is a list[Tensor] of length #image. Each tensor is Ri x (K+1)
        scores_per_stage = [h[0].predict_probs(h[1], h[2]) for h in head_outputs]
        # Average the scores across stages, image by image.
        scores = [
            sum(scores_per_image) * (1.0 / self.num_cascade_stages)
            for scores_per_image in zip(*scores_per_stage)
        ]
        predictor, predictions, proposals = head_outputs[-1]
        boxes = predictor.predict_boxes((predictions[0], predictions[1]), proposals)
        pred_instances, _ = fast_rcnn_inference(
            boxes,
            scores,
            image_sizes,
            predictor.test_score_thresh,
            predictor.test_nms_thresh,
            predictor.test_topk_per_image,
        )
        return pred_instances

    def _create_proposals_from_boxes(self, boxes, image_sizes):
        """Wrap predicted boxes as proposals for the next cascade stage.

        Args:
            boxes: per-image box tensors.
            image_sizes: per-image sizes, in the same order as ``boxes``.

        Returns:
            list of ``Instances``, one per image, each with a
            ``proposal_boxes`` field holding the detached, clipped boxes.
        """
        # Detach so gradients do not flow back through the previous stage.
        boxes = [Boxes(b.detach()) for b in boxes]
        proposals = []
        for boxes_per_image, image_size in zip(boxes, image_sizes):
            boxes_per_image.clip(image_size)
            prop = Instances(image_size)
            prop.proposal_boxes = boxes_per_image
            proposals.append(prop)
        return proposals

    def _run_stage(self, features, proposals, stage, text_embed):
        """Pool box features and run one cascade stage's head and predictor.

        Args:
            features: list of feature maps passed to ``self.box_pooler``.
            proposals: per-image proposals exposing ``proposal_boxes``.
            stage: index of the cascade stage to run.
            text_embed: text embedding passed to the stage's predictor.

        Returns:
            Whatever ``self.box_predictor[stage]`` returns; callers in this
            class read its first two elements.
        """
        pool_boxes = [x.proposal_boxes for x in proposals]
        box_features = self.box_pooler(features, pool_boxes)
        # NOTE(review): _ScaleGradient presumably only rescales gradients in
        # the backward pass and leaves forward values unchanged -- confirm
        # against detectron2's cascade_rcnn module.
        box_features = _ScaleGradient.apply(box_features, 1.0 / self.num_cascade_stages)
        box_features = self.box_head[stage](box_features)
        return self.box_predictor[stage](box_features, text_embed)