Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

import functools 

import inspect 

import logging 

import operator 

from abc import abstractmethod 

 

from django.contrib.auth.models import Permission 

from django.contrib.contenttypes.models import ContentType 

from django.db.models import Exists, OuterRef, Value, ExpressionWrapper, BooleanField 

from django.db.models.query import EmptyQuerySet 

from guardian.shortcuts import get_objects_for_user 

from rest_condition import Condition 

from rest_framework import permissions 

 

log = logging.getLogger(__file__) 

 

 

class BasePermission(permissions.BasePermission): 

""" 

 

A base class for all rest_delegated_permissions. Provides two methods: 

* ``has_object_permission`` to decide if user has a permission on an object for the given request operation 

* ``filter`` that filters the queryset so that user receives only those objects that he has the right to 

 

Note: ``filter()`` method returns generator of querysets, not a single queryset! 

""" 

 

@abstractmethod 

def has_object_permission(self, request, view, obj): 

""" 

checks if the user permforming request has permission to make operation upon obj in the context of given view 

 

:param request: REST request the user is performing. Use request.user to get the user, request.method for 

the method user wants to perform on the object 

:param view: Either the original ViewSet for the request or synthetic viewset for delegated permissions. 

:param obj: The object user wants to access 

:return: boolean value, True if user should be given the permission 

""" 

return False 

 

@abstractmethod 

def has_permission(self, request, view): 

""" 

Return `True` if permission is granted, `False` otherwise. 

""" 

return False 

 

@abstractmethod 

def filter(self, rest_permissions, filtered_queryset, user, action): 

""" 

returns a generator of querysets user has access to for the given operation 

 

:param rest_permissions: An instance of RestPermissions class, giving the "permission context" of the filter 

:param filtered_queryset: The original queryset of objects that is to be filtered 

:param user: Django user performing the action 

:param action: view, change, delete 

:return: a generator (yield ...) returning one or more querysets. If more than one queryset 

is returned, they are later combined via "|" operator 

""" 

yield filtered_queryset.model.objects.none() 

 

def get_queryset_filters(self, rest_permissions, qs, user, action): 

""" 

a helper method to wrap the querysets returned by ``filter`` with a ``__extra_condition`` annotation, which 

is required for the delegated permissions to work. The reason is that we use Exists subquery with ``OuterRef`` 

that needs to put the exists query into an annotation and later filter on that annotation. As we need to 

have all the querysets returning the same columns (otherwise django silently ignores some of the querysets), 

we need to wrap all the querysets with an extra ``__extra_condition`` annotation defaulting to ``True``. 

""" 

returns_extra_condition = getattr(self, 'returns__extra_condition', False) 

# because of filtering on delegated field which requires OuterRef we need to add 

# an extra annotation. If the implementor of the BasePermission does not supply 

# it, just add a True. 

for qs in self.filter(rest_permissions, qs, user, action): 

if returns_extra_condition: 

yield qs 

else: 

yield qs.annotate(__extra_condition=ExpressionWrapper(Value(True), output_field=BooleanField())) 

 

 

class DelegatedPermission(BasePermission): 

""" 

 

""" 

 

class DelegatedView: 

def __init__(self, rest_permissions, model_class, request, action): 

self.request = request 

self.action = action 

self.rest_permissions = rest_permissions 

self.model_class = model_class 

 

def get_queryset(self): 

return self.rest_permissions.create_queryset_factory(self.model_class)(self.request.user, self.action) 

 

def __init__(self, rest_permissions, *delegated_fields, mapping=None, delegated_objects_getter=None, 

allowed_safe_actions=('get', 'list')): 

""" 

 

:param rest_permissions: 

:param delegated_fields: 

:param mapping: either a string (all actions will be mapped to this string), dict or 

callable taking an action and returning mapped action name 

:param delegated_objects_getter: 

:param allowed_safe_actions: If set, these actions will be granted initial access for the processing. 

Later filtering is then used for estimating if user has permissions. 

""" 

self.rest_permissions = rest_permissions 

self.delegated_fields = delegated_fields 

self.mapping = mapping 

self.delegated_objects_getter = delegated_objects_getter 

self.allowed_safe_actions = allowed_safe_actions 

 

def has_object_permission(self, request, view, obj): 

return self._internal_has_permission(request, view, obj) 

 

def _internal_has_permission(self, request, view, obj): 

getter = self.delegated_objects_getter or DelegatedPermission.get_delegated_objects 

for delegated_obj in getter(request, view, obj, self.delegated_fields): 

if not delegated_obj: 

continue 

delegated_permissions = self.rest_permissions.permissions_for_model(delegated_obj) 

delegated_action = self._get_delegated_action(view.action) 

delegated_view = DelegatedPermission.DelegatedView( 

self.rest_permissions, type(delegated_obj), request, delegated_action) 

 

if delegated_permissions.has_object_permission(request, delegated_view, delegated_obj): 

return True 

return False 

 

def has_permission(self, request, view): 

if view.action in self.allowed_safe_actions: 

return True 

return self._internal_has_permission(request, view, None) 

 

def _get_delegated_action(self, action): 

if self.mapping: 

if callable(self.mapping): 

return self.mapping(action) 

if isinstance(self.mapping, str): 

action = self.mapping 

else: 

action = self.mapping[action] 

return action 

 

@staticmethod 

def get_delegated_objects(request, view, obj, field_names): 

if not obj: 

return 

 

for perm_field_name in field_names: 

perm_field = obj._meta.get_field(perm_field_name) 

delegated_objects = [] 

if perm_field.one_to_many or perm_field.many_to_many: 

delegated_objects = getattr(obj, perm_field_name).all() 

elif hasattr(obj, '%s_id' % perm_field_name) and getattr(obj, '%s_id' % perm_field_name): 

delegated_objects = [getattr(obj, perm_field_name)] 

for delegated_obj in delegated_objects: 

yield delegated_obj 

 

def get_queryset_filters(self, rest_permissions, qs, user, action): 

for delegated_field_name in self.delegated_fields: 

fld = qs.model._meta.get_field(delegated_field_name) 

related_model = fld.related_model 

related_model_qs = \ 

rest_permissions.create_queryset_factory(related_model)(user, self._get_delegated_action(action)) 

 

filtered_qs = \ 

qs.annotate( 

__extra_condition=Exists(related_model_qs.filter(pk=OuterRef(delegated_field_name)))).filter( 

__extra_condition=True) 

yield filtered_qs 

 

 

def kwargs_delegated_object_getter(field_name_to_kwarg_name_map, 

instantiator=lambda clazz, value, fldname: clazz.objects.get(pk=value)): 

def kwargs_delegated_object_getter_func(request, view, obj, delegated_fields): 

model = view.get_queryset().model 

for field_name, kwarg in field_name_to_kwarg_name_map.items(): 

kwarg_value = view.kwargs[kwarg] 

related_model_class = model._meta.get_field(field_name).related_model 

yield instantiator(related_model_class, kwarg_value, field_name) 

pass 

 

return kwargs_delegated_object_getter_func 

 

 

class RestrictedViewDjangoModelPermissions(permissions.DjangoModelPermissions): 

perms_map = {} 

perms_map.update(permissions.DjangoModelPermissions.perms_map) 

perms_map['GET'] = ['%(app_label)s.view_%(model_name)s'] 

perms_map['OPTIONS'] = ['%(app_label)s.view_%(model_name)s'] 

perms_map['HEAD'] = ['%(app_label)s.view_%(model_name)s'] 

 

 

class RestrictedViewDjangoObjectPermissions(permissions.DjangoObjectPermissions): 

perms_map = {} 

perms_map.update(permissions.DjangoModelPermissions.perms_map) 

perms_map['GET'] = ['%(app_label)s.view_%(model_name)s'] 

perms_map['OPTIONS'] = ['%(app_label)s.view_%(model_name)s'] 

perms_map['HEAD'] = ['%(app_label)s.view_%(model_name)s'] 

 

def has_object_permission(self, request, view, obj): 

queryset = self._queryset(view) 

model_cls = queryset.model 

user = request.user 

perms = self.get_required_object_permissions(request.method, model_cls) 

return user.has_perms(perms, obj) 

 

 

class DjangoCombinedPermission: 

 

def __init__(self): 

self.model_permissions = RestrictedViewDjangoModelPermissions() 

self.object_permissions = RestrictedViewDjangoObjectPermissions() 

 

def has_object_permission(self, request, view, obj): 

return self.model_permissions.has_permission(request, view) or \ 

self.object_permissions.has_object_permission(request, view, obj) 

 

def get_queryset_filters(self, rest_permissions, qs, user, action): 

known_operations = {'retrieve': 'view', 'view': 'view', 'update': 'change', 'change': 'change', 

'delete': 'delete', 

'destroy': 'delete', 'partial_update': 'change'} 

operation = known_operations.get(action, action) 

ct = ContentType.objects.get_for_model(qs.model) 

perm = '%s_%s' % (operation, ct.model) 

 

if DjangoCombinedPermission.check_permission_exists(ct, perm): 

if user.has_perm(perm): 

yield qs.annotate(__extra_condition=ExpressionWrapper(Value(True), output_field=BooleanField())) 

else: 

# add queryset for guardian 

guardian_qs = get_objects_for_user(user, [perm], qs) \ 

.annotate(__extra_condition=ExpressionWrapper(Value(True), output_field=BooleanField())) 

yield guardian_qs 

 

def has_permission(self, request, view): 

# let it pass, filter() method will handle rest apart of "create" which has to be dealt with in the code 

return True 

 

@staticmethod 

def check_permission_exists(ct, perm_name): 

return Permission.objects.filter(content_type=ct, codename=perm_name).exists() 

 

 

class RestPermissions: 

def __init__(self, default_queryset_factory=lambda model: model.objects.all(), 

initial_permissions=None, 

add_django_permissions=False): 

self.default_queryset_factory = default_queryset_factory 

self.add_django_permissions = add_django_permissions 

self.model_permission_map = {} 

if initial_permissions: 

self.update_permissions(initial_permissions) 

 

def update_permissions(self, model_permission_map): 

for model_class, model_permissions in model_permission_map.items(): 

self.set_model_permissions(model_class, model_permissions) 

 

def set_model_permissions(self, model_class, model_permissions, overwrite=False, force_add_django_permissions=None): 

if model_class in self.model_permission_map and not overwrite: 

raise AttributeError('Permissions for %s already registered' % model_class) 

 

if not isinstance(model_permissions, list) and not isinstance(model_permissions, tuple): 

model_permissions = [model_permissions] 

 

if self.add_django_permissions and force_add_django_permissions is None or force_add_django_permissions: 

perms = list(model_permissions) 

perms.insert(0, DjangoCombinedPermission()) 

else: 

perms = model_permissions 

self.model_permission_map[model_class] = Condition.Or(*perms) 

 

def filtered_model_queryset(self, model_class, root_queryset, user, action): 

perms = self.model_permission_map[model_class].perms_or_conds 

for perm in perms: 

yield from self._permission_to_queryset(perm, root_queryset, user, action) 

 

def _permission_to_queryset(self, perm, root_queryset, user, action): 

""" 

Converts permission into a generator of querysets. Ignores DjangoModelPermissions and DjangoObjectPermissions, 

because they are handled by guardian in the caller function (model_filter) 

 

:param perm: the permission being converted 

:param root_queryset: queryset for the model being checked 

:param user: user for which the check is made 

:param action: action being performed (such as view, change, ...) 

:return: a generator of querysets 

""" 

if isinstance(perm, Condition): 

if perm.negated: 

log.error('Negated subcondition is not implemented in .queryset(), expect narrower results') 

elif perm.reduce_op == operator.or_: 

for subperm in perm.perms_or_conds: 

yield from self._permission_to_queryset(subperm, root_queryset, user, action) 

elif perm.reduce_op == operator.and_: 

# return intersection of unions 

intersection_querysets = [] 

for subperm in perm.perms_or_conds: 

subperm_querysets = [ 

x for x in self._permission_to_queryset(subperm, root_queryset, user, action) 

if not isinstance(x, EmptyQuerySet) 

] 

if subperm_querysets: 

intersection_querysets.append(functools.reduce(operator.or_, subperm_querysets)) 

else: 

# no queryset returned => the result is False 

intersection_querysets = None 

break 

if intersection_querysets: 

yield functools.reduce(operator.and_, intersection_querysets) 

else: 

log.error('Subconditions are not implemented in .queryset(), expect narrower results') 

else: 

# django model permissions and object permissions are handled in model_filter() function 

# (faster for cases where user is assigned rights directly) 

yield from perm.get_queryset_filters(self, root_queryset, user, action) 

 

def permissions_for_model(self, model_class_or_model): 

if inspect.isclass(model_class_or_model): 

model_class = model_class_or_model 

else: 

model_class = type(model_class_or_model) 

return self.model_permission_map[model_class] 

 

def apply(self, permissions=None, add_django_permissions=None): 

""" 

Sets premissions for a ViewSet class 

:param permissions: If the permissions are set, they are registered upon class decoration 

:param add_django_permissions: if None, use the default, otherwise overwrite the default. 

Makes sense only if permissions parameter is filled as well 

""" 

 

def decorate(viewset_class): 

model_class = getattr(viewset_class, 'model', None) 

if not model_class: 

model_class = getattr(viewset_class, 'queryset').model 

 

if permissions is not None: 

self.set_model_permissions(model_class, permissions, 

force_add_django_permissions=add_django_permissions) 

 

model_queryset_factory = self.create_queryset_factory(model_class, 

getattr(viewset_class, 'get_queryset', None)) 

 

return type('%s_perms' % viewset_class.__name__, (viewset_class,), { 

'permission_classes': (self.get_model_permissions(model_class),), 

'get_queryset': lambda view_set: model_queryset_factory( 

view_set.request.user, 'view', view_set=view_set) 

}) 

 

return decorate 

 

def get_model_permissions(self, model_class): 

condition = self.model_permission_map[model_class] 

 

class _Permission(permissions.BasePermission): 

def has_object_permission(self, request, view, obj): 

return condition.has_object_permission(request, view, obj) 

 

def has_permission(self, request, view): 

return condition.has_permission(request, view) 

 

return _Permission 

 

def create_queryset_factory(self, model_class, base_queryset_getter=None): 

""" 

returns lambda (user, action) => queryset(model_class) 

""" 

 

def model_filter(user, action, view_set=None): 

querysets = [] 

 

if base_queryset_getter is not None and view_set is not None: 

qs = base_queryset_getter(view_set) 

else: 

qs = self.get_base_queryset(model_class) 

 

for partial_qs in self.filtered_model_queryset(model_class, qs, user, action): 

querysets.append(partial_qs) 

querysets = [ 

x for x in querysets if not isinstance(x, EmptyQuerySet) 

] 

if querysets: 

return functools.reduce(operator.or_, querysets).distinct() 

return model_class.objects.none() 

 

return model_filter 

 

def get_base_queryset(self, model_class): 

return self.default_queryset_factory(model_class) 

 

def has_object_permission(self, request, clazz, action, instance): 

instance_permissions = self.permissions_for_model(instance) 

instance_view = DelegatedPermission.DelegatedView(self, clazz, request, action) 

return instance_permissions.has_object_permission(request, instance_view, instance)